diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_v1_wrapper.go')
-rw-r--r-- | vendor/google.golang.org/grpc/balancer_v1_wrapper.go | 326 |
1 file changed, 326 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_v1_wrapper.go b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go new file mode 100644 index 0000000..42b60fe --- /dev/null +++ b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go | |||
@@ -0,0 +1,326 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2017 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package grpc | ||
20 | |||
21 | import ( | ||
22 | "context" | ||
23 | "strings" | ||
24 | "sync" | ||
25 | |||
26 | "google.golang.org/grpc/balancer" | ||
27 | "google.golang.org/grpc/connectivity" | ||
28 | "google.golang.org/grpc/grpclog" | ||
29 | "google.golang.org/grpc/resolver" | ||
30 | ) | ||
31 | |||
// balancerWrapperBuilder builds a balancer.Balancer that adapts a
// v1 Balancer to the v2 balancer API.
type balancerWrapperBuilder struct {
	b Balancer // The v1 balancer.
}
35 | |||
36 | func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { | ||
37 | targetAddr := cc.Target() | ||
38 | targetSplitted := strings.Split(targetAddr, ":///") | ||
39 | if len(targetSplitted) >= 2 { | ||
40 | targetAddr = targetSplitted[1] | ||
41 | } | ||
42 | |||
43 | bwb.b.Start(targetAddr, BalancerConfig{ | ||
44 | DialCreds: opts.DialCreds, | ||
45 | Dialer: opts.Dialer, | ||
46 | }) | ||
47 | _, pickfirst := bwb.b.(*pickFirst) | ||
48 | bw := &balancerWrapper{ | ||
49 | balancer: bwb.b, | ||
50 | pickfirst: pickfirst, | ||
51 | cc: cc, | ||
52 | targetAddr: targetAddr, | ||
53 | startCh: make(chan struct{}), | ||
54 | conns: make(map[resolver.Address]balancer.SubConn), | ||
55 | connSt: make(map[balancer.SubConn]*scState), | ||
56 | csEvltr: &balancer.ConnectivityStateEvaluator{}, | ||
57 | state: connectivity.Idle, | ||
58 | } | ||
59 | cc.UpdateBalancerState(connectivity.Idle, bw) | ||
60 | go bw.lbWatcher() | ||
61 | return bw | ||
62 | } | ||
63 | |||
64 | func (bwb *balancerWrapperBuilder) Name() string { | ||
65 | return "wrapper" | ||
66 | } | ||
67 | |||
// scState holds the per-SubConn bookkeeping needed to drive the v1
// balancer's Up/down callbacks.
type scState struct {
	addr Address // The v1 address type.
	s    connectivity.State
	// down is the callback returned by Balancer.Up when this SubConn
	// became Ready; it is invoked when the SubConn leaves Ready.
	down func(error)
}
73 | |||
// balancerWrapper adapts a v1 Balancer to the v2 balancer.Balancer
// interface, and also acts as the picker handed to the ClientConn.
type balancerWrapper struct {
	balancer  Balancer // The v1 balancer.
	pickfirst bool

	cc         balancer.ClientConn
	targetAddr string // Target without the scheme.

	mu sync.Mutex // Guards conns, connSt, and state.
	conns  map[resolver.Address]balancer.SubConn
	connSt map[balancer.SubConn]*scState
	// This channel is closed when handling the first resolver result.
	// lbWatcher blocks until this is closed, to avoid race between
	// - NewSubConn is created, cc wants to notify balancer of state changes;
	// - Build hasn't return, cc doesn't have access to balancer.
	startCh chan struct{}

	// To aggregate the connectivity state.
	csEvltr *balancer.ConnectivityStateEvaluator
	state   connectivity.State
}
94 | |||
// lbWatcher watches the Notify channel of the balancer and manages
// connections accordingly.
func (bw *balancerWrapper) lbWatcher() {
	// Block until the first resolver result has been handled (startCh is
	// closed by HandleResolvedAddrs, or by Close).
	<-bw.startCh
	notifyCh := bw.balancer.Notify()
	if notifyCh == nil {
		// There's no resolver in the balancer. Connect directly.
		a := resolver.Address{
			Addr: bw.targetAddr,
			Type: resolver.Backend,
		}
		sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
		if err != nil {
			grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
		} else {
			bw.mu.Lock()
			bw.conns[a] = sc
			bw.connSt[sc] = &scState{
				addr: Address{Addr: bw.targetAddr},
				s:    connectivity.Idle,
			}
			bw.mu.Unlock()
			sc.Connect()
		}
		return
	}

	for addrs := range notifyCh {
		grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
		if bw.pickfirst {
			var (
				oldA  resolver.Address
				oldSC balancer.SubConn
			)
			// Grab the existing SubConn; for pickfirst bw.conns holds at
			// most one entry (keyed by the empty address, see below).
			bw.mu.Lock()
			for oldA, oldSC = range bw.conns {
				break
			}
			bw.mu.Unlock()
			if len(addrs) <= 0 {
				if oldSC != nil {
					// Teardown old sc.
					bw.mu.Lock()
					delete(bw.conns, oldA)
					delete(bw.connSt, oldSC)
					bw.mu.Unlock()
					bw.cc.RemoveSubConn(oldSC)
				}
				continue
			}

			// Convert v1 addresses to v2 resolver addresses.
			var newAddrs []resolver.Address
			for _, a := range addrs {
				newAddr := resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from balancer are all backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}
				newAddrs = append(newAddrs, newAddr)
			}
			if oldSC == nil {
				// Create new sc.
				sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
				if err != nil {
					grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
				} else {
					bw.mu.Lock()
					// For pickfirst, there should be only one SubConn, so the
					// address doesn't matter. All states updating (up and down)
					// and picking should all happen on that only SubConn.
					bw.conns[resolver.Address{}] = sc
					bw.connSt[sc] = &scState{
						addr: addrs[0], // Use the first address.
						s:    connectivity.Idle,
					}
					bw.mu.Unlock()
					sc.Connect()
				}
			} else {
				bw.mu.Lock()
				bw.connSt[oldSC].addr = addrs[0]
				bw.mu.Unlock()
				oldSC.UpdateAddresses(newAddrs)
			}
		} else {
			// Non-pickfirst: diff the new address set against existing
			// SubConns, creating and tearing down as needed.
			var (
				add []resolver.Address // Addresses need to setup connections.
				del []balancer.SubConn // Connections need to tear down.
			)
			resAddrs := make(map[resolver.Address]Address)
			for _, a := range addrs {
				resAddrs[resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from balancer are all backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}] = a
			}
			bw.mu.Lock()
			for a := range resAddrs {
				if _, ok := bw.conns[a]; !ok {
					add = append(add, a)
				}
			}
			for a, c := range bw.conns {
				if _, ok := resAddrs[a]; !ok {
					del = append(del, c)
					delete(bw.conns, a)
					// Keep the state of this sc in bw.connSt until its state becomes Shutdown.
				}
			}
			bw.mu.Unlock()
			// NewSubConn/RemoveSubConn are called outside the lock.
			for _, a := range add {
				sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
				if err != nil {
					grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
				} else {
					bw.mu.Lock()
					bw.conns[a] = sc
					bw.connSt[sc] = &scState{
						addr: resAddrs[a],
						s:    connectivity.Idle,
					}
					bw.mu.Unlock()
					sc.Connect()
				}
			}
			for _, c := range del {
				bw.cc.RemoveSubConn(c)
			}
		}
	}
}
229 | |||
// HandleSubConnStateChange propagates a SubConn connectivity change to the
// v1 balancer via its Up/down callbacks, and pushes the re-aggregated
// connectivity state (with this wrapper as picker) to the ClientConn.
func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	bw.mu.Lock()
	defer bw.mu.Unlock()
	scSt, ok := bw.connSt[sc]
	if !ok {
		// Unknown SubConn (state was already removed); ignore.
		return
	}
	if s == connectivity.Idle {
		// Immediately reconnect idle SubConns.
		sc.Connect()
	}
	oldS := scSt.s
	scSt.s = s
	if oldS != connectivity.Ready && s == connectivity.Ready {
		// Entered Ready: notify the v1 balancer and keep the returned
		// down callback for the eventual transition out of Ready.
		scSt.down = bw.balancer.Up(scSt.addr)
	} else if oldS == connectivity.Ready && s != connectivity.Ready {
		// Left Ready: fire the down callback, if Up returned one.
		if scSt.down != nil {
			scSt.down(errConnClosing)
		}
	}
	sa := bw.csEvltr.RecordTransition(oldS, s)
	if bw.state != sa {
		bw.state = sa
	}
	// Always re-send the balancer state so the cc gets a fresh picker.
	bw.cc.UpdateBalancerState(bw.state, bw)
	if s == connectivity.Shutdown {
		// Remove state for this sc.
		delete(bw.connSt, sc)
	}
}
259 | |||
260 | func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) { | ||
261 | bw.mu.Lock() | ||
262 | defer bw.mu.Unlock() | ||
263 | select { | ||
264 | case <-bw.startCh: | ||
265 | default: | ||
266 | close(bw.startCh) | ||
267 | } | ||
268 | // There should be a resolver inside the balancer. | ||
269 | // All updates here, if any, are ignored. | ||
270 | } | ||
271 | |||
272 | func (bw *balancerWrapper) Close() { | ||
273 | bw.mu.Lock() | ||
274 | defer bw.mu.Unlock() | ||
275 | select { | ||
276 | case <-bw.startCh: | ||
277 | default: | ||
278 | close(bw.startCh) | ||
279 | } | ||
280 | bw.balancer.Close() | ||
281 | } | ||
282 | |||
// The picker is the balancerWrapper itself.
// Pick should never return ErrNoSubConnAvailable.
// It either blocks or returns error, consistent with v1 balancer Get().
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	failfast := true // Default failfast is true.
	if ss, ok := rpcInfoFromContext(ctx); ok {
		failfast = ss.failfast
	}
	// Delegate the pick to the v1 balancer; BlockingWait is set for
	// wait-for-ready (non-failfast) RPCs.
	a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
	if err != nil {
		return nil, nil, err
	}
	// Adapt the v1 put function, if any, to the v2 done callback.
	var done func(balancer.DoneInfo)
	if p != nil {
		done = func(i balancer.DoneInfo) { p() }
	}
	var sc balancer.SubConn
	bw.mu.Lock()
	defer bw.mu.Unlock()
	if bw.pickfirst {
		// Get the first sc in conns.
		for _, sc = range bw.conns {
			break
		}
	} else {
		var ok bool
		// Look up the SubConn for the address the v1 balancer returned.
		sc, ok = bw.conns[resolver.Address{
			Addr:       a.Addr,
			Type:       resolver.Backend,
			ServerName: "",
			Metadata:   a.Metadata,
		}]
		if !ok && failfast {
			return nil, nil, balancer.ErrTransientFailure
		}
		if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
			// If the returned sc is not ready and RPC is failfast,
			// return error, and this RPC will fail.
			return nil, nil, balancer.ErrTransientFailure
		}
	}

	return sc, done, nil
}