Skip to content

Commit

Permalink
ensure we unsubscribe removed resources on reconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
sschepens committed Jan 24, 2024
1 parent 4577722 commit a8b3df6
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -526,10 +526,12 @@ private boolean respond(Watch watch, U snapshot, T group) {
}

private List<String> findRemovedResources(DeltaWatch watch, Map<String, VersionedResource<?>> snapshotResources) {
// remove resources for which client has a tracked version but do not exist in snapshot
return watch.trackedResources().keySet()
.stream()
// remove resources for which client has a tracked version or is pending a response but do not exist in snapshot.
// when reconnections occur, envoy sends a request subscribing to resources that could no longer exist in the snapshot
// and we don't count those as trackedResources because we don't know the version.
return Stream.concat(watch.trackedResources().keySet().stream(), watch.pendingResources().stream())
.filter(s -> !snapshotResources.containsKey(s))
.distinct()
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.protobuf.Message;
import io.envoyproxy.controlplane.cache.DeltaResponse;
import io.envoyproxy.controlplane.cache.DeltaWatch;
import io.envoyproxy.controlplane.cache.DeltaXdsRequest;
import io.envoyproxy.controlplane.cache.NodeGroup;
import io.envoyproxy.controlplane.cache.Resources;
import io.envoyproxy.controlplane.cache.Response;
Expand All @@ -20,6 +24,7 @@
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret;
import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -543,6 +548,35 @@ public void groups() {
assertThat(cache.groups()).containsExactly(SingleNodeGroup.GROUP);
}


@Test
public void respondRemovedResourcesWhenPendingResourceButNotTracked() {
SimpleCache<String> cache = new SimpleCache<>(new SingleNodeGroup());

cache.setSnapshot(SingleNodeGroup.GROUP, SNAPSHOT1);

DeltaResponseTracker responseTracker = new DeltaResponseTracker();

DeltaWatch watch = cache.createDeltaWatch(
DeltaXdsRequest.create(DeltaDiscoveryRequest.newBuilder()
.setNode(Node.getDefaultInstance())
.setTypeUrl(CLUSTER_TYPE_URL)
.addResourceNamesSubscribe("non_existant")
.addResourceNamesSubscribe("cluster0")
.build()),
"",
Collections.emptyMap(),
ImmutableSet.of("non_existant"),
true,
responseTracker, false);

assertThat(watch.isCancelled()).isFalse();
Assertions.assertThat(responseTracker.responses).isNotEmpty();

Assertions.assertThat(responseTracker.responses.get(0).removedResources()).isNotEmpty();
Assertions.assertThat(responseTracker.responses.get(0).removedResources()).containsOnly("non_existant");
}

private static class ResponseTracker implements Consumer<Response> {

private final LinkedList<Response> responses = new LinkedList<>();
Expand All @@ -564,6 +598,17 @@ public void accept(Response response) {
}
}

private static class DeltaResponseTracker implements Consumer<DeltaResponse> {

private final LinkedList<DeltaResponse> responses = new LinkedList<>();

@Override
public void accept(DeltaResponse response) {
responses.add(response);
}

}

private static class SingleNodeGroup implements NodeGroup<String> {

private static final String GROUP = "node";
Expand Down

0 comments on commit a8b3df6

Please sign in to comment.