diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/health/server.go')
-rw-r--r-- | vendor/google.golang.org/grpc/health/server.go | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/health/server.go b/vendor/google.golang.org/grpc/health/server.go new file mode 100644 index 0000000..c79f9d2 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/server.go | |||
@@ -0,0 +1,165 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2017 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | //go:generate ./regenerate.sh | ||
20 | |||
21 | // Package health provides a service that exposes server's health and it must be | ||
22 | // imported to enable support for client-side health checks. | ||
23 | package health | ||
24 | |||
25 | import ( | ||
26 | "context" | ||
27 | "sync" | ||
28 | |||
29 | "google.golang.org/grpc/codes" | ||
30 | "google.golang.org/grpc/grpclog" | ||
31 | healthgrpc "google.golang.org/grpc/health/grpc_health_v1" | ||
32 | healthpb "google.golang.org/grpc/health/grpc_health_v1" | ||
33 | "google.golang.org/grpc/status" | ||
34 | ) | ||
35 | |||
36 | // Server implements `service Health`. | ||
37 | type Server struct { | ||
38 | mu sync.Mutex | ||
39 | // If shutdown is true, it's expected all serving status is NOT_SERVING, and | ||
40 | // will stay in NOT_SERVING. | ||
41 | shutdown bool | ||
42 | // statusMap stores the serving status of the services this Server monitors. | ||
43 | statusMap map[string]healthpb.HealthCheckResponse_ServingStatus | ||
44 | updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus | ||
45 | } | ||
46 | |||
47 | // NewServer returns a new Server. | ||
48 | func NewServer() *Server { | ||
49 | return &Server{ | ||
50 | statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING}, | ||
51 | updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus), | ||
52 | } | ||
53 | } | ||
54 | |||
55 | // Check implements `service Health`. | ||
56 | func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { | ||
57 | s.mu.Lock() | ||
58 | defer s.mu.Unlock() | ||
59 | if servingStatus, ok := s.statusMap[in.Service]; ok { | ||
60 | return &healthpb.HealthCheckResponse{ | ||
61 | Status: servingStatus, | ||
62 | }, nil | ||
63 | } | ||
64 | return nil, status.Error(codes.NotFound, "unknown service") | ||
65 | } | ||
66 | |||
67 | // Watch implements `service Health`. | ||
68 | func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { | ||
69 | service := in.Service | ||
70 | // update channel is used for getting service status updates. | ||
71 | update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) | ||
72 | s.mu.Lock() | ||
73 | // Puts the initial status to the channel. | ||
74 | if servingStatus, ok := s.statusMap[service]; ok { | ||
75 | update <- servingStatus | ||
76 | } else { | ||
77 | update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN | ||
78 | } | ||
79 | |||
80 | // Registers the update channel to the correct place in the updates map. | ||
81 | if _, ok := s.updates[service]; !ok { | ||
82 | s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus) | ||
83 | } | ||
84 | s.updates[service][stream] = update | ||
85 | defer func() { | ||
86 | s.mu.Lock() | ||
87 | delete(s.updates[service], stream) | ||
88 | s.mu.Unlock() | ||
89 | }() | ||
90 | s.mu.Unlock() | ||
91 | |||
92 | var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1 | ||
93 | for { | ||
94 | select { | ||
95 | // Status updated. Sends the up-to-date status to the client. | ||
96 | case servingStatus := <-update: | ||
97 | if lastSentStatus == servingStatus { | ||
98 | continue | ||
99 | } | ||
100 | lastSentStatus = servingStatus | ||
101 | err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) | ||
102 | if err != nil { | ||
103 | return status.Error(codes.Canceled, "Stream has ended.") | ||
104 | } | ||
105 | // Context done. Removes the update channel from the updates map. | ||
106 | case <-stream.Context().Done(): | ||
107 | return status.Error(codes.Canceled, "Stream has ended.") | ||
108 | } | ||
109 | } | ||
110 | } | ||
111 | |||
112 | // SetServingStatus is called when need to reset the serving status of a service | ||
113 | // or insert a new service entry into the statusMap. | ||
114 | func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { | ||
115 | s.mu.Lock() | ||
116 | defer s.mu.Unlock() | ||
117 | if s.shutdown { | ||
118 | grpclog.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus) | ||
119 | return | ||
120 | } | ||
121 | |||
122 | s.setServingStatusLocked(service, servingStatus) | ||
123 | } | ||
124 | |||
125 | func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { | ||
126 | s.statusMap[service] = servingStatus | ||
127 | for _, update := range s.updates[service] { | ||
128 | // Clears previous updates, that are not sent to the client, from the channel. | ||
129 | // This can happen if the client is not reading and the server gets flow control limited. | ||
130 | select { | ||
131 | case <-update: | ||
132 | default: | ||
133 | } | ||
134 | // Puts the most recent update to the channel. | ||
135 | update <- servingStatus | ||
136 | } | ||
137 | } | ||
138 | |||
139 | // Shutdown sets all serving status to NOT_SERVING, and configures the server to | ||
140 | // ignore all future status changes. | ||
141 | // | ||
142 | // This changes serving status for all services. To set status for a perticular | ||
143 | // services, call SetServingStatus(). | ||
144 | func (s *Server) Shutdown() { | ||
145 | s.mu.Lock() | ||
146 | defer s.mu.Unlock() | ||
147 | s.shutdown = true | ||
148 | for service := range s.statusMap { | ||
149 | s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING) | ||
150 | } | ||
151 | } | ||
152 | |||
153 | // Resume sets all serving status to SERVING, and configures the server to | ||
154 | // accept all future status changes. | ||
155 | // | ||
156 | // This changes serving status for all services. To set status for a perticular | ||
157 | // services, call SetServingStatus(). | ||
158 | func (s *Server) Resume() { | ||
159 | s.mu.Lock() | ||
160 | defer s.mu.Unlock() | ||
161 | s.shutdown = false | ||
162 | for service := range s.statusMap { | ||
163 | s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING) | ||
164 | } | ||
165 | } | ||