]>
Commit | Line | Data |
---|---|---|
107c1cdb ND |
1 | /* |
2 | * | |
3 | * Copyright 2017 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | //go:generate ./regenerate.sh | |
20 | ||
21 | // Package health provides a service that exposes server's health and it must be | |
22 | // imported to enable support for client-side health checks. | |
23 | package health | |
24 | ||
25 | import ( | |
26 | "context" | |
27 | "sync" | |
28 | ||
29 | "google.golang.org/grpc/codes" | |
30 | "google.golang.org/grpc/grpclog" | |
31 | healthgrpc "google.golang.org/grpc/health/grpc_health_v1" | |
32 | healthpb "google.golang.org/grpc/health/grpc_health_v1" | |
33 | "google.golang.org/grpc/status" | |
34 | ) | |
35 | ||
36 | // Server implements `service Health`. | |
37 | type Server struct { | |
38 | mu sync.Mutex | |
39 | // If shutdown is true, it's expected all serving status is NOT_SERVING, and | |
40 | // will stay in NOT_SERVING. | |
41 | shutdown bool | |
42 | // statusMap stores the serving status of the services this Server monitors. | |
43 | statusMap map[string]healthpb.HealthCheckResponse_ServingStatus | |
44 | updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus | |
45 | } | |
46 | ||
47 | // NewServer returns a new Server. | |
48 | func NewServer() *Server { | |
49 | return &Server{ | |
50 | statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING}, | |
51 | updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus), | |
52 | } | |
53 | } | |
54 | ||
55 | // Check implements `service Health`. | |
56 | func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { | |
57 | s.mu.Lock() | |
58 | defer s.mu.Unlock() | |
59 | if servingStatus, ok := s.statusMap[in.Service]; ok { | |
60 | return &healthpb.HealthCheckResponse{ | |
61 | Status: servingStatus, | |
62 | }, nil | |
63 | } | |
64 | return nil, status.Error(codes.NotFound, "unknown service") | |
65 | } | |
66 | ||
67 | // Watch implements `service Health`. | |
68 | func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { | |
69 | service := in.Service | |
70 | // update channel is used for getting service status updates. | |
71 | update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1) | |
72 | s.mu.Lock() | |
73 | // Puts the initial status to the channel. | |
74 | if servingStatus, ok := s.statusMap[service]; ok { | |
75 | update <- servingStatus | |
76 | } else { | |
77 | update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN | |
78 | } | |
79 | ||
80 | // Registers the update channel to the correct place in the updates map. | |
81 | if _, ok := s.updates[service]; !ok { | |
82 | s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus) | |
83 | } | |
84 | s.updates[service][stream] = update | |
85 | defer func() { | |
86 | s.mu.Lock() | |
87 | delete(s.updates[service], stream) | |
88 | s.mu.Unlock() | |
89 | }() | |
90 | s.mu.Unlock() | |
91 | ||
92 | var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1 | |
93 | for { | |
94 | select { | |
95 | // Status updated. Sends the up-to-date status to the client. | |
96 | case servingStatus := <-update: | |
97 | if lastSentStatus == servingStatus { | |
98 | continue | |
99 | } | |
100 | lastSentStatus = servingStatus | |
101 | err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus}) | |
102 | if err != nil { | |
103 | return status.Error(codes.Canceled, "Stream has ended.") | |
104 | } | |
105 | // Context done. Removes the update channel from the updates map. | |
106 | case <-stream.Context().Done(): | |
107 | return status.Error(codes.Canceled, "Stream has ended.") | |
108 | } | |
109 | } | |
110 | } | |
111 | ||
112 | // SetServingStatus is called when need to reset the serving status of a service | |
113 | // or insert a new service entry into the statusMap. | |
114 | func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { | |
115 | s.mu.Lock() | |
116 | defer s.mu.Unlock() | |
117 | if s.shutdown { | |
118 | grpclog.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus) | |
119 | return | |
120 | } | |
121 | ||
122 | s.setServingStatusLocked(service, servingStatus) | |
123 | } | |
124 | ||
125 | func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) { | |
126 | s.statusMap[service] = servingStatus | |
127 | for _, update := range s.updates[service] { | |
128 | // Clears previous updates, that are not sent to the client, from the channel. | |
129 | // This can happen if the client is not reading and the server gets flow control limited. | |
130 | select { | |
131 | case <-update: | |
132 | default: | |
133 | } | |
134 | // Puts the most recent update to the channel. | |
135 | update <- servingStatus | |
136 | } | |
137 | } | |
138 | ||
139 | // Shutdown sets all serving status to NOT_SERVING, and configures the server to | |
140 | // ignore all future status changes. | |
141 | // | |
142 | // This changes serving status for all services. To set status for a perticular | |
143 | // services, call SetServingStatus(). | |
144 | func (s *Server) Shutdown() { | |
145 | s.mu.Lock() | |
146 | defer s.mu.Unlock() | |
147 | s.shutdown = true | |
148 | for service := range s.statusMap { | |
149 | s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING) | |
150 | } | |
151 | } | |
152 | ||
153 | // Resume sets all serving status to SERVING, and configures the server to | |
154 | // accept all future status changes. | |
155 | // | |
156 | // This changes serving status for all services. To set status for a perticular | |
157 | // services, call SetServingStatus(). | |
158 | func (s *Server) Resume() { | |
159 | s.mu.Lock() | |
160 | defer s.mu.Unlock() | |
161 | s.shutdown = false | |
162 | for service := range s.statusMap { | |
163 | s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING) | |
164 | } | |
165 | } |