@@ -364,10 +364,11 @@ public Collection<T> groups() {
364
364
public synchronized void setSnapshot (T group , U snapshot ) {
365
365
// we take a writeLock to prevent watches from being created while we update the snapshot
366
366
ConcurrentMap <ResourceType , CacheStatusInfo <T >> status ;
367
+ U previousSnapshot ;
367
368
writeLock .lock ();
368
369
try {
369
370
// Update the existing snapshot entry.
370
- snapshots .put (group , snapshot );
371
+ previousSnapshot = snapshots .put (group , snapshot );
371
372
status = statuses .get (group );
372
373
} finally {
373
374
writeLock .unlock ();
@@ -379,7 +380,7 @@ public synchronized void setSnapshot(T group, U snapshot) {
379
380
380
381
// Responses should be in specific order and typeUrls has a list of resources in the right
381
382
// order.
382
- respondWithSpecificOrder (group , snapshot , status );
383
+ respondWithSpecificOrder (group , previousSnapshot , snapshot , status );
383
384
}
384
385
385
386
/**
@@ -403,7 +404,7 @@ public StatusInfo statusInfo(T group) {
403
404
404
405
@ VisibleForTesting
405
406
protected void respondWithSpecificOrder (T group ,
406
- U snapshot ,
407
+ U previousSnapshot , U snapshot ,
407
408
ConcurrentMap <ResourceType , CacheStatusInfo <T >> statusMap ) {
408
409
for (ResourceType resourceType : RESOURCE_TYPES_IN_ORDER ) {
409
410
CacheStatusInfo <T > status = statusMap .get (resourceType );
@@ -435,6 +436,53 @@ protected void respondWithSpecificOrder(T group,
435
436
// Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
436
437
return false ;
437
438
});
439
+
440
+ Map <String , SnapshotResource <?>> previousResources = previousSnapshot == null
441
+ ? Collections .emptyMap ()
442
+ : previousSnapshot .resources (resourceType );
443
+ Map <String , SnapshotResource <?>> snapshotResources = snapshot .resources (resourceType );
444
+
445
+ Map <String , SnapshotResource <?>> snapshotChangedResources = snapshotResources .entrySet ()
446
+ .stream ()
447
+ .filter (entry -> {
448
+ SnapshotResource <?> snapshotResource = previousResources .get (entry .getKey ());
449
+ return snapshotResource == null || !snapshotResource .version ().equals (entry .getValue ().version ());
450
+ })
451
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
452
+
453
+ Set <String > snapshotRemovedResources = previousResources .keySet ()
454
+ .stream ()
455
+ .filter (s -> !snapshotResources .containsKey (s ))
456
+ .collect (Collectors .toSet ());
457
+
458
+ status .deltaWatchesRemoveIf ((id , watch ) -> {
459
+ String version = snapshot .version (watch .request ().getResourceType (), Collections .emptyList ());
460
+
461
+ if (!watch .version ().equals (version )) {
462
+ if (LOGGER .isDebugEnabled ()) {
463
+ LOGGER .debug ("responding to open watch {}[{}] with new version {}" ,
464
+ id ,
465
+ String .join (", " , watch .trackedResources ().keySet ()),
466
+ version );
467
+ }
468
+
469
+ List <String > removedResources = snapshotRemovedResources .stream ()
470
+ .filter (s -> watch .trackedResources ().get (s ) != null )
471
+ .collect (Collectors .toList ());
472
+
473
+ ResponseState responseState = respondDeltaTracked (watch ,
474
+ snapshotChangedResources ,
475
+ removedResources ,
476
+ version ,
477
+ group );
478
+ // Discard the watch if it was responded or cancelled.
479
+ // A new watch will be created for future snapshots once envoy ACKs the response.
480
+ return ResponseState .RESPONDED .equals (responseState ) || ResponseState .CANCELLED .equals (responseState );
481
+ }
482
+
483
+ // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
484
+ return false ;
485
+ });
438
486
}
439
487
}
440
488
0 commit comments