3
3
import com .google .common .collect .ImmutableSet ;
4
4
import com .google .common .collect .Sets ;
5
5
import com .google .protobuf .Message ;
6
+ import io .envoyproxy .envoy .api .v2 .ClusterLoadAssignment ;
6
7
import io .envoyproxy .envoy .api .v2 .DiscoveryRequest ;
7
8
import java .util .Collection ;
9
+ import java .util .Collections ;
8
10
import java .util .HashMap ;
9
11
import java .util .Map ;
10
12
import java .util .Objects ;
18
20
import java .util .function .Consumer ;
19
21
import java .util .stream .Collectors ;
20
22
import javax .annotation .concurrent .GuardedBy ;
23
+
21
24
import org .slf4j .Logger ;
22
25
import org .slf4j .LoggerFactory ;
23
26
@@ -261,21 +264,42 @@ private Response createResponse(DiscoveryRequest request, Map<String, ? extends
261
264
262
265
private boolean respond (Watch watch , Snapshot snapshot , T group ) {
263
266
Map <String , ? extends Message > snapshotResources = snapshot .resources (watch .request ().getTypeUrl ());
267
+ Map <String , ClusterLoadAssignment > snapshotWithMissingResources = Collections .emptyMap ();
264
268
265
269
if (!watch .request ().getResourceNamesList ().isEmpty () && watch .ads ()) {
266
270
Collection <String > missingNames = watch .request ().getResourceNamesList ().stream ()
267
271
.filter (name -> !snapshotResources .containsKey (name ))
268
272
.collect (Collectors .toList ());
269
273
270
- if (!missingNames .isEmpty ()) {
274
+ // When Envoy receive CDS response and reconnect to new instance of control-plane before EDS was sent, Envoy might
275
+ // stack in warming phase. New instance of control-plane might not have cluster in snapshot and won't be able to
276
+ // respond. First request from Envoy contains empty string version info.
277
+ if (!missingNames .isEmpty ()
278
+ && watch .request ().getTypeUrl ().equals (Resources .ENDPOINT_TYPE_URL )
279
+ && watch .request ().getVersionInfo ().equals ("" )) {
280
+ LOGGER .info ("adding missing resources [{}] to response for {} in ADS mode from node {} at version {}" ,
281
+ String .join (", " , missingNames ),
282
+ watch .request ().getTypeUrl (),
283
+ group ,
284
+ snapshot .version (watch .request ().getTypeUrl (), watch .request ().getResourceNamesList ())
285
+ );
286
+ snapshotWithMissingResources = new HashMap <>(missingNames .size () + snapshotResources .size ());
287
+ for (String missingName : missingNames ) {
288
+ snapshotWithMissingResources .put (
289
+ missingName ,
290
+ ClusterLoadAssignment .newBuilder ().setClusterName (missingName ).build ()
291
+ );
292
+ snapshotWithMissingResources .putAll (
293
+ (Map <? extends String , ? extends ClusterLoadAssignment >) snapshotResources );
294
+ }
295
+ } else if (!missingNames .isEmpty ()) {
271
296
LOGGER .info (
272
297
"not responding in ADS mode for {} from node {} at version {} for request [{}] since [{}] not in snapshot" ,
273
298
watch .request ().getTypeUrl (),
274
299
group ,
275
300
snapshot .version (watch .request ().getTypeUrl (), watch .request ().getResourceNamesList ()),
276
301
String .join (", " , watch .request ().getResourceNamesList ()),
277
302
String .join (", " , missingNames ));
278
-
279
303
return false ;
280
304
}
281
305
}
@@ -287,11 +311,18 @@ private boolean respond(Watch watch, Snapshot snapshot, T group) {
287
311
group ,
288
312
watch .request ().getVersionInfo (),
289
313
version );
290
-
291
- Response response = createResponse (
292
- watch .request (),
293
- snapshotResources ,
294
- version );
314
+ Response response ;
315
+ if (!snapshotWithMissingResources .isEmpty ()) {
316
+ response = createResponse (
317
+ watch .request (),
318
+ snapshotWithMissingResources ,
319
+ version );
320
+ } else {
321
+ response = createResponse (
322
+ watch .request (),
323
+ snapshotResources ,
324
+ version );
325
+ }
295
326
296
327
try {
297
328
watch .respond (response );
0 commit comments