From f6b5f3b1226e41f49f32f03f4600888883f17f29 Mon Sep 17 00:00:00 2001 From: Sebastian Schepens Date: Thu, 22 Oct 2020 14:38:18 -0300 Subject: [PATCH 1/3] delta xds Signed-off-by: Sebastian Schepens --- .../controlplane/cache/CacheStatusInfo.java | 58 +++++ .../controlplane/cache/ConfigWatcher.java | 22 ++ .../controlplane/cache/DeltaResponse.java | 41 +++ .../controlplane/cache/DeltaWatch.java | 121 +++++++++ .../controlplane/cache/DeltaXdsRequest.java | 118 +++++++++ .../cache/GroupCacheStatusInfo.java | 10 + .../controlplane/cache/Resources.java | 5 +- .../controlplane/cache/SimpleCache.java | 224 +++++++++++++++- .../controlplane/cache/Snapshot.java | 18 +- .../controlplane/cache/SnapshotResource.java | 32 +++ .../controlplane/cache/SnapshotResources.java | 15 +- .../controlplane/cache/StatusInfo.java | 10 + .../controlplane/cache/v2/Snapshot.java | 86 +++--- .../controlplane/cache/v3/Snapshot.java | 58 ++--- .../controlplane/cache/ResourcesTest.java | 127 +++++---- .../cache/SnapshotResourcesTest.java | 6 +- .../cache/v2/SimpleCacheTest.java | 97 ++++--- .../controlplane/cache/v2/SnapshotTest.java | 38 +-- .../cache/v3/SimpleCacheTest.java | 97 ++++--- .../controlplane/cache/v3/SnapshotTest.java | 37 +-- ...dsDeltaDiscoveryRequestStreamObserver.java | 163 ++++++++++++ .../AdsDiscoveryRequestStreamObserver.java | 8 +- .../DeltaDiscoveryRequestStreamObserver.java | 246 ++++++++++++++++++ .../DiscoveryRequestStreamObserver.java | 12 +- .../controlplane/server/DiscoveryServer.java | 59 ++++- .../server/DiscoveryServerCallbacks.java | 57 +++- .../server/LatestDeltaDiscoveryResponse.java | 26 ++ .../server/V2DiscoveryServer.java | 106 +++++++- .../server/V3DiscoveryServer.java | 105 +++++++- ...dsDeltaDiscoveryRequestStreamObserver.java | 126 +++++++++ .../XdsDiscoveryRequestStreamObserver.java | 8 +- .../callback/SnapshotCollectingCallback.java | 46 ++-- .../controlplane/server/TestMain.java | 29 ++- .../V2DiscoveryServerAdsWarmingClusterIT.java | 21 +- .../server/V2DiscoveryServerTest.java | 25 ++ .../V2OnlyDiscoveryServerCallbacks.java | 12 + .../controlplane/server/V2TestSnapshots.java | 15 +- .../V3DiscoveryServerAdsWarmingClusterIT.java | 21 +- .../server/V3DiscoveryServerTest.java | 25 ++ .../V3OnlyDiscoveryServerCallbacks.java | 12 + .../controlplane/server/V3TestSnapshots.java | 15 +- 41 files changed, 2017 insertions(+), 340 deletions(-) create mode 100644 cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaResponse.java create mode 100644 cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java create mode 100644 cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaXdsRequest.java create mode 100644 cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResource.java create mode 100644 server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java create mode 100644 server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java create mode 100644 server/src/main/java/io/envoyproxy/controlplane/server/LatestDeltaDiscoveryResponse.java create mode 100644 server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java index 30b7c454b..271b4e037 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java @@ -17,7 +17,9 @@ public class CacheStatusInfo implements StatusInfo { private final T nodeGroup; private final ConcurrentMap watches = new ConcurrentHashMap<>(); + private final ConcurrentMap deltaWatches = new ConcurrentHashMap<>(); private volatile long lastWatchRequestTime; + private volatile long lastDeltaWatchRequestTime; public CacheStatusInfo(T nodeGroup) { this.nodeGroup = nodeGroup; @@ -31,6 +33,11 @@ public long lastWatchRequestTime() { return lastWatchRequestTime; } + @Override + public long lastDeltaWatchRequestTime() { + return lastDeltaWatchRequestTime; + } + /** * {@inheritDoc} */ @@ -47,6 +54,11 @@ public int numWatches() { return watches.size(); } + @Override + public int numDeltaWatches() { + return deltaWatches.size(); + } + /** * Removes the given watch from the tracked collection of watches. * @@ -56,6 +68,15 @@ public void removeWatch(long watchId) { watches.remove(watchId); } + /** + * Removes the given delta watch from the tracked collection of watches. + * + * @param watchId the ID for the delta watch that should be removed + */ + public void removeDeltaWatch(long watchId) { + deltaWatches.remove(watchId); + } + /** * Sets the timestamp of the last discovery watch request. * @@ -65,6 +86,15 @@ public void setLastWatchRequestTime(long lastWatchRequestTime) { this.lastWatchRequestTime = lastWatchRequestTime; } + /** + * Sets the timestamp of the last discovery delta watch request. + * + * @param lastDeltaWatchRequestTime the latest delta watch request timestamp + */ + public void setLastDeltaWatchRequestTime(long lastDeltaWatchRequestTime) { + this.lastDeltaWatchRequestTime = lastDeltaWatchRequestTime; + } + /** * Adds the given watch to the tracked collection of watches. * @@ -75,6 +105,16 @@ public void setWatch(long watchId, Watch watch) { watches.put(watchId, watch); } + /** + * Adds the given watch to the tracked collection of watches. + * + * @param watchId the ID for the watch that should be added + * @param watch the watch that should be added + */ + public void setDeltaWatch(long watchId, DeltaWatch watch) { + deltaWatches.put(watchId, watch); + } + /** * Returns the set of IDs for all watched currently being tracked. */ @@ -82,6 +122,13 @@ public Set watchIds() { return ImmutableSet.copyOf(watches.keySet()); } + /** + * Returns the set of IDs for all watched currently being tracked. + */ + public Set deltaWatchIds() { + return ImmutableSet.copyOf(deltaWatches.keySet()); + } + /** * Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is * removed from the tracked collection. If it returns {@code false}, then the watch is not removed. @@ -91,4 +138,15 @@ public Set watchIds() { public void watchesRemoveIf(BiFunction filter) { watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); } + + /** + * Iterate over all tracked delta watches and execute the given function. If it returns {@code true}, + * then the watch is removed from the tracked collection. If it returns {@code false}, then + * the watch is not removed. + * + * @param filter the function to execute on each delta watch + */ + public void deltaWatchesRemoveIf(BiFunction filter) { + deltaWatches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); + } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java index fc94fe8d3..d51b82eb6 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java @@ -1,5 +1,6 @@ package io.envoyproxy.controlplane.cache; +import java.util.Map; import java.util.Set; import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; @@ -28,4 +29,25 @@ Watch createWatch( Set knownResourceNames, Consumer responseConsumer, boolean hasClusterChanged); + + /** + * Returns a new configuration resource {@link Watch} for the given discovery request. + * + * @param request the discovery request (node, names, etc.) to use to generate the watch + * @param requesterVersion the last version applied by the requester + * @param resourceVersions resources that are already known to the requester + * @param pendingResources resources that the caller is waiting for + * @param isWildcard indicates if the stream is in wildcard mode + * @param responseConsumer the response handler, used to process outgoing response messages + * @param hasClusterChanged indicates if EDS should be sent immediately, even if version has not been changed. + * Supported in ADS mode. + */ + DeltaWatch createDeltaWatch( + DeltaXdsRequest request, + String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged); } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaResponse.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaResponse.java new file mode 100644 index 000000000..390bd82fa --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaResponse.java @@ -0,0 +1,41 @@ +package io.envoyproxy.controlplane.cache; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; +import java.util.List; +import java.util.Map; + +/** + * {@code Response} is a data class that contains the response for an assumed configuration type. + */ +@AutoValue +public abstract class DeltaResponse { + + public static DeltaResponse create(DeltaXdsRequest request, + Map> resources, + List removedResources, + String version) { + return new AutoValue_DeltaResponse(request, resources, removedResources, version); + } + + /** + * Returns the original request associated with the response. + */ + public abstract DeltaXdsRequest request(); + + /** + * Returns the resources to include in the response. + */ + public abstract Map> resources(); + + /** + * Returns the removed resources to include in the response. + */ + public abstract List removedResources(); + + /** + * Returns the version of the resources as tracked by the cache for the given type. Envoy responds with this version + * as an acknowledgement. + */ + public abstract String version(); +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java new file mode 100644 index 000000000..fc9697291 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java @@ -0,0 +1,121 @@ +package io.envoyproxy.controlplane.cache; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; + +/** + * {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by + * the xDS server. + */ +public class DeltaWatch { + private static final AtomicIntegerFieldUpdater isCancelledUpdater = + AtomicIntegerFieldUpdater.newUpdater(DeltaWatch.class, "isCancelled"); + private final DeltaXdsRequest request; + private final Consumer responseConsumer; + private final Map resourceVersions; + private final Set pendingResources; + private final boolean isWildcard; + private final String version; + private volatile int isCancelled = 0; + private Runnable stop; + + /** + * Construct a watch. + * + * @param request the original request for the watch + * @param version indicates the stream current version + * @param isWildcard indicates if the stream is in wildcard mode + * @param responseConsumer handler for outgoing response messages + */ + public DeltaWatch(DeltaXdsRequest request, + Map resourceVersions, + Set pendingResources, + String version, + boolean isWildcard, + Consumer responseConsumer) { + this.request = request; + this.resourceVersions = resourceVersions; + this.pendingResources = pendingResources; + this.version = version; + this.isWildcard = isWildcard; + this.responseConsumer = responseConsumer; + } + + /** + * Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel + * may be called multiple times, with each subsequent call being a no-op. + */ + public void cancel() { + if (isCancelledUpdater.compareAndSet(this, 0, 1)) { + if (stop != null) { + stop.run(); + } + } + } + + /** + * Returns boolean indicating whether or not the watch has been cancelled. + */ + public boolean isCancelled() { + return isCancelledUpdater.get(this) == 1; + } + + /** + * Returns the original request for the watch. + */ + public DeltaXdsRequest request() { + return request; + } + + /** + * Returns the tracked resources for the watch. + */ + public Map trackedResources() { + return resourceVersions; + } + + /** + * Returns the pending resources for the watch. + */ + public Set pendingResources() { + return pendingResources; + } + + /** + * Returns the stream current version. + */ + public String version() { + return version; + } + + /** + * Indicates if the stream is in wildcard mode. + */ + public boolean isWildcard() { + return isWildcard; + } + + /** + * Sends the given response to the watch's response handler. + * + * @param response the response to be handled + * @throws WatchCancelledException if the watch has already been cancelled + */ + public void respond(DeltaResponse response) throws WatchCancelledException { + if (isCancelled()) { + throw new WatchCancelledException(); + } + + responseConsumer.accept(response); + } + + /** + * Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it + * ensures that this stop callback is only executed once. + */ + public void setStop(Runnable stop) { + this.stop = stop; + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaXdsRequest.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaXdsRequest.java new file mode 100644 index 000000000..4cd872d3d --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaXdsRequest.java @@ -0,0 +1,118 @@ +package io.envoyproxy.controlplane.cache; + +import static io.envoyproxy.controlplane.cache.Resources.TYPE_URLS_TO_RESOURCE_TYPE; + +import com.google.auto.value.AutoValue; +import io.envoyproxy.controlplane.cache.Resources.ResourceType; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * XdsRequest wraps a v2 or v3 DiscoveryRequest of and provides common methods as a + * workaround to the proto messages not implementing a common interface that can be used to + * abstract away xDS version. XdsRequest is passed around the codebase through common code, + * however the callers that need the raw request from it have knowledge of whether it is a v2 or + * a v3 request. + */ +@AutoValue +public abstract class DeltaXdsRequest { + public static DeltaXdsRequest create(DeltaDiscoveryRequest discoveryRequest) { + return new AutoValue_DeltaXdsRequest(discoveryRequest, null); + } + + public static DeltaXdsRequest create( + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest discoveryRequest) { + return new AutoValue_DeltaXdsRequest(null, discoveryRequest); + } + + /** + * Returns the underlying v2 request, or null if this was a v3 request. Callers should have + * knowledge of whether the request was v2 or not. + * + * @return v2 DiscoveryRequest or null + */ + @Nullable + public abstract DeltaDiscoveryRequest v2Request(); + + /** + * Returns he underlying v3 request, or null if this was a v2 request. Callers should have + * knowledge of whether the request was v3 or not. + * + * @return v3 DiscoveryRequest or null + */ + @Nullable + public abstract io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest v3Request(); + + /** + * Returns the type URL of the v2 or v3 request. + */ + public String getTypeUrl() { + if (v2Request() != null) { + return v2Request().getTypeUrl(); + } + return v3Request().getTypeUrl(); + } + + /** + * Returns the ResourceType of the underlying request. This is useful for accepting requests + * for both v2 and v3 resource types and having a key to normalize on the logical resource. + */ + public ResourceType getResourceType() { + if (v2Request() != null) { + return TYPE_URLS_TO_RESOURCE_TYPE.get(v2Request().getTypeUrl()); + } + return TYPE_URLS_TO_RESOURCE_TYPE.get(v3Request().getTypeUrl()); + } + + /** + * Returns the response nonse from the underlying DiscoveryRequest. + */ + public String getResponseNonce() { + if (v2Request() != null) { + return v2Request().getResponseNonce(); + } + return v3Request().getResponseNonce(); + } + + /** + * Returns the error_detail from the underlying v2 or v3 request. + */ + public boolean hasErrorDetail() { + if (v2Request() != null) { + return v2Request().hasErrorDetail(); + } + return v3Request().hasErrorDetail(); + } + + /** + * Returns the resource_names_subscribe from the underlying v2 or v3 request. + */ + public List getResourceNamesSubscribeList() { + if (v2Request() != null) { + return v2Request().getResourceNamesSubscribeList(); + } + return v3Request().getResourceNamesSubscribeList(); + } + + /** + * Returns the resource_names_unsubscribe from the underlying v2 or v3 request. + */ + public List getResourceNamesUnsubscribeList() { + if (v2Request() != null) { + return v2Request().getResourceNamesUnsubscribeList(); + } + return v3Request().getResourceNamesUnsubscribeList(); + } + + /** + * Returns the initial_resource_versions from the underlying v2 or v3 request. + */ + public Map getInitialResourceVersionsMap() { + if (v2Request() != null) { + return v2Request().getInitialResourceVersionsMap(); + } + return v3Request().getInitialResourceVersionsMap(); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java index 1b192764b..666682848 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java @@ -23,6 +23,11 @@ public long lastWatchRequestTime() { return statuses.stream().mapToLong(CacheStatusInfo::lastWatchRequestTime).max().orElse(0); } + @Override + public long lastDeltaWatchRequestTime() { + return statuses.stream().mapToLong(CacheStatusInfo::lastDeltaWatchRequestTime).max().orElse(0); + } + /** * {@inheritDoc} */ @@ -38,4 +43,9 @@ public T nodeGroup() { public int numWatches() { return statuses.stream().mapToInt(CacheStatusInfo::numWatches).sum(); } + + @Override + public int numDeltaWatches() { + return statuses.stream().mapToInt(CacheStatusInfo::numDeltaWatches).sum(); + } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java index 98ba25bd0..a00806b28 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java @@ -216,10 +216,11 @@ public static String getResourceName(Any anyResource) { * * @param resources the resource whose dependencies we are calculating */ - public static Set getResourceReferences(Collection resources) { + public static Set getResourceReferences(Collection> resources) { final ImmutableSet.Builder refs = ImmutableSet.builder(); - for (Message r : resources) { + for (SnapshotResource sr : resources) { + Message r = sr.resource(); if (r instanceof ClusterLoadAssignment || r instanceof RouteConfiguration) { // Endpoints have no dependencies. diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java index 141cf5858..def32fd91 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java @@ -4,12 +4,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.Resources.ResourceType; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -20,6 +23,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; @@ -199,6 +203,138 @@ public Watch createWatch( } } + /** + * {@inheritDoc} + */ + @Override + public DeltaWatch createDeltaWatch( + DeltaXdsRequest request, + String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged) { + + ResourceType requestResourceType = request.getResourceType(); + Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s", + request.getTypeUrl()); + T group; + if (request.v3Request() != null) { + group = groups.hash(request.v3Request().getNode()); + } else { + group = groups.hash(request.v2Request().getNode()); + } + + // even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it + // doesn't conflict + readLock.lock(); + try { + CacheStatusInfo status = statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) + .computeIfAbsent(requestResourceType, s -> new CacheStatusInfo<>(group)); + status.setLastWatchRequestTime(System.currentTimeMillis()); + + U snapshot = snapshots.get(group); + String version = snapshot == null ? "" : snapshot.version(requestResourceType, Collections.emptyList()); + + DeltaWatch watch = new DeltaWatch(request, + ImmutableMap.copyOf(resourceVersions), + ImmutableSet.copyOf(pendingResources), + requesterVersion, + isWildcard, + responseConsumer); + + // If no snapshot, leave an open watch. + + if (snapshot == null) { + long watchId = setDeltaWatch(status, watch); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", + watchId, + request.getTypeUrl(), + String.join(", ", watch.trackedResources().keySet()), + group, + requesterVersion); + } + + return watch; + } + + // If the requested version is up-to-date or missing a response, leave an open watch. + if (version.equals(requesterVersion)) { + // If the request is not wildcard, we have pending resources and we have them, we should respond immediately. + if (!isWildcard && watch.pendingResources().size() != 0) { + // If any of the pending resources are in the snapshot respond immediately. If not we'll fall back to + // version comparisons. + Map> resources = snapshot.resources(request.getResourceType()); + Map> requestedResources = watch.pendingResources() + .stream() + .filter(resources::containsKey) + .collect(Collectors.toMap(Function.identity(), resources::get)); + ResponseState responseState = respondDelta(watch, + requestedResources, + Collections.emptyList(), + version, + group); + if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { + return watch; + } + } else if (hasClusterChanged && requestResourceType.equals(ResourceType.ENDPOINT)) { + ResponseState responseState = respondDeltaTracked( + watch, + snapshot.resources(request.getResourceType()), + version, + group); + if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { + return watch; + } + } + + long watchId = setDeltaWatch(status, watch); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", + watchId, + request.getTypeUrl(), + String.join(", ", watch.trackedResources().keySet()), + group, + requesterVersion); + } + + return watch; + } + + // Otherwise, version is different, the watch may be responded immediately + ResponseState responseState = respondDeltaTracked(watch, + snapshot.resources(request.getResourceType()), + version, + group); + if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { + return watch; + } + + long watchId = setDeltaWatch(status, watch); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}", + watchId, + request.getTypeUrl(), + String.join(", ", watch.trackedResources().keySet()), + group, + requesterVersion); + } + + return watch; + } finally { + readLock.unlock(); + } + } + + private long setDeltaWatch(CacheStatusInfo status, DeltaWatch watch) { + long watchId = watchCount.incrementAndGet(); + status.setDeltaWatch(watchId, watch); + watch.setStop(() -> status.removeDeltaWatch(watchId)); + return watchId; + } + /** * {@inheritDoc} */ @@ -302,20 +438,23 @@ protected void respondWithSpecificOrder(T group, } } - private Response createResponse(XdsRequest request, Map resources, - String version) { + private Response createResponse(XdsRequest request, Map> resources, + String version) { Collection filtered = request.getResourceNamesList().isEmpty() - ? resources.values() + ? resources.values().stream() + .map(SnapshotResource::resource) + .collect(Collectors.toList()) : request.getResourceNamesList().stream() .map(resources::get) .filter(Objects::nonNull) + .map(SnapshotResource::resource) .collect(Collectors.toList()); return Response.create(request, filtered, version); } private boolean respond(Watch watch, U snapshot, T group) { - Map snapshotResources = snapshot.resources(watch.request().getResourceType()); + Map> snapshotResources = snapshot.resources(watch.request().getResourceType()); if (!watch.request().getResourceNamesList().isEmpty() && watch.ads()) { Collection missingNames = watch.request().getResourceNamesList().stream() @@ -363,4 +502,81 @@ private boolean respond(Watch watch, U snapshot, T group) { return false; } + + /** + * Responds a delta watch using resource version comparison. + * + * @return if the watch has been responded. + */ + private ResponseState respondDeltaTracked(DeltaWatch watch, + Map> snapshotResources, + String version, + T group) { + // remove resources for which client has a tracked version but do not exist in snapshot + List removedResources = watch.trackedResources().keySet() + .stream() + .filter(s -> !snapshotResources.containsKey(s)) + .collect(Collectors.toList()); + + return respondDeltaTracked(watch, snapshotResources, removedResources, version, group); + } + + private ResponseState respondDeltaTracked(DeltaWatch watch, + Map> snapshotResources, + List removedResources, + String version, + T group) { + + Map> resources = snapshotResources.entrySet() + .stream() + .filter(entry -> { + if (watch.pendingResources().contains(entry.getKey())) { + return true; + } + String resourceVersion = watch.trackedResources().get(entry.getKey()); + if (resourceVersion == null) { + // resource is not tracked, should respond it only if watch is wildcard + return watch.isWildcard(); + } + return !entry.getValue().version().equals(resourceVersion); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return respondDelta(watch, resources, removedResources, version, group); + } + + private ResponseState respondDelta(DeltaWatch watch, + Map> resources, + List removedResources, + String version, + T group) { + if (resources.isEmpty() && removedResources.isEmpty()) { + return ResponseState.UNRESPONDED; + } + + DeltaResponse response = DeltaResponse.create( + watch.request(), + resources, + removedResources, + version); + + try { + watch.respond(response); + return ResponseState.RESPONDED; + } catch (WatchCancelledException e) { + LOGGER.error( + "failed to respond for {} from node {} with version {} because watch was already cancelled", + watch.request().getTypeUrl(), + group, + version); + } + + return ResponseState.CANCELLED; + } + + private enum ResponseState { + RESPONDED, + UNRESPONDED, + CANCELLED + } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java index 41ec42b7b..b0365093f 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java @@ -8,24 +8,20 @@ public abstract class Snapshot { - public abstract String version(ResourceType resourceType, List resourceNames); - - public abstract Map resources(ResourceType resourceType); - /** * Asserts that all of the given resource names have corresponding values in the given resources collection. * - * @param parentTypeUrl the type of the parent resources (source of the resource name refs) + * @param parentTypeUrl the type of the parent resources (source of the resource name refs) * @param dependencyTypeUrl the type of the given dependent resources - * @param resourceNames the set of dependent resource names that must exist - * @param resources the collection of resources whose names are being checked + * @param resourceNames the set of dependent resource names that must exist + * @param resources the collection of resources whose names are being checked * @throws SnapshotConsistencyException if a name is given that does not exist in the resources collection */ - protected static void ensureAllResourceNamesExist( + protected static void ensureAllResourceNamesExist( String parentTypeUrl, String dependencyTypeUrl, Set resourceNames, - Map resources) throws SnapshotConsistencyException { + Map> resources) throws SnapshotConsistencyException { if (resourceNames.size() != resources.size()) { throw new SnapshotConsistencyException( @@ -49,4 +45,8 @@ protected static void ensureAllResourceNamesExist( } } } + + public abstract String version(ResourceType resourceType, List resourceNames); + + public abstract Map> resources(ResourceType resourceType); } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResource.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResource.java new file mode 100644 index 000000000..bcf135a1e --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResource.java @@ -0,0 +1,32 @@ +package io.envoyproxy.controlplane.cache; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; + +@AutoValue +public abstract class SnapshotResource { + + /** + * Returns a new {@link SnapshotResource} instance. + * + * @param resource the resource + * @param version the version associated with the resource + * @param the type of resource + */ + public static SnapshotResource create(T resource, String version) { + return new AutoValue_SnapshotResource<>( + resource, + version + ); + } + + /** + * Returns the resource. + */ + public abstract T resource(); + + /** + * Returns the version associated with the resource. + */ + public abstract String version(); +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java index 107e173ca..e96a877fd 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java @@ -19,7 +19,9 @@ public abstract class SnapshotResources { * @param version the version associated with the resources in this collection * @param the type of resources in this collection */ - public static SnapshotResources create(Iterable resources, String version) { + public static SnapshotResources create( + Iterable> resources, + String version) { return new AutoValue_SnapshotResources<>( resourcesMap(resources), (r) -> version @@ -34,19 +36,20 @@ public static SnapshotResources create(Iterable resour * @param the type of resources in this collection */ public static SnapshotResources create( - Iterable resources, + Iterable> resources, ResourceVersionResolver versionResolver) { return new AutoValue_SnapshotResources<>( resourcesMap(resources), versionResolver); } - private static ImmutableMap resourcesMap(Iterable resources) { + private static ImmutableMap> resourcesMap( + Iterable> resources) { return StreamSupport.stream(resources.spliterator(), false) .collect( Collector.of( - ImmutableMap.Builder::new, - (b, e) -> b.put(Resources.getResourceName(e), e), + ImmutableMap.Builder>::new, + (b, e) -> b.put(Resources.getResourceName(e.resource()), e), (b1, b2) -> b1.putAll(b2.build()), ImmutableMap.Builder::build)); } @@ -54,7 +57,7 @@ private static ImmutableMap resourcesMap(Iterable /** * Returns a map of the resources in this collection, where the key is the name of the resource. */ - public abstract Map resources(); + public abstract Map> resources(); /** * Returns the version associated with this all resources in this collection. diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java index 0651288a3..4daafdd84 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java @@ -12,6 +12,11 @@ public interface StatusInfo { */ long lastWatchRequestTime(); + /** + * Returns the timestamp of the last discovery delta watch request. + */ + long lastDeltaWatchRequestTime(); + /** * Returns the node grouping represented by this status, generated via * {@link NodeGroup#hash(Node)} or {@link NodeGroup#hash(io.envoyproxy.envoy.api.v2.core.Node)}. @@ -22,4 +27,9 @@ public interface StatusInfo { * Returns the number of open watches. */ int numWatches(); + + /** + * Returns the number of open delta watches. + */ + int numDeltaWatches(); } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/v2/Snapshot.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/v2/Snapshot.java index 38e88e7e2..c28b2c321 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/v2/Snapshot.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/v2/Snapshot.java @@ -5,11 +5,11 @@ import com.google.auto.value.AutoValue; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.ResourceVersionResolver; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Resources.ResourceType; import io.envoyproxy.controlplane.cache.SnapshotConsistencyException; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.SnapshotResources; import io.envoyproxy.envoy.api.v2.Cluster; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; @@ -31,18 +31,18 @@ public abstract class Snapshot extends io.envoyproxy.controlplane.cache.Snapshot /** * Returns a new {@link Snapshot} instance that is versioned uniformly across all resources. * - * @param clusters the cluster resources in this snapshot + * @param clusters the cluster resources in this snapshot * @param endpoints the endpoint resources in this snapshot * @param listeners the listener resources in this snapshot - * @param routes the route resources in this snapshot - * @param version the version associated with all resources in this snapshot + * @param routes the route resources in this snapshot + * @param version the version associated with all resources in this snapshot */ public static Snapshot create( - Iterable clusters, - Iterable endpoints, - Iterable listeners, - Iterable routes, - Iterable secrets, + Iterable> clusters, + Iterable> endpoints, + Iterable> listeners, + Iterable> routes, + Iterable> secrets, String version) { return new AutoValue_Snapshot( @@ -56,25 +56,25 @@ public static Snapshot create( /** * Returns a new {@link Snapshot} instance that has separate versions for each resource type. * - * @param clusters the cluster resources in this snapshot - * @param clustersVersion the version of the cluster resources - * @param endpoints the endpoint resources in this snapshot + * @param clusters the cluster resources in this snapshot + * @param clustersVersion the version of the cluster resources + * @param endpoints the endpoint resources in this snapshot * @param endpointsVersion the version of the endpoint resources - * @param listeners the listener resources in this snapshot + * @param listeners the listener resources in this snapshot * @param listenersVersion the version of the listener resources - * @param routes the route resources in this snapshot - * @param routesVersion the version of the route resources + * @param routes the route resources in this snapshot + * @param routesVersion the version of the route resources */ public static Snapshot create( - Iterable clusters, + Iterable> clusters, String clustersVersion, - Iterable endpoints, + Iterable> endpoints, String endpointsVersion, - Iterable listeners, + Iterable> listeners, String listenersVersion, - Iterable routes, + Iterable> routes, String routesVersion, - Iterable secrets, + Iterable> secrets, String secretsVersion) { // TODO(snowp): add a builder alternative @@ -89,27 +89,27 @@ public static Snapshot create( /** * Returns a new {@link Snapshot} instance that has separate versions for each resource type. * - * @param clusters the cluster resources in this snapshot - * @param clusterVersionResolver version resolver of the clusters in this snapshot - * @param endpoints the endpoint resources in this snapshot + * @param clusters the cluster resources in this snapshot + * @param clusterVersionResolver version resolver of the clusters in this snapshot + * @param endpoints the endpoint resources in this snapshot * @param endpointVersionResolver version resolver of the endpoints in this snapshot - * @param listeners the listener resources in this snapshot + * @param listeners the listener resources in this snapshot * @param listenerVersionResolver version resolver of listeners in this snapshot - * @param routes the route resources in this snapshot - * @param routeVersionResolver version resolver of the routes in this snapshot - * @param secrets the secret resources in this snapshot - * @param secretVersionResolver version resolver of the secrets in this snapshot + * @param routes the route resources in this snapshot + * @param routeVersionResolver version resolver of the routes in this snapshot + * @param secrets the secret resources in this snapshot + * @param secretVersionResolver version resolver of the secrets in this snapshot */ public static Snapshot create( - Iterable clusters, + Iterable> clusters, ResourceVersionResolver clusterVersionResolver, - Iterable endpoints, + Iterable> endpoints, ResourceVersionResolver endpointVersionResolver, - Iterable listeners, + Iterable> listeners, ResourceVersionResolver listenerVersionResolver, - Iterable routes, + Iterable> routes, ResourceVersionResolver routeVersionResolver, - Iterable secrets, + Iterable> secrets, ResourceVersionResolver secretVersionResolver) { return new AutoValue_Snapshot( @@ -182,7 +182,7 @@ public void ensureConsistent() throws SnapshotConsistencyException { * * @param typeUrl the type URL of the requested resource type */ - public Map resources(String typeUrl) { + public Map> resources(String typeUrl) { if (Strings.isNullOrEmpty(typeUrl)) { return ImmutableMap.of(); } @@ -200,18 +200,18 @@ public void ensureConsistent() throws SnapshotConsistencyException { * * @param resourceType the requested resource type */ - public Map resources(ResourceType resourceType) { + public Map> resources(ResourceType resourceType) { switch (resourceType) { case CLUSTER: - return clusters().resources(); + return (Map) clusters().resources(); case ENDPOINT: - return endpoints().resources(); + return (Map) endpoints().resources(); case LISTENER: - return listeners().resources(); + return (Map) listeners().resources(); case ROUTE: - return routes().resources(); + return (Map) routes().resources(); case SECRET: - return secrets().resources(); + return (Map) secrets().resources(); default: return ImmutableMap.of(); } @@ -229,7 +229,7 @@ public String version(String typeUrl) { /** * Returns the version in this snapshot for the given resource type. * - * @param typeUrl the type URL of the requested resource type + * @param typeUrl the type URL of the requested resource type * @param resourceNames list of requested resource names, * used to calculate a version for the given resources */ @@ -253,7 +253,7 @@ public String version(ResourceType resourceType) { /** * Returns the version in this snapshot for the given resource type. * - * @param resourceType the the requested resource type + * @param resourceType the the requested resource type * @param resourceNames list of requested resource names, * used to calculate a version for the given resources */ @@ -274,6 +274,4 @@ public String version(ResourceType resourceType, List resourceNames) { return ""; } } - - } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java index 0c495d47b..f5c5f518e 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java @@ -5,10 +5,10 @@ import com.google.auto.value.AutoValue; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Resources.ResourceType; import io.envoyproxy.controlplane.cache.SnapshotConsistencyException; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.SnapshotResources; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -30,18 +30,18 @@ public abstract class Snapshot extends io.envoyproxy.controlplane.cache.Snapshot * Returns a new {@link io.envoyproxy.controlplane.cache.v2.Snapshot} instance that is versioned * uniformly across all resources. * - * @param clusters the cluster resources in this snapshot + * @param clusters the cluster resources in this snapshot * @param endpoints the endpoint resources in this snapshot * @param listeners the listener resources in this snapshot - * @param routes the route resources in this snapshot - * @param version the version associated with all resources in this snapshot + * @param routes the route resources in this snapshot + * @param version the version associated with all resources in this snapshot */ public static Snapshot create( - Iterable clusters, - Iterable endpoints, - Iterable listeners, - Iterable routes, - Iterable secrets, + Iterable> clusters, + Iterable> endpoints, + Iterable> listeners, + Iterable> routes, + Iterable> secrets, String version) { return new AutoValue_Snapshot( @@ -56,25 +56,25 @@ public static Snapshot create( * Returns a new {@link io.envoyproxy.controlplane.cache.v2.Snapshot} instance that has separate * versions for each resource type. * - * @param clusters the cluster resources in this snapshot - * @param clustersVersion the version of the cluster resources - * @param endpoints the endpoint resources in this snapshot + * @param clusters the cluster resources in this snapshot + * @param clustersVersion the version of the cluster resources + * @param endpoints the endpoint resources in this snapshot * @param endpointsVersion the version of the endpoint resources - * @param listeners the listener resources in this snapshot + * @param listeners the listener resources in this snapshot * @param listenersVersion the version of the listener resources - * @param routes the route resources in this snapshot - * @param routesVersion the version of the route resources + * @param routes the route resources in this snapshot + * @param routesVersion the version of the route resources */ public static Snapshot create( - Iterable clusters, + Iterable> clusters, String clustersVersion, - Iterable endpoints, + Iterable> endpoints, String endpointsVersion, - Iterable listeners, + Iterable> listeners, String listenersVersion, - Iterable routes, + Iterable> routes, String routesVersion, - Iterable secrets, + Iterable> secrets, String secretsVersion) { // TODO(snowp): add a builder alternative @@ -139,7 +139,7 @@ public void ensureConsistent() throws SnapshotConsistencyException { * * @param typeUrl the type URL of the requested resource type */ - public Map resources(String typeUrl) { + public Map> resources(String typeUrl) { if (Strings.isNullOrEmpty(typeUrl)) { return ImmutableMap.of(); } @@ -157,18 +157,18 @@ public void ensureConsistent() throws SnapshotConsistencyException { * * @param resourceType the requested resource type */ - public Map resources(ResourceType resourceType) { + public Map> resources(ResourceType resourceType) { switch (resourceType) { case CLUSTER: - return clusters().resources(); + return (Map) clusters().resources(); case ENDPOINT: - return endpoints().resources(); + return (Map) endpoints().resources(); case LISTENER: - return listeners().resources(); + return (Map) listeners().resources(); case ROUTE: - return routes().resources(); + return (Map) routes().resources(); case SECRET: - return secrets().resources(); + return (Map) secrets().resources(); default: return ImmutableMap.of(); } @@ -186,7 +186,7 @@ public String version(String typeUrl) { /** * Returns the version in this snapshot for the given resource type. * - * @param typeUrl the type URL of the requested resource type + * @param typeUrl the type URL of the requested resource type * @param resourceNames list of requested resource names, * used to calculate a version for the given resources */ @@ -209,7 +209,7 @@ public String version(ResourceType resourceType) { /** * Returns the version in this snapshot for the given resource type. * - * @param resourceType the the requested resource type + * @param resourceType the the requested resource type * @param resourceNames list of requested resource names, * used to calculate a version for the given resources */ diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java index 730a8f938..9448449aa 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java @@ -17,51 +17,60 @@ import io.envoyproxy.envoy.api.v2.Listener; import io.envoyproxy.envoy.api.v2.RouteConfiguration; import io.envoyproxy.envoy.api.v2.auth.Secret; +import io.envoyproxy.envoy.api.v2.core.ApiVersion; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; public class ResourcesTest { private static final boolean ADS = ThreadLocalRandom.current().nextBoolean(); - private static final String CLUSTER_NAME = "v2cluster"; + private static final String CLUSTER_NAME = "v2cluster"; private static final String LISTENER_NAME = "v2listener"; - private static final String ROUTE_NAME = "v2route"; - private static final String SECRET_NAME = "v2secret"; - private static final String V3_CLUSTER_NAME = "v3cluster"; + private static final String ROUTE_NAME = "v2route"; + private static final String SECRET_NAME = "v2secret"; + private static final String V3_CLUSTER_NAME = "v3cluster"; private static final String V3_LISTENER_NAME = "v3listener"; - private static final String V3_ROUTE_NAME = "v3route"; - private static final String V3_SECRET_NAME = "v3secret"; + private static final String V3_ROUTE_NAME = "v3route"; + private static final String V3_SECRET_NAME = "v3secret"; private static final int ENDPOINT_PORT = ThreadLocalRandom.current().nextInt(10000, 20000); private static final int LISTENER_PORT = ThreadLocalRandom.current().nextInt(20000, 30000); - private static final Cluster CLUSTER = TestResources.createCluster(CLUSTER_NAME); - private static final ClusterLoadAssignment ENDPOINT = TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT); - private static final Listener LISTENER = TestResources.createListener(ADS, - io.envoyproxy.envoy.api.v2.core.ApiVersion.V2, io.envoyproxy.envoy.api.v2.core.ApiVersion.V2, - LISTENER_NAME, LISTENER_PORT, - ROUTE_NAME); - private static final RouteConfiguration ROUTE = TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME); - private static final Secret SECRET = TestResources.createSecret(SECRET_NAME); - - private static final io.envoyproxy.envoy.config.cluster.v3.Cluster V3_CLUSTER = - TestResources.createClusterV3(V3_CLUSTER_NAME); - private static final io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment V3_ENDPOINT = - TestResources.createEndpointV3(V3_CLUSTER_NAME, ENDPOINT_PORT); - private static final io.envoyproxy.envoy.config.listener.v3.Listener - V3_LISTENER = TestResources.createListenerV3(ADS, V3, V3, V3_LISTENER_NAME, - LISTENER_PORT, V3_ROUTE_NAME); - private static final io.envoyproxy.envoy.config.route.v3.RouteConfiguration V3_ROUTE = - TestResources.createRouteV3(V3_ROUTE_NAME, V3_CLUSTER_NAME); - private static final io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret V3_SECRET = - TestResources.createSecretV3(V3_SECRET_NAME); + private static final SnapshotResource CLUSTER = SnapshotResource.create( + TestResources.createCluster(CLUSTER_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource ENDPOINT = SnapshotResource.create( + TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT), UUID.randomUUID().toString()); + private static final SnapshotResource LISTENER = SnapshotResource.create( + TestResources.createListener(ADS, ApiVersion.V2, + ApiVersion.V2, + LISTENER_NAME, LISTENER_PORT, + ROUTE_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource ROUTE = SnapshotResource.create( + TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource SECRET = SnapshotResource.create( + TestResources.createSecret(SECRET_NAME), UUID.randomUUID().toString()); + + private static final SnapshotResource V3_CLUSTER = + SnapshotResource.create(TestResources.createClusterV3(V3_CLUSTER_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource V3_ENDPOINT = + SnapshotResource.create(TestResources.createEndpointV3(V3_CLUSTER_NAME, ENDPOINT_PORT), + UUID.randomUUID().toString()); + private static final SnapshotResource + V3_LISTENER = SnapshotResource.create(TestResources.createListenerV3(ADS, V3, V3, V3_LISTENER_NAME, + LISTENER_PORT, V3_ROUTE_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource V3_ROUTE = + SnapshotResource.create(TestResources.createRouteV3(V3_ROUTE_NAME, V3_CLUSTER_NAME), + UUID.randomUUID().toString()); + private static final SnapshotResource V3_SECRET = + SnapshotResource.create(TestResources.createSecretV3(V3_SECRET_NAME), UUID.randomUUID().toString()); @Test public void getResourceNameReturnsExpectedNameForValidResourceMessage() { - Map cases = ImmutableMap.of( + ImmutableMap, String> cases = ImmutableMap.of( CLUSTER, CLUSTER_NAME, ENDPOINT, CLUSTER_NAME, LISTENER, LISTENER_NAME, @@ -69,12 +78,12 @@ public void getResourceNameReturnsExpectedNameForValidResourceMessage() { SECRET, SECRET_NAME); cases.forEach((resource, expectedName) -> - assertThat(Resources.getResourceName(resource)).isEqualTo(expectedName)); + assertThat(Resources.getResourceName(resource.resource())).isEqualTo(expectedName)); } @Test public void getResourceNameReturnsExpectedNameForValidResourceMessageV3() { - Map cases = ImmutableMap.of( + ImmutableMap, String> cases = ImmutableMap.of( V3_CLUSTER, V3_CLUSTER_NAME, V3_ENDPOINT, V3_CLUSTER_NAME, V3_LISTENER, V3_LISTENER_NAME, @@ -82,7 +91,7 @@ public void getResourceNameReturnsExpectedNameForValidResourceMessageV3() { V3_SECRET, V3_SECRET_NAME); cases.forEach((resource, expectedName) -> - assertThat(Resources.getResourceName(resource)).isEqualTo(expectedName)); + assertThat(Resources.getResourceName(resource.resource())).isEqualTo(expectedName)); } @Test @@ -102,22 +111,25 @@ public void getResourceNameAnyThrowsOnBadClass() { @Test public void getResourceReferencesReturnsExpectedReferencesForValidResourceMessages() { String clusterServiceName = "clusterWithServiceName0"; - Cluster clusterWithServiceName = Cluster.newBuilder() - .setName(CLUSTER_NAME) - .setEdsClusterConfig( - EdsClusterConfig.newBuilder() - .setServiceName(clusterServiceName)) - .setType(DiscoveryType.EDS) - .build(); - - Map, Set> cases = ImmutableMap., Set>builder() - .put(ImmutableList.of(CLUSTER), ImmutableSet.of(CLUSTER_NAME)) - .put(ImmutableList.of(clusterWithServiceName), ImmutableSet.of(clusterServiceName)) - .put(ImmutableList.of(ENDPOINT), ImmutableSet.of()) - .put(ImmutableList.of(LISTENER), ImmutableSet.of(ROUTE_NAME)) - .put(ImmutableList.of(ROUTE), ImmutableSet.of()) - .put(ImmutableList.of(CLUSTER, ENDPOINT, LISTENER, ROUTE), ImmutableSet.of(CLUSTER_NAME, ROUTE_NAME)) - .build(); + SnapshotResource clusterWithServiceName = SnapshotResource.create(Cluster.newBuilder() + .setName(CLUSTER_NAME) + .setEdsClusterConfig( + EdsClusterConfig.newBuilder() + .setServiceName(clusterServiceName)) + .setType(DiscoveryType.EDS) + .build(), + UUID.randomUUID().toString()); + + Map>, Set> cases = + ImmutableMap.>, Set>builder() + .put((Collection) ImmutableList.of(CLUSTER), ImmutableSet.of(CLUSTER_NAME)) + .put((Collection) ImmutableList.of(clusterWithServiceName), ImmutableSet.of(clusterServiceName)) + .put((Collection) ImmutableList.of(ENDPOINT), ImmutableSet.of()) + .put((Collection) ImmutableList.of(LISTENER), ImmutableSet.of(ROUTE_NAME)) + .put((Collection) ImmutableList.of(ROUTE), ImmutableSet.of()) + .put((Collection) ImmutableList.of(CLUSTER, ENDPOINT, LISTENER, ROUTE), + ImmutableSet.of(CLUSTER_NAME, ROUTE_NAME)) + .build(); cases.forEach((resources, refs) -> assertThat(Resources.getResourceReferences(resources)).containsExactlyElementsOf(refs)); @@ -126,22 +138,25 @@ public void getResourceReferencesReturnsExpectedReferencesForValidResourceMessag @Test public void getResourceReferencesReturnsExpectedReferencesForValidV3ResourceMessages() { String clusterServiceName = "clusterWithServiceName0"; - io.envoyproxy.envoy.config.cluster.v3.Cluster clusterWithServiceName = io.envoyproxy.envoy.config.cluster.v3.Cluster + SnapshotResource clusterWithServiceName = SnapshotResource.create( + io.envoyproxy.envoy.config.cluster.v3.Cluster .newBuilder() .setName(V3_CLUSTER_NAME) .setEdsClusterConfig( io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig.newBuilder() .setServiceName(clusterServiceName)) .setType(io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType.EDS) - .build(); - - Map, Set> cases = ImmutableMap., Set>builder() - .put(ImmutableList.of(V3_CLUSTER), ImmutableSet.of(V3_CLUSTER_NAME)) - .put(ImmutableList.of(clusterWithServiceName), ImmutableSet.of(clusterServiceName)) - .put(ImmutableList.of(V3_ENDPOINT), ImmutableSet.of()) - .put(ImmutableList.of(V3_LISTENER), ImmutableSet.of(V3_ROUTE_NAME)) - .put(ImmutableList.of(V3_ROUTE), ImmutableSet.of()) - .put(ImmutableList.of(V3_CLUSTER, V3_ENDPOINT, V3_LISTENER, V3_ROUTE), + .build(), + UUID.randomUUID().toString()); + + Map>, Set> cases = + ImmutableMap.>, Set>builder() + .put((Collection) ImmutableList.of(V3_CLUSTER), ImmutableSet.of(V3_CLUSTER_NAME)) + .put((Collection) ImmutableList.of(clusterWithServiceName), ImmutableSet.of(clusterServiceName)) + .put((Collection) ImmutableList.of(V3_ENDPOINT), ImmutableSet.of()) + .put((Collection) ImmutableList.of(V3_LISTENER), ImmutableSet.of(V3_ROUTE_NAME)) + .put((Collection) ImmutableList.of(V3_ROUTE), ImmutableSet.of()) + .put((Collection) ImmutableList.of(V3_CLUSTER, V3_ENDPOINT, V3_LISTENER, V3_ROUTE), ImmutableSet.of(V3_CLUSTER_NAME, V3_ROUTE_NAME)) .build(); diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java index 62645ac99..6bcf8c269 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java @@ -15,8 +15,10 @@ public class SnapshotResourcesTest { private static final String CLUSTER0_NAME = "cluster0"; private static final String CLUSTER1_NAME = "cluster1"; - private static final Cluster CLUSTER0 = TestResources.createCluster(CLUSTER0_NAME); - private static final Cluster CLUSTER1 = TestResources.createCluster(CLUSTER1_NAME); + private static final SnapshotResource CLUSTER0 = SnapshotResource.create( + TestResources.createCluster(CLUSTER0_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource CLUSTER1 = SnapshotResource.create( + TestResources.createCluster(CLUSTER1_NAME), UUID.randomUUID().toString()); @Test public void createBuildsResourcesMapWithNameAndPopulatesVersion() { diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SimpleCacheTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SimpleCacheTest.java index b24cdec51..bb8341054 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SimpleCacheTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SimpleCacheTest.java @@ -10,6 +10,7 @@ import io.envoyproxy.controlplane.cache.NodeGroup; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.StatusInfo; import io.envoyproxy.controlplane.cache.Watch; import io.envoyproxy.controlplane.cache.XdsRequest; @@ -44,31 +45,66 @@ public class SimpleCacheTest { private static final String VERSION2 = UUID.randomUUID().toString(); private static final Snapshot SNAPSHOT1 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.getDefaultInstance()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), + ImmutableList.of(SnapshotResource.create(Cluster.newBuilder().setName(CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(ClusterLoadAssignment.getDefaultInstance(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Listener.newBuilder().setName(LISTENER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Secret.newBuilder().setName(SECRET_NAME).build(), + UUID.randomUUID().toString())), VERSION1); private static final Snapshot SNAPSHOT2 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.getDefaultInstance()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), + ImmutableList.of(SnapshotResource.create(Cluster.newBuilder().setName(CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(ClusterLoadAssignment.getDefaultInstance(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Listener.newBuilder().setName(LISTENER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Secret.newBuilder().setName(SECRET_NAME).build(), + UUID.randomUUID().toString())), VERSION2); private static final Snapshot MULTIPLE_RESOURCES_SNAPSHOT2 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build(), - Cluster.newBuilder().setName(SECONDARY_CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build(), - ClusterLoadAssignment.newBuilder().setClusterName(SECONDARY_CLUSTER_NAME).build()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), + ImmutableList.of( + SnapshotResource.create(Cluster.newBuilder().setName(CLUSTER_NAME).build(), UUID.randomUUID().toString()), + SnapshotResource.create(Cluster.newBuilder().setName(SECONDARY_CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of( + SnapshotResource.create(ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build(), + UUID.randomUUID().toString()), + SnapshotResource.create(ClusterLoadAssignment.newBuilder().setClusterName(SECONDARY_CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Listener.newBuilder().setName(LISTENER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Secret.newBuilder().setName(SECRET_NAME).build(), + UUID.randomUUID().toString())), VERSION2); + private static void assertThatWatchIsOpenWithNoResponses(WatchAndTracker watchAndTracker) { + assertThat(watchAndTracker.watch.isCancelled()).isFalse(); + Assertions.assertThat(watchAndTracker.tracker.responses).isEmpty(); + } + + private static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTracker, Snapshot snapshot) { + Assertions.assertThat(watchAndTracker.tracker.responses).isNotEmpty(); + + Response response = watchAndTracker.tracker.responses.getFirst(); + + assertThat(response).isNotNull(); + assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); + assertThat(response.resources().toArray(new Message[0])) + .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values() + .stream().map(SnapshotResource::resource).collect(Collectors.toList())); + } + @Test public void invalidNamesListShouldReturnWatcherWithNoResponseInAdsMode() { SimpleCache cache = new SimpleCache<>(new SingleNodeGroup()); @@ -417,7 +453,7 @@ public void watchIsLeftOpenIfNotRespondedImmediately() { .setNode(Node.getDefaultInstance()) .setTypeUrl(ROUTE_TYPE_URL) .addAllResourceNames(Collections.singleton(ROUTE_NAME)) - .build()), + .build()), Collections.singleton(ROUTE_NAME), responseTracker); @@ -458,7 +494,8 @@ public void clearSnapshotWithWatches() { .setVersionInfo(SNAPSHOT1.version(CLUSTER_TYPE_URL)) .build()), Collections.emptySet(), - r -> { }); + r -> { + }); // clearSnapshot should fail and the snapshot should be left untouched assertThat(cache.clearSnapshot(SingleNodeGroup.GROUP)).isFalse(); @@ -484,27 +521,12 @@ public void groups() { .setTypeUrl(CLUSTER_TYPE_URL) .build()), Collections.emptySet(), - r -> { }); + r -> { + }); assertThat(cache.groups()).containsExactly(SingleNodeGroup.GROUP); } - private static void assertThatWatchIsOpenWithNoResponses(WatchAndTracker watchAndTracker) { - assertThat(watchAndTracker.watch.isCancelled()).isFalse(); - Assertions.assertThat(watchAndTracker.tracker.responses).isEmpty(); - } - - private static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTracker, Snapshot snapshot) { - Assertions.assertThat(watchAndTracker.tracker.responses).isNotEmpty(); - - Response response = watchAndTracker.tracker.responses.getFirst(); - - assertThat(response).isNotNull(); - assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); - assertThat(response.resources().toArray(new Message[0])) - .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values()); - } - private static class ResponseTracker implements Consumer { private final LinkedList responses = new LinkedList<>(); @@ -520,7 +542,8 @@ private static class ResponseOrderTracker implements Consumer { private final LinkedList responseTypes = new LinkedList<>(); - @Override public void accept(Response response) { + @Override + public void accept(Response response) { responseTypes.add(response.request().getTypeUrl()); } } diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java index 293e63783..cd2c00017 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v2/SnapshotTest.java @@ -10,15 +10,14 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.collect.ImmutableList; -import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.SnapshotConsistencyException; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.TestResources; import io.envoyproxy.envoy.api.v2.Cluster; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.Listener; import io.envoyproxy.envoy.api.v2.RouteConfiguration; import io.envoyproxy.envoy.api.v2.auth.Secret; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; @@ -34,12 +33,17 @@ public class SnapshotTest { private static final int ENDPOINT_PORT = ThreadLocalRandom.current().nextInt(10000, 20000); private static final int LISTENER_PORT = ThreadLocalRandom.current().nextInt(20000, 30000); - private static final Cluster CLUSTER = TestResources.createCluster(CLUSTER_NAME); - private static final ClusterLoadAssignment ENDPOINT = TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT); - private static final Listener LISTENER = TestResources.createListener(ADS, V2, V2, - LISTENER_NAME, LISTENER_PORT, ROUTE_NAME); - private static final RouteConfiguration ROUTE = TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME); - private static final Secret SECRET = TestResources.createSecret(SECRET_NAME); + private static final SnapshotResource CLUSTER = + SnapshotResource.create(TestResources.createCluster(CLUSTER_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource ENDPOINT = + SnapshotResource.create(TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT), UUID.randomUUID().toString()); + private static final SnapshotResource LISTENER = + SnapshotResource.create(TestResources.createListener(ADS, V2, V2, + LISTENER_NAME, LISTENER_PORT, ROUTE_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource ROUTE = + SnapshotResource.create(TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource SECRET = + SnapshotResource.create(TestResources.createSecret(SECRET_NAME), UUID.randomUUID().toString()); @Test public void createSingleVersionSetsResourcesCorrectly() { @@ -89,7 +93,7 @@ public void createSeparateVersionsSetsResourcesCorrectly() { ImmutableList.of(LISTENER), listenersVersion, ImmutableList.of(ROUTE), routesVersion, ImmutableList.of(SECRET), secretsVersion - ); + ); assertThat(snapshot.clusters().resources()) .containsEntry(CLUSTER_NAME, CLUSTER) @@ -127,19 +131,19 @@ public void resourcesReturnsExpectedResources() { // We have to do some lame casting to appease java's compiler, otherwise it fails to compile due to limitations with // generic type constraints. - assertThat((Map) snapshot.resources(CLUSTER_TYPE_URL)) + assertThat(snapshot.resources(CLUSTER_TYPE_URL)) .containsEntry(CLUSTER_NAME, CLUSTER) .hasSize(1); - assertThat((Map) snapshot.resources(ENDPOINT_TYPE_URL)) + assertThat(snapshot.resources(ENDPOINT_TYPE_URL)) .containsEntry(CLUSTER_NAME, ENDPOINT) .hasSize(1); - assertThat((Map) snapshot.resources(LISTENER_TYPE_URL)) + assertThat(snapshot.resources(LISTENER_TYPE_URL)) .containsEntry(LISTENER_NAME, LISTENER) .hasSize(1); - assertThat((Map) snapshot.resources(ROUTE_TYPE_URL)) + assertThat(snapshot.resources(ROUTE_TYPE_URL)) .containsEntry(ROUTE_NAME, ROUTE) .hasSize(1); @@ -228,7 +232,9 @@ public void ensureConsistentThrowsIfEndpointOrRouteNamesMismatch() { Snapshot snapshot1 = Snapshot.create( ImmutableList.of(CLUSTER), - ImmutableList.of(TestResources.createEndpoint(otherClusterName, ENDPOINT_PORT)), + ImmutableList.of( + SnapshotResource.create(TestResources.createEndpoint(otherClusterName, ENDPOINT_PORT), + UUID.randomUUID().toString())), ImmutableList.of(LISTENER), ImmutableList.of(ROUTE), ImmutableList.of(SECRET), @@ -247,7 +253,9 @@ public void ensureConsistentThrowsIfEndpointOrRouteNamesMismatch() { ImmutableList.of(CLUSTER), ImmutableList.of(ENDPOINT), ImmutableList.of(LISTENER), - ImmutableList.of(TestResources.createRoute(otherRouteName, CLUSTER_NAME)), + ImmutableList.of( + SnapshotResource.create(TestResources.createRoute(otherRouteName, CLUSTER_NAME), + UUID.randomUUID().toString())), ImmutableList.of(SECRET), UUID.randomUUID().toString()); diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java index 87a41d1c3..72202516b 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java @@ -10,6 +10,7 @@ import io.envoyproxy.controlplane.cache.NodeGroup; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.StatusInfo; import io.envoyproxy.controlplane.cache.Watch; import io.envoyproxy.controlplane.cache.XdsRequest; @@ -44,31 +45,66 @@ public class SimpleCacheTest { private static final String VERSION2 = UUID.randomUUID().toString(); private static final Snapshot SNAPSHOT1 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.getDefaultInstance()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), + ImmutableList.of(SnapshotResource.create(Cluster.newBuilder().setName(CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(ClusterLoadAssignment.getDefaultInstance(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Listener.newBuilder().setName(LISTENER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Secret.newBuilder().setName(SECRET_NAME).build(), + UUID.randomUUID().toString())), VERSION1); private static final Snapshot SNAPSHOT2 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.getDefaultInstance()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), + ImmutableList.of(SnapshotResource.create(Cluster.newBuilder().setName(CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(ClusterLoadAssignment.getDefaultInstance(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Listener.newBuilder().setName(LISTENER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Secret.newBuilder().setName(SECRET_NAME).build(), + UUID.randomUUID().toString())), VERSION2); private static final Snapshot MULTIPLE_RESOURCES_SNAPSHOT2 = Snapshot.create( - ImmutableList.of(Cluster.newBuilder().setName(CLUSTER_NAME).build(), - Cluster.newBuilder().setName(SECONDARY_CLUSTER_NAME).build()), - ImmutableList.of(ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build(), - ClusterLoadAssignment.newBuilder().setClusterName(SECONDARY_CLUSTER_NAME).build()), - ImmutableList.of(Listener.newBuilder().setName(LISTENER_NAME).build()), - ImmutableList.of(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build()), - ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), + ImmutableList.of( + SnapshotResource.create(Cluster.newBuilder().setName(CLUSTER_NAME).build(), UUID.randomUUID().toString()), + SnapshotResource.create(Cluster.newBuilder().setName(SECONDARY_CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of( + SnapshotResource.create(ClusterLoadAssignment.newBuilder().setClusterName(CLUSTER_NAME).build(), + UUID.randomUUID().toString()), + SnapshotResource.create(ClusterLoadAssignment.newBuilder().setClusterName(SECONDARY_CLUSTER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Listener.newBuilder().setName(LISTENER_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(RouteConfiguration.newBuilder().setName(ROUTE_NAME).build(), + UUID.randomUUID().toString())), + ImmutableList.of(SnapshotResource.create(Secret.newBuilder().setName(SECRET_NAME).build(), + UUID.randomUUID().toString())), VERSION2); + private static void assertThatWatchIsOpenWithNoResponses(WatchAndTracker watchAndTracker) { + assertThat(watchAndTracker.watch.isCancelled()).isFalse(); + Assertions.assertThat(watchAndTracker.tracker.responses).isEmpty(); + } + + private static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTracker, Snapshot snapshot) { + Assertions.assertThat(watchAndTracker.tracker.responses).isNotEmpty(); + + Response response = watchAndTracker.tracker.responses.getFirst(); + + assertThat(response).isNotNull(); + assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); + assertThat(response.resources().toArray(new Message[0])) + .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values() + .stream().map(SnapshotResource::resource).collect(Collectors.toList())); + } + @Test public void invalidNamesListShouldReturnWatcherWithNoResponseInAdsMode() { SimpleCache cache = new SimpleCache<>(new SingleNodeGroup()); @@ -420,7 +456,7 @@ public void watchIsLeftOpenIfNotRespondedImmediately() { .setNode(Node.getDefaultInstance()) .setTypeUrl(ROUTE_TYPE_URL) .addAllResourceNames(Collections.singleton(ROUTE_NAME)) - .build()), + .build()), Collections.singleton(ROUTE_NAME), responseTracker); @@ -461,7 +497,8 @@ public void clearSnapshotWithWatches() { .setVersionInfo(SNAPSHOT1.version(CLUSTER_TYPE_URL)) .build()), Collections.emptySet(), - r -> { }); + r -> { + }); // clearSnapshot should fail and the snapshot should be left untouched assertThat(cache.clearSnapshot(SingleNodeGroup.GROUP)).isFalse(); @@ -487,27 +524,12 @@ public void groups() { .setTypeUrl(CLUSTER_TYPE_URL) .build()), Collections.emptySet(), - r -> { }); + r -> { + }); assertThat(cache.groups()).containsExactly(SingleNodeGroup.GROUP); } - private static void assertThatWatchIsOpenWithNoResponses(WatchAndTracker watchAndTracker) { - assertThat(watchAndTracker.watch.isCancelled()).isFalse(); - Assertions.assertThat(watchAndTracker.tracker.responses).isEmpty(); - } - - private static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTracker, Snapshot snapshot) { - Assertions.assertThat(watchAndTracker.tracker.responses).isNotEmpty(); - - Response response = watchAndTracker.tracker.responses.getFirst(); - - assertThat(response).isNotNull(); - assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); - assertThat(response.resources().toArray(new Message[0])) - .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values()); - } - private static class ResponseTracker implements Consumer { private final LinkedList responses = new LinkedList<>(); @@ -523,7 +545,8 @@ private static class ResponseOrderTracker implements Consumer { private final LinkedList responseTypes = new LinkedList<>(); - @Override public void accept(Response response) { + @Override + public void accept(Response response) { responseTypes.add(response.request().getTypeUrl()); } } diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java index c99b0fae4..dc1942cc8 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java @@ -10,15 +10,14 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.collect.ImmutableList; -import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.SnapshotConsistencyException; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.TestResources; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; @@ -33,14 +32,18 @@ public class SnapshotTest { private static final int ENDPOINT_PORT = ThreadLocalRandom.current().nextInt(10000, 20000); private static final int LISTENER_PORT = ThreadLocalRandom.current().nextInt(20000, 30000); - private static final Cluster CLUSTER = TestResources.createClusterV3(CLUSTER_NAME); - private static final ClusterLoadAssignment - ENDPOINT = TestResources.createEndpointV3(CLUSTER_NAME, ENDPOINT_PORT); - private static final Listener - LISTENER = TestResources.createListenerV3(ADS, V3, V3, LISTENER_NAME, LISTENER_PORT, ROUTE_NAME); - private static final RouteConfiguration ROUTE = TestResources.createRouteV3(ROUTE_NAME, - CLUSTER_NAME); - private static final Secret SECRET = TestResources.createSecretV3(SECRET_NAME); + private static final SnapshotResource CLUSTER = + SnapshotResource.create(TestResources.createClusterV3(CLUSTER_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource ENDPOINT = + SnapshotResource.create(TestResources.createEndpointV3(CLUSTER_NAME, ENDPOINT_PORT), + UUID.randomUUID().toString()); + private static final SnapshotResource LISTENER = + SnapshotResource.create(TestResources.createListenerV3(ADS, V3, V3, + LISTENER_NAME, LISTENER_PORT, ROUTE_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource ROUTE = + SnapshotResource.create(TestResources.createRouteV3(ROUTE_NAME, CLUSTER_NAME), UUID.randomUUID().toString()); + private static final SnapshotResource SECRET = + SnapshotResource.create(TestResources.createSecretV3(SECRET_NAME), UUID.randomUUID().toString()); @Test public void createSingleVersionSetsResourcesCorrectly() { @@ -129,19 +132,19 @@ public void resourcesReturnsExpectedResources() { // due to limitations with // generic type constraints. - assertThat((Map) snapshot.resources(CLUSTER_TYPE_URL)) + assertThat(snapshot.resources(CLUSTER_TYPE_URL)) .containsEntry(CLUSTER_NAME, CLUSTER) .hasSize(1); - assertThat((Map) snapshot.resources(ENDPOINT_TYPE_URL)) + assertThat(snapshot.resources(ENDPOINT_TYPE_URL)) .containsEntry(CLUSTER_NAME, ENDPOINT) .hasSize(1); - assertThat((Map) snapshot.resources(LISTENER_TYPE_URL)) + assertThat(snapshot.resources(LISTENER_TYPE_URL)) .containsEntry(LISTENER_NAME, LISTENER) .hasSize(1); - assertThat((Map) snapshot.resources(ROUTE_TYPE_URL)) + assertThat(snapshot.resources(ROUTE_TYPE_URL)) .containsEntry(ROUTE_NAME, ROUTE) .hasSize(1); @@ -230,7 +233,8 @@ public void ensureConsistentThrowsIfEndpointOrRouteNamesMismatch() { Snapshot snapshot1 = Snapshot.create( ImmutableList.of(CLUSTER), - ImmutableList.of(TestResources.createEndpointV3(otherClusterName, ENDPOINT_PORT)), + ImmutableList.of(SnapshotResource.create(TestResources.createEndpointV3(otherClusterName, ENDPOINT_PORT), + UUID.randomUUID().toString())), ImmutableList.of(LISTENER), ImmutableList.of(ROUTE), ImmutableList.of(SECRET), @@ -249,7 +253,8 @@ public void ensureConsistentThrowsIfEndpointOrRouteNamesMismatch() { ImmutableList.of(CLUSTER), ImmutableList.of(ENDPOINT), ImmutableList.of(LISTENER), - ImmutableList.of(TestResources.createRouteV3(otherRouteName, CLUSTER_NAME)), + ImmutableList.of(SnapshotResource.create(TestResources.createRouteV3(otherRouteName, CLUSTER_NAME), + UUID.randomUUID().toString())), ImmutableList.of(SECRET), UUID.randomUUID().toString()); diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java new file mode 100644 index 000000000..f5b3cbaaf --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java @@ -0,0 +1,163 @@ +package io.envoyproxy.controlplane.server; + +import static io.envoyproxy.controlplane.server.DiscoveryServer.ANY_TYPE_URL; + +import com.google.common.base.Preconditions; +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.Resources; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.function.Supplier; + +/** + * {@code AdsDeltaDiscoveryRequestStreamObserver} is an implementation of {@link DeltaDiscoveryRequestStreamObserver} + * tailored for ADS streams, which handle multiple watches for all TYPE_URLS. + */ + +public class AdsDeltaDiscoveryRequestStreamObserver extends DeltaDiscoveryRequestStreamObserver { + private final ConcurrentMap watches; + private final ConcurrentMap latestVersion; + private final ConcurrentMap> responses; + // tracked and pending resources are always accessed in the observer + // so they are safe from race, no need for concurrent map + private final Map> trackedResourceMap; + private final Map> pendingResourceMap; + + AdsDeltaDiscoveryRequestStreamObserver(StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { + super(ANY_TYPE_URL, responseObserver, streamId, executor, discoveryServer); + + Preconditions.checkState(Resources.V2.TYPE_URLS.size() == Resources.V3.TYPE_URLS.size()); + this.watches = new ConcurrentHashMap<>(Resources.V2.TYPE_URLS.size()); + this.latestVersion = new ConcurrentHashMap<>(Resources.V2.TYPE_URLS.size()); + this.responses = new ConcurrentHashMap<>(Resources.V2.TYPE_URLS.size()); + this.trackedResourceMap = new HashMap<>(Resources.V2.TYPE_URLS.size()); + this.pendingResourceMap = new HashMap<>(Resources.V2.TYPE_URLS.size()); + } + + @Override + public void onNext(V request) { + if (discoveryServer.wrapDeltaXdsRequest(request).getTypeUrl().isEmpty()) { + closeWithError( + Status.UNKNOWN + .withDescription(String.format("[%d] type URL is required for ADS", streamId)) + .asRuntimeException()); + + return; + } + + super.onNext(request); + } + + @Override + void cancel() { + watches.values().forEach(DeltaWatch::cancel); + } + + @Override + boolean ads() { + return true; + } + + @Override + void setLatestVersion(String typeUrl, String version) { + latestVersion.put(typeUrl, version); + if (typeUrl.equals(Resources.V2.CLUSTER_TYPE_URL) || typeUrl.equals(Resources.V3.CLUSTER_TYPE_URL)) { + hasClusterChanged = true; + } else if (typeUrl.equals(Resources.V2.ENDPOINT_TYPE_URL) || typeUrl.equals(Resources.V3.ENDPOINT_TYPE_URL)) { + hasClusterChanged = false; + } + } + + @Override + String latestVersion(String typeUrl) { + return latestVersion.get(typeUrl); + } + + @Override + void setResponse(String typeUrl, String nonce, LatestDeltaDiscoveryResponse response) { + responses.computeIfAbsent(typeUrl, s -> new ConcurrentHashMap<>()) + .put(nonce, response); + } + + @Override + LatestDeltaDiscoveryResponse clearResponse(String typeUrl, String nonce) { + return responses.computeIfAbsent(typeUrl, s -> new ConcurrentHashMap<>()) + .remove(nonce); + } + + @Override + int responseCount(String typeUrl) { + return responses.computeIfAbsent(typeUrl, s -> new ConcurrentHashMap<>()) + .size(); + } + + @Override + Map resourceVersions(String typeUrl) { + return trackedResourceMap.getOrDefault(typeUrl, Collections.emptyMap()); + } + + @Override + Set pendingResources(String typeUrl) { + return pendingResourceMap.getOrDefault(typeUrl, Collections.emptySet()); + } + + @Override + boolean isWildcard(String typeUrl) { + return typeUrl.equals(Resources.V2.CLUSTER_TYPE_URL) + || typeUrl.equals(Resources.V3.CLUSTER_TYPE_URL) + || typeUrl.equals(Resources.V2.LISTENER_TYPE_URL) + || typeUrl.equals(Resources.V3.LISTENER_TYPE_URL); + } + + @Override + void updateTrackedResources(String typeUrl, + Map resourcesVersions, + List removedResources) { + + Map trackedResources = trackedResourceMap.computeIfAbsent(typeUrl, s -> new HashMap<>()); + Set pendingResources = pendingResourceMap.computeIfAbsent(typeUrl, s -> new HashSet<>()); + resourcesVersions.forEach((k, v) -> { + trackedResources.put(k, v); + pendingResources.remove(k); + }); + removedResources.forEach(trackedResources::remove); + } + + @Override + void updateSubscriptions(String typeUrl, + List resourceNamesSubscribe, + List resourceNamesUnsubscribe) { + + Map trackedResources = trackedResourceMap.computeIfAbsent(typeUrl, s -> new HashMap<>()); + Set pendingResources = pendingResourceMap.computeIfAbsent(typeUrl, s -> new HashSet<>()); + // unsubscribe first + resourceNamesUnsubscribe.forEach(s -> { + trackedResources.remove(s); + pendingResources.remove(s); + }); + pendingResources.addAll(resourceNamesSubscribe); + } + + @Override + void computeWatch(String typeUrl, Supplier watchCreator) { + watches.compute(typeUrl, (s, watch) -> { + if (watch != null) { + watch.cancel(); + } + + return watchCreator.get(); + }); + } +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java index f408923ba..c9900db8d 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java @@ -22,19 +22,17 @@ public class AdsDiscoveryRequestStreamObserver extends DiscoveryRequestStr private final ConcurrentMap watches; private final ConcurrentMap latestResponse; private final ConcurrentMap> ackedResources; - private final DiscoveryServer discoveryServer; AdsDiscoveryRequestStreamObserver(StreamObserver responseObserver, - long streamId, - Executor executor, - DiscoveryServer discoveryServer) { + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { super(ANY_TYPE_URL, responseObserver, streamId, executor, discoveryServer); Preconditions.checkState(Resources.V2.TYPE_URLS.size() == Resources.V3.TYPE_URLS.size()); this.watches = new ConcurrentHashMap<>(Resources.V2.TYPE_URLS.size()); this.latestResponse = new ConcurrentHashMap<>(Resources.V2.TYPE_URLS.size()); this.ackedResources = new ConcurrentHashMap<>(Resources.V2.TYPE_URLS.size()); - this.discoveryServer = discoveryServer; } @Override diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java new file mode 100644 index 000000000..1bb59a7df --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java @@ -0,0 +1,246 @@ +package io.envoyproxy.controlplane.server; + +import io.envoyproxy.controlplane.cache.DeltaResponse; +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; +import io.envoyproxy.controlplane.cache.Resources; +import io.envoyproxy.controlplane.server.exception.RequestException; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code DeltaDiscoveryRequestStreamObserver} provides the base implementation for Delta XDS stream handling. + * The proto message types are abstracted so it can be used with different xDS versions. + */ +public abstract class DeltaDiscoveryRequestStreamObserver implements StreamObserver { + private static final AtomicLongFieldUpdater streamNonceUpdater = + AtomicLongFieldUpdater.newUpdater(DeltaDiscoveryRequestStreamObserver.class, "streamNonce"); + private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class); + + final long streamId; + private final String defaultTypeUrl; + private final StreamObserver responseObserver; + private final Executor executor; + volatile boolean hasClusterChanged; + final DiscoveryServer discoveryServer; + private volatile long streamNonce; + private volatile boolean isClosing; + + DeltaDiscoveryRequestStreamObserver(String defaultTypeUrl, + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { + this.defaultTypeUrl = defaultTypeUrl; + this.responseObserver = responseObserver; + this.streamId = streamId; + this.executor = executor; + this.streamNonce = 0; + this.discoveryServer = discoveryServer; + this.hasClusterChanged = false; + } + + @Override + public void onNext(V rawRequest) { + DeltaXdsRequest request = discoveryServer.wrapDeltaXdsRequest(rawRequest); + String requestTypeUrl = request.getTypeUrl().isEmpty() ? defaultTypeUrl : request.getTypeUrl(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[{}] request {}[{}] with nonce {} from versions {}", + streamId, + requestTypeUrl, + String.join(", ", request.getResourceNamesSubscribeList()), + request.getResponseNonce(), + request.getInitialResourceVersionsMap()); + } + + try { + discoveryServer.runStreamDeltaRequestCallbacks(streamId, rawRequest); + } catch (RequestException e) { + closeWithError(e); + return; + } + + final String version; + if (latestVersion(requestTypeUrl) == null) { + version = ""; + } else { + version = latestVersion(requestTypeUrl); + } + + // always update subscriptions + updateSubscriptions(requestTypeUrl, + request.getResourceNamesSubscribeList(), + request.getResourceNamesUnsubscribeList()); + + if (!request.getResponseNonce().isEmpty()) { + // envoy is replying to a response we sent, get and clear respective response + LatestDeltaDiscoveryResponse response = clearResponse(requestTypeUrl, request.getResponseNonce()); + if (!request.hasErrorDetail()) { + // if envoy has acked, update tracked resources + // from the corresponding response + updateTrackedResources(requestTypeUrl, + response.resourceVersions(), + response.removedResources()); + } + } + + // if nonce is empty, envoy is only requesting new resources or this is a new connection, + // in either case we have already updated the subscriptions, + // but we should only create watches when there's no pending ack + // this tries to ensure we don't have two outstanding responses + if (responseCount(requestTypeUrl) == 0) { + computeWatch(requestTypeUrl, () -> discoveryServer.configWatcher.createDeltaWatch( + request, + version, + resourceVersions(requestTypeUrl), + pendingResources(requestTypeUrl), + isWildcard(requestTypeUrl), + r -> executor.execute(() -> send(r, requestTypeUrl)), + hasClusterChanged + )); + } + } + + @Override + public void onError(Throwable t) { + if (!Status.fromThrowable(t).getCode().equals(Status.CANCELLED.getCode())) { + LOGGER.error("[{}] stream closed with error", streamId, t); + } + + try { + discoveryServer.callbacks.forEach(cb -> cb.onStreamCloseWithError(streamId, defaultTypeUrl, t)); + closeWithError(Status.fromThrowable(t).asException()); + } finally { + cancel(); + } + } + + @Override + public void onCompleted() { + LOGGER.debug("[{}] stream closed", streamId); + + try { + discoveryServer.callbacks.forEach(cb -> cb.onStreamClose(streamId, defaultTypeUrl)); + synchronized (responseObserver) { + if (!isClosing) { + isClosing = true; + responseObserver.onCompleted(); + } + } + } finally { + cancel(); + } + } + + void onCancelled() { + LOGGER.info("[{}] stream cancelled", streamId); + cancel(); + } + + void closeWithError(Throwable exception) { + synchronized (responseObserver) { + if (!isClosing) { + isClosing = true; + responseObserver.onError(exception); + } + } + cancel(); + } + + private void send(DeltaResponse response, String typeUrl) { + String nonce = Long.toString(streamNonceUpdater.getAndIncrement(this)); + List resources = response.resources() + .entrySet() + .stream() + .map(entry -> discoveryServer.makeResource(entry.getKey(), + entry.getValue().version(), + discoveryServer.protoResourcesSerializer.serialize( + entry.getValue().resource(), + Resources.getResourceApiVersion(typeUrl)))) + .collect(Collectors.toList()); + + X discoveryResponse = discoveryServer.makeDeltaResponse( + typeUrl, + response.version(), + nonce, + resources, + response.removedResources() + ); + + LOGGER.debug("[{}] response {} with nonce {} version {}", streamId, typeUrl, nonce, + response.version()); + + discoveryServer.runStreamDeltaResponseCallbacks(streamId, response.request(), discoveryResponse); + + // Store the latest response *before* we send the response. This ensures that by the time the request + // is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions + // which may see the incoming request arrive before the map is updated, failing the nonce check erroneously. + setResponse( + typeUrl, + nonce, + LatestDeltaDiscoveryResponse.create( + nonce, + response.version(), + response.resources() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().version())), + response.removedResources() + ) + ); + setLatestVersion(typeUrl, response.version()); + + synchronized (responseObserver) { + if (!isClosing) { + try { + responseObserver.onNext(discoveryResponse); + } catch (StatusRuntimeException e) { + if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) { + throw e; + } + } + } + } + } + + abstract void cancel(); + + abstract boolean ads(); + + abstract void setLatestVersion(String typeUrl, String version); + + abstract String latestVersion(String typeUrl); + + abstract void setResponse(String typeUrl, String nonce, LatestDeltaDiscoveryResponse response); + + abstract LatestDeltaDiscoveryResponse clearResponse(String typeUrl, String nonce); + + abstract int responseCount(String typeUrl); + + abstract Map resourceVersions(String typeUrl); + + abstract Set pendingResources(String typeUrl); + + abstract boolean isWildcard(String typeUrl); + + abstract void updateTrackedResources(String typeUrl, + Map resourcesVersions, + List removedResources); + + abstract void updateSubscriptions(String typeUrl, + List resourceNamesSubscribe, + List resourceNamesUnsubscribe); + + abstract void computeWatch(String typeUrl, Supplier watchCreator); +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java index 49df750e9..9788ceaea 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java @@ -30,19 +30,19 @@ public abstract class DiscoveryRequestStreamObserver implements StreamObse private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class); final long streamId; - volatile boolean hasClusterChanged; + final DiscoveryServer discoveryServer; private final String defaultTypeUrl; private final StreamObserver responseObserver; private final Executor executor; - private final DiscoveryServer discoveryServer; + volatile boolean hasClusterChanged; private volatile long streamNonce; private volatile boolean isClosing; DiscoveryRequestStreamObserver(String defaultTypeUrl, - StreamObserver responseObserver, - long streamId, - Executor executor, - DiscoveryServer discoveryServer) { + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { this.defaultTypeUrl = defaultTypeUrl; this.responseObserver = responseObserver; this.streamId = streamId; diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java index ca028882d..f2efff981 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java @@ -3,6 +3,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.Any; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer; import io.grpc.stub.ServerCallStreamObserver; @@ -14,7 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class DiscoveryServer { +public abstract class DiscoveryServer { static final String ANY_TYPE_URL = ""; private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class); final List callbacks; @@ -31,9 +32,9 @@ public abstract class DiscoveryServer { * @param protoResourcesSerializer serializer of proto buffer messages */ protected DiscoveryServer(List callbacks, - ConfigWatcher configWatcher, - ExecutorGroup executorGroup, - ProtoResourcesSerializer protoResourcesSerializer) { + ConfigWatcher configWatcher, + ExecutorGroup executorGroup, + ProtoResourcesSerializer protoResourcesSerializer) { Preconditions.checkNotNull(executorGroup, "executorGroup cannot be null"); Preconditions.checkNotNull(callbacks, "callbacks cannot be null"); Preconditions.checkNotNull(configWatcher, "configWatcher cannot be null"); @@ -47,13 +48,24 @@ protected DiscoveryServer(List callbacks, protected abstract XdsRequest wrapXdsRequest(T request); + protected abstract DeltaXdsRequest wrapDeltaXdsRequest(V request); + protected abstract U makeResponse(String version, Collection resources, String typeUrl, - String nonce); + String nonce); + + public abstract X makeDeltaResponse(String typeUrl, String version, String nonce, List resources, + List removedResources); + + protected abstract Y makeResource(String name, String version, Any resource); protected abstract void runStreamRequestCallbacks(long streamId, T request); + protected abstract void runStreamDeltaRequestCallbacks(long streamId, V request); + protected abstract void runStreamResponseCallbacks(long streamId, XdsRequest request, U response); + protected abstract void runStreamDeltaResponseCallbacks(long streamId, DeltaXdsRequest request, X response); + StreamObserver createRequestHandler( StreamObserver responseObserver, boolean ads, @@ -88,4 +100,41 @@ StreamObserver createRequestHandler( return requestStreamObserver; } + + StreamObserver createDeltaRequestHandler( + StreamObserver responseObserver, + boolean ads, + String defaultTypeUrl) { + + long streamId = streamCount.getAndIncrement(); + Executor executor = executorGroup.next(); + + LOGGER.debug("[{}] open stream from {}", streamId, defaultTypeUrl); + + callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl)); + + final DeltaDiscoveryRequestStreamObserver requestStreamObserver; + if (ads) { + requestStreamObserver = new AdsDeltaDiscoveryRequestStreamObserver<>( + responseObserver, + streamId, + executor, + this + ); + } else { + requestStreamObserver = new XdsDeltaDiscoveryRequestStreamObserver<>( + defaultTypeUrl, + responseObserver, + streamId, + executor, + this + ); + } + + if (responseObserver instanceof ServerCallStreamObserver) { + ((ServerCallStreamObserver) responseObserver).setOnCancelHandler(requestStreamObserver::onCancelled); + } + + return requestStreamObserver; + } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java index 79a0a07c7..4cfc7f7b2 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java @@ -1,6 +1,8 @@ package io.envoyproxy.controlplane.server; import io.envoyproxy.controlplane.server.exception.RequestException; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryResponse; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; @@ -67,7 +69,32 @@ default void onStreamOpen(long streamId, String typeUrl) { * will be returned to the client and the stream will be closed with error. */ void onV3StreamRequest(long streamId, - io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest request); + io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest request); + + /** + * {@code onStreamRequest} is called for each {@link DeltaDiscoveryRequest} that is received on the + * stream. + * + * @param streamId an ID for this stream that is only unique to this discovery server instance + * @param request the discovery request sent by the envoy instance + * + * @throws RequestException optionally can throw {@link RequestException} with custom status. That status + * will be returned to the client and the stream will be closed with error. + */ + void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request); + + /** + * {@code onV3StreamRequest} is called for each {@link io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest} + * that is received on the stream. + * + * @param streamId an ID for this stream that is only unique to this discovery server instance + * @param request the discovery request sent by the envoy instance + * + * @throws RequestException optionally can throw {@link RequestException} with custom status. That status + * will be returned to the client and the stream will be closed with error. + */ + void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request); /** * {@code onStreamResponse} is called just before each {@link DiscoveryResponse} that is sent @@ -89,7 +116,31 @@ default void onStreamResponse(long streamId, DiscoveryRequest request, Discovery * @param response the discovery response sent by the discovery server */ default void onV3StreamResponse(long streamId, - io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest request, - io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse response) { + io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest request, + io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse response) { + } + + /** + * {@code onStreamDeltaResponse} is called just before each {@link DeltaDiscoveryResponse} that is sent + * on the stream. + * + * @param streamId an ID for this stream that is only unique to this discovery server instance + * @param request the discovery request sent by the envoy instance + * @param response the discovery response sent by the discovery server + */ + default void onStreamDeltaResponse(long streamId, DeltaDiscoveryRequest request, DeltaDiscoveryResponse response) { + } + + /** + * {@code onV3StreamResponse} is called just before each + * {@link io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse} that is sent on the stream. + * + * @param streamId an ID for this stream that is only unique to this discovery server instance + * @param request the discovery request sent by the envoy instance + * @param response the discovery response sent by the discovery server + */ + default void onV3StreamDeltaResponse(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse response) { } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/LatestDeltaDiscoveryResponse.java b/server/src/main/java/io/envoyproxy/controlplane/server/LatestDeltaDiscoveryResponse.java new file mode 100644 index 000000000..d657cdd72 --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/LatestDeltaDiscoveryResponse.java @@ -0,0 +1,26 @@ +package io.envoyproxy.controlplane.server; + +import com.google.auto.value.AutoValue; +import java.util.List; +import java.util.Map; + +/** + * Class introduces optimization which store only required data during next request. + */ +@AutoValue +public abstract class LatestDeltaDiscoveryResponse { + static LatestDeltaDiscoveryResponse create(String nonce, + String version, + Map resourceVersions, + List removedResources) { + return new AutoValue_LatestDeltaDiscoveryResponse(nonce, version, resourceVersions, removedResources); + } + + abstract String nonce(); + + abstract String version(); + + abstract Map resourceVersions(); + + abstract List removedResources(); +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/V2DiscoveryServer.java b/server/src/main/java/io/envoyproxy/controlplane/server/V2DiscoveryServer.java index 1d8616282..f2d9f9d19 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/V2DiscoveryServer.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/V2DiscoveryServer.java @@ -3,15 +3,19 @@ import com.google.common.base.Preconditions; import com.google.protobuf.Any; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer; import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer; import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryResponse; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.EndpointDiscoveryServiceGrpc; import io.envoyproxy.envoy.api.v2.ListenerDiscoveryServiceGrpc; +import io.envoyproxy.envoy.api.v2.Resource; import io.envoyproxy.envoy.api.v2.RouteDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v2.SecretDiscoveryServiceGrpc; @@ -20,14 +24,15 @@ import java.util.Collections; import java.util.List; -public class V2DiscoveryServer extends DiscoveryServer { +public class V2DiscoveryServer extends DiscoveryServer { public V2DiscoveryServer(ConfigWatcher configWatcher) { this(Collections.emptyList(), configWatcher); } public V2DiscoveryServer(DiscoveryServerCallbacks callbacks, - ConfigWatcher configWatcher) { + ConfigWatcher configWatcher) { this(Collections.singletonList(callbacks), configWatcher); } @@ -39,7 +44,9 @@ public V2DiscoveryServer( } public V2DiscoveryServer(List callbacks, - ConfigWatcher configWatcher, ExecutorGroup executorGroup, ProtoResourcesSerializer protoResourcesSerializer) { + ConfigWatcher configWatcher, + ExecutorGroup executorGroup, + ProtoResourcesSerializer protoResourcesSerializer) { super(callbacks, configWatcher, executorGroup, protoResourcesSerializer); } @@ -54,6 +61,13 @@ public StreamObserver streamAggregatedResources( return createRequestHandler(responseObserver, true, ANY_TYPE_URL); } + + @Override + public StreamObserver deltaAggregatedResources( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, true, ANY_TYPE_URL); + } }; } @@ -68,6 +82,13 @@ public StreamObserver streamClusters( return createRequestHandler(responseObserver, false, Resources.V2.CLUSTER_TYPE_URL); } + + @Override + public StreamObserver deltaClusters( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V2.CLUSTER_TYPE_URL); + } }; } @@ -82,6 +103,13 @@ public StreamObserver streamEndpoints( return createRequestHandler(responseObserver, false, Resources.V2.ENDPOINT_TYPE_URL); } + + @Override + public StreamObserver deltaEndpoints( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V2.ENDPOINT_TYPE_URL); + } }; } @@ -96,6 +124,13 @@ public StreamObserver streamListeners( return createRequestHandler(responseObserver, false, Resources.V2.LISTENER_TYPE_URL); } + + @Override + public StreamObserver deltaListeners( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V2.LISTENER_TYPE_URL); + } }; } @@ -110,6 +145,13 @@ public StreamObserver streamRoutes( return createRequestHandler(responseObserver, false, Resources.V2.ROUTE_TYPE_URL); } + + @Override + public StreamObserver deltaRoutes( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V2.ROUTE_TYPE_URL); + } }; } @@ -121,8 +163,16 @@ public SecretDiscoveryServiceGrpc.SecretDiscoveryServiceImplBase getSecretDiscov @Override public StreamObserver streamSecrets( StreamObserver responseObserver) { + return createRequestHandler(responseObserver, false, Resources.V2.SECRET_TYPE_URL); } + + @Override + public StreamObserver deltaSecrets( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V2.SECRET_TYPE_URL); + } }; } @@ -131,15 +181,26 @@ protected XdsRequest wrapXdsRequest(DiscoveryRequest request) { return XdsRequest.create(request); } + @Override + protected DeltaXdsRequest wrapDeltaXdsRequest(DeltaDiscoveryRequest request) { + return DeltaXdsRequest.create(request); + } + @Override protected void runStreamRequestCallbacks(long streamId, DiscoveryRequest discoveryRequest) { callbacks.forEach( cb -> cb.onV2StreamRequest(streamId, discoveryRequest)); } + @Override + protected void runStreamDeltaRequestCallbacks(long streamId, DeltaDiscoveryRequest request) { + callbacks.forEach( + cb -> cb.onV2StreamDeltaRequest(streamId, request)); + } + @Override protected void runStreamResponseCallbacks(long streamId, XdsRequest request, - DiscoveryResponse discoveryResponse) { + DiscoveryResponse discoveryResponse) { Preconditions.checkArgument(request.v2Request() != null); callbacks.forEach( cb -> cb.onStreamResponse(streamId, @@ -147,10 +208,21 @@ protected void runStreamResponseCallbacks(long streamId, XdsRequest request, discoveryResponse)); } + @Override + protected void runStreamDeltaResponseCallbacks(long streamId, + DeltaXdsRequest request, + DeltaDiscoveryResponse response) { + Preconditions.checkArgument(request.v2Request() != null); + callbacks.forEach( + cb -> cb.onStreamDeltaResponse(streamId, + request.v2Request(), + response)); + } + @Override protected DiscoveryResponse makeResponse(String version, Collection resources, - String typeUrl, - String nonce) { + String typeUrl, + String nonce) { return DiscoveryResponse.newBuilder() .setNonce(nonce) .setVersionInfo(version) @@ -158,4 +230,26 @@ protected DiscoveryResponse makeResponse(String version, Collection resourc .setTypeUrl(typeUrl) .build(); } + + @Override + public DeltaDiscoveryResponse makeDeltaResponse(String typeUrl, String version, String nonce, + List resources, + List removedResources) { + return DeltaDiscoveryResponse.newBuilder() + .setTypeUrl(typeUrl) + .setSystemVersionInfo(version) + .setNonce(nonce) + .addAllResources(resources) + .addAllRemovedResources(removedResources) + .build(); + } + + @Override + protected Resource makeResource(String name, String version, Any resource) { + return Resource.newBuilder() + .setName(name) + .setVersion(version) + .setResource(resource) + .build(); + } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java b/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java index 601c9a059..56347eaaf 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java @@ -9,25 +9,30 @@ import com.google.common.base.Preconditions; import com.google.protobuf.Any; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer; import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer; import io.envoyproxy.envoy.service.cluster.v3.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceImplBase; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.envoyproxy.envoy.service.discovery.v3.Resource; import io.grpc.stub.StreamObserver; import java.util.Collection; import java.util.Collections; import java.util.List; -public class V3DiscoveryServer extends DiscoveryServer { +public class V3DiscoveryServer extends DiscoveryServer { public V3DiscoveryServer(ConfigWatcher configWatcher) { this(Collections.emptyList(), configWatcher); } public V3DiscoveryServer(DiscoveryServerCallbacks callbacks, - ConfigWatcher configWatcher) { + ConfigWatcher configWatcher) { this(Collections.singletonList(callbacks), configWatcher); } @@ -39,7 +44,9 @@ public V3DiscoveryServer( } public V3DiscoveryServer(List callbacks, - ConfigWatcher configWatcher, ExecutorGroup executorGroup, ProtoResourcesSerializer protoResourcesSerializer) { + ConfigWatcher configWatcher, + ExecutorGroup executorGroup, + ProtoResourcesSerializer protoResourcesSerializer) { super(callbacks, configWatcher, executorGroup, protoResourcesSerializer); } @@ -54,6 +61,13 @@ public StreamObserver streamAggregatedResources( return createRequestHandler(responseObserver, true, ANY_TYPE_URL); } + + @Override + public StreamObserver deltaAggregatedResources( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, true, ANY_TYPE_URL); + } }; } @@ -68,6 +82,13 @@ public StreamObserver streamClusters( return createRequestHandler(responseObserver, false, Resources.V3.CLUSTER_TYPE_URL); } + + @Override + public StreamObserver deltaClusters( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.CLUSTER_TYPE_URL); + } }; } @@ -82,6 +103,13 @@ public StreamObserver streamEndpoints( return createRequestHandler(responseObserver, false, Resources.V3.ENDPOINT_TYPE_URL); } + + @Override + public StreamObserver deltaEndpoints( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.ENDPOINT_TYPE_URL); + } }; } @@ -96,6 +124,13 @@ public StreamObserver streamListeners( return createRequestHandler(responseObserver, false, Resources.V3.LISTENER_TYPE_URL); } + + @Override + public StreamObserver deltaListeners( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.LISTENER_TYPE_URL); + } }; } @@ -110,6 +145,13 @@ public StreamObserver streamRoutes( return createRequestHandler(responseObserver, false, Resources.V3.ROUTE_TYPE_URL); } + + @Override + public StreamObserver deltaRoutes( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.ROUTE_TYPE_URL); + } }; } @@ -121,8 +163,16 @@ public SecretDiscoveryServiceImplBase getSecretDiscoveryServiceImpl() { @Override public StreamObserver streamSecrets( StreamObserver responseObserver) { + return createRequestHandler(responseObserver, false, Resources.V3.SECRET_TYPE_URL); } + + @Override + public StreamObserver deltaSecrets( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.SECRET_TYPE_URL); + } }; } @@ -131,15 +181,26 @@ protected XdsRequest wrapXdsRequest(DiscoveryRequest request) { return XdsRequest.create(request); } + @Override + protected DeltaXdsRequest wrapDeltaXdsRequest(DeltaDiscoveryRequest request) { + return DeltaXdsRequest.create(request); + } + @Override protected void runStreamRequestCallbacks(long streamId, DiscoveryRequest discoveryRequest) { callbacks.forEach( cb -> cb.onV3StreamRequest(streamId, discoveryRequest)); } + @Override + protected void runStreamDeltaRequestCallbacks(long streamId, DeltaDiscoveryRequest request) { + callbacks.forEach( + cb -> cb.onV3StreamDeltaRequest(streamId, request)); + } + @Override protected void runStreamResponseCallbacks(long streamId, XdsRequest request, - DiscoveryResponse discoveryResponse) { + DiscoveryResponse discoveryResponse) { Preconditions.checkArgument(request.v3Request() != null); callbacks.forEach( cb -> cb.onV3StreamResponse(streamId, @@ -147,10 +208,20 @@ protected void runStreamResponseCallbacks(long streamId, XdsRequest request, discoveryResponse)); } + @Override + protected void runStreamDeltaResponseCallbacks(long streamId, DeltaXdsRequest request, + DeltaDiscoveryResponse response) { + Preconditions.checkArgument(request.v3Request() != null); + callbacks.forEach( + cb -> cb.onV3StreamDeltaResponse(streamId, + request.v3Request(), + response)); + } + @Override protected DiscoveryResponse makeResponse(String version, Collection resources, - String typeUrl, - String nonce) { + String typeUrl, + String nonce) { return DiscoveryResponse.newBuilder() .setNonce(nonce) .setVersionInfo(version) @@ -158,4 +229,26 @@ protected DiscoveryResponse makeResponse(String version, Collection resourc .setTypeUrl(typeUrl) .build(); } + + @Override + public DeltaDiscoveryResponse makeDeltaResponse(String typeUrl, String version, String nonce, + List resources, + List removedResources) { + return DeltaDiscoveryResponse.newBuilder() + .setTypeUrl(typeUrl) + .setSystemVersionInfo(version) + .setNonce(nonce) + .addAllResources(resources) + .addAllRemovedResources(removedResources) + .build(); + } + + @Override + protected Resource makeResource(String name, String version, Any resource) { + return Resource.newBuilder() + .setName(name) + .setVersion(version) + .setResource(resource) + .build(); + } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java new file mode 100644 index 000000000..da395e515 --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java @@ -0,0 +1,126 @@ +package io.envoyproxy.controlplane.server; + +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.Resources; +import io.grpc.stub.StreamObserver; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.function.Supplier; + +/** + * {@code XdsDeltaDiscoveryRequestStreamObserver} is a lightweight implementation of + * {@link DeltaDiscoveryRequestStreamObserver} tailored for non-ADS streams which handle a single watch. + */ +public class XdsDeltaDiscoveryRequestStreamObserver extends DeltaDiscoveryRequestStreamObserver { + // tracked is only used in the same thread so it need not be volatile + private final Map trackedResources; + private final Set pendingResources; + private final boolean isWildcard; + private final ConcurrentMap responses; + private volatile DeltaWatch watch; + private volatile String latestVersion; + + XdsDeltaDiscoveryRequestStreamObserver(String defaultTypeUrl, + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { + super(defaultTypeUrl, responseObserver, streamId, executor, discoveryServer); + this.trackedResources = new HashMap<>(); + this.pendingResources = new HashSet<>(); + this.isWildcard = defaultTypeUrl.equals(Resources.V2.CLUSTER_TYPE_URL) + || defaultTypeUrl.equals(Resources.V3.CLUSTER_TYPE_URL) + || defaultTypeUrl.equals(Resources.V2.LISTENER_TYPE_URL) + || defaultTypeUrl.equals(Resources.V3.LISTENER_TYPE_URL); + this.responses = new ConcurrentHashMap<>(); + } + + @Override + void cancel() { + if (watch != null) { + watch.cancel(); + } + } + + @Override + boolean ads() { + return false; + } + + @Override + void setLatestVersion(String typeUrl, String version) { + latestVersion = version; + } + + @Override + String latestVersion(String typeUrl) { + return latestVersion; + } + + @Override + void setResponse(String typeUrl, String nonce, LatestDeltaDiscoveryResponse response) { + responses.put(nonce, response); + } + + @Override + LatestDeltaDiscoveryResponse clearResponse(String typeUrl, String nonce) { + return responses.remove(nonce); + } + + @Override + int responseCount(String typeUrl) { + return responses.size(); + } + + @Override + Map resourceVersions(String typeUrl) { + return trackedResources; + } + + @Override + Set pendingResources(String typeUrl) { + return pendingResources; + } + + @Override + boolean isWildcard(String typeUrl) { + return isWildcard; + } + + @Override + void updateTrackedResources(String typeUrl, + Map resourcesVersions, + List removedResources) { + + resourcesVersions.forEach((k, v) -> { + trackedResources.put(k, v); + pendingResources.remove(k); + }); + removedResources.forEach(trackedResources::remove); + } + + @Override + void updateSubscriptions(String typeUrl, + List resourceNamesSubscribe, + List resourceNamesUnsubscribe) { + + // unsubscribe first + resourceNamesUnsubscribe.forEach(s -> { + trackedResources.remove(s); + pendingResources.remove(s); + }); + pendingResources.addAll(resourceNamesSubscribe); + } + + @Override + void computeWatch(String typeUrl, Supplier watchCreator) { + cancel(); + watch = watchCreator.get(); + } +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java index 26c3929d0..403198975 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java @@ -18,10 +18,10 @@ public class XdsDiscoveryRequestStreamObserver extends DiscoveryRequestStr private Set ackedResources; XdsDiscoveryRequestStreamObserver(String defaultTypeUrl, - StreamObserver responseObserver, - long streamId, - Executor executor, - DiscoveryServer discoveryServer) { + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { super(defaultTypeUrl, responseObserver, streamId, executor, discoveryServer); this.ackedResources = Collections.emptySet(); } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java b/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java index 8e2e3f55f..ce585bea2 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java @@ -6,6 +6,7 @@ import io.envoyproxy.controlplane.cache.SnapshotCache; import io.envoyproxy.controlplane.cache.v2.Snapshot; import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import java.time.Clock; import java.time.Instant; @@ -42,11 +43,6 @@ */ public class SnapshotCollectingCallback implements DiscoveryServerCallbacks { - private static class SnapshotState { - int streamCount; - Instant lastSeen; - } - private final SnapshotCache snapshotCache; private final NodeGroup nodeGroup; private final Clock clock; @@ -58,16 +54,16 @@ private static class SnapshotState { /** * Creates the callback. * - * @param snapshotCache the cache to evict snapshots from - * @param nodeGroup the node group used to map requests to groups - * @param clock system clock - * @param collectorCallbacks the callbacks to invoke when snapshot is collected - * @param collectAfterMillis how long a snapshot must be referenced for before being collected + * @param snapshotCache the cache to evict snapshots from + * @param nodeGroup the node group used to map requests to groups + * @param clock system clock + * @param collectorCallbacks the callbacks to invoke when snapshot is collected + * @param collectAfterMillis how long a snapshot must be referenced for before being collected * @param collectionIntervalMillis how often the collection background action should run */ public SnapshotCollectingCallback(SnapshotCache snapshotCache, - NodeGroup nodeGroup, Clock clock, Set> collectorCallbacks, - long collectAfterMillis, long collectionIntervalMillis) { + NodeGroup nodeGroup, Clock clock, Set> collectorCallbacks, + long collectAfterMillis, long collectionIntervalMillis) { this.snapshotCache = snapshotCache; this.nodeGroup = nodeGroup; this.clock = clock; @@ -87,7 +83,20 @@ public synchronized void onV2StreamRequest(long streamId, DiscoveryRequest reque @Override public synchronized void onV3StreamRequest(long streamId, - io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest request) { + io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest request) { + T groupIdentifier = nodeGroup.hash(request.getNode()); + updateState(streamId, groupIdentifier); + } + + @Override + public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) { + T groupIdentifier = nodeGroup.hash(request.getNode()); + updateState(streamId, groupIdentifier); + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) { T groupIdentifier = nodeGroup.hash(request.getNode()); updateState(streamId, groupIdentifier); } @@ -102,11 +111,13 @@ private void updateState(long streamId, T groupIdentifier) { } } - @Override public void onStreamClose(long streamId, String typeUrl) { + @Override + public void onStreamClose(long streamId, String typeUrl) { onStreamCloseHelper(streamId); } - @Override public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) { + @Override + public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) { onStreamCloseHelper(streamId); } @@ -145,4 +156,9 @@ private synchronized void onStreamCloseHelper(long streamId) { snapshotState.streamCount--; snapshotState.lastSeen = clock.instant(); } + + private static class SnapshotState { + int streamCount; + Instant lastSeen; + } } diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/TestMain.java b/server/src/test/java/io/envoyproxy/controlplane/server/TestMain.java index 4086945b5..ac0f90dfd 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/TestMain.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/TestMain.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.Duration; import io.envoyproxy.controlplane.cache.NodeGroup; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.v2.SimpleCache; import io.envoyproxy.controlplane.cache.v2.Snapshot; import io.envoyproxy.envoy.api.v2.Cluster; @@ -27,11 +28,13 @@ public class TestMain { */ public static void main(String[] arg) throws IOException, InterruptedException { SimpleCache cache = new SimpleCache<>(new NodeGroup() { - @Override public String hash(Node node) { + @Override + public String hash(Node node) { return GROUP; } - @Override public String hash(io.envoyproxy.envoy.config.core.v3.Node node) { + @Override + public String hash(io.envoyproxy.envoy.config.core.v3.Node node) { return GROUP; } }); @@ -40,13 +43,15 @@ public static void main(String[] arg) throws IOException, InterruptedException { GROUP, Snapshot.create( ImmutableList.of( - Cluster.newBuilder() - .setName("cluster0") - .setConnectTimeout(Duration.newBuilder().setSeconds(5)) - .setType(DiscoveryType.STATIC) - .addHosts(Address.newBuilder() - .setSocketAddress(SocketAddress.newBuilder().setAddress("127.0.0.1").setPortValue(1234))) - .build()), + SnapshotResource.create( + Cluster.newBuilder() + .setName("cluster0") + .setConnectTimeout(Duration.newBuilder().setSeconds(5)) + .setType(DiscoveryType.STATIC) + .addHosts(Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder().setAddress("127.0.0.1").setPortValue(1234))) + .build(), + "1")), ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), @@ -82,13 +87,15 @@ public static void main(String[] arg) throws IOException, InterruptedException { GROUP, Snapshot.create( ImmutableList.of( - Cluster.newBuilder() + SnapshotResource.create( + Cluster.newBuilder() .setName("cluster1") .setConnectTimeout(Duration.newBuilder().setSeconds(5)) .setType(DiscoveryType.STATIC) .addHosts(Address.newBuilder() .setSocketAddress(SocketAddress.newBuilder().setAddress("127.0.0.1").setPortValue(1235))) - .build()), + .build(), + "1")), ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerAdsWarmingClusterIT.java b/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerAdsWarmingClusterIT.java index f9480d9e0..a8bbd44a5 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerAdsWarmingClusterIT.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerAdsWarmingClusterIT.java @@ -8,11 +8,13 @@ import com.google.protobuf.util.Durations; import io.envoyproxy.controlplane.cache.NodeGroup; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.TestResources; import io.envoyproxy.controlplane.cache.v2.SimpleCache; import io.envoyproxy.controlplane.cache.v2.Snapshot; import io.envoyproxy.envoy.api.v2.Cluster; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.Listener; @@ -71,6 +73,17 @@ public void onV3StreamRequest(long streamId, throw new IllegalStateException("unexpected v3 request for v2 test"); } + @Override + public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) { + throw new IllegalStateException("unexpected delta request for test"); + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) { + throw new IllegalStateException("unexpected v3 request for v2 test"); + } + @Override public void onStreamResponse(long streamId, DiscoveryRequest request, DiscoveryResponse response) { // Here we update a Snapshot with working cluster, but we change only CDS version, not EDS version. @@ -181,13 +194,13 @@ private static Snapshot createSnapshotWithNotWorkingCluster(boolean ads, // here we have new version of resources other than CDS. return Snapshot.create( - ImmutableList.of(cluster), + ImmutableList.of(SnapshotResource.create(cluster, "1")), "1", - ImmutableList.of(endpoint), + ImmutableList.of(SnapshotResource.create(endpoint, "2")), "2", - ImmutableList.of(listener), + ImmutableList.of(SnapshotResource.create(listener, "2")), "2", - ImmutableList.of(route), + ImmutableList.of(SnapshotResource.create(route, "2")), "2", ImmutableList.of(), "2"); diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerTest.java b/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerTest.java index abb80b8d8..7899506a2 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerTest.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V2DiscoveryServerTest.java @@ -11,6 +11,9 @@ import com.google.common.collect.Table; import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaResponse; +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; import io.envoyproxy.controlplane.cache.TestResources; @@ -22,6 +25,7 @@ import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc; import io.envoyproxy.envoy.api.v2.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceStub; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.EndpointDiscoveryServiceGrpc; @@ -1018,6 +1022,16 @@ public Watch createWatch( return watch; } + + @Override + public DeltaWatch createDeltaWatch(DeltaXdsRequest request, String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged) { + throw new IllegalStateException("not implemented"); + } } private static class MockDiscoveryServerCallbacks @@ -1066,6 +1080,17 @@ public void onV3StreamRequest(long streamId, throw new IllegalStateException("Unexpected v3 request in v2 test"); } + @Override + public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected delta request"); + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected v3 request in v2 test"); + } + @Override public void onStreamResponse(long streamId, DiscoveryRequest request, DiscoveryResponse response) { streamResponseCount.getAndIncrement(); diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V2OnlyDiscoveryServerCallbacks.java b/server/src/test/java/io/envoyproxy/controlplane/server/V2OnlyDiscoveryServerCallbacks.java index d1376a812..04bb46864 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V2OnlyDiscoveryServerCallbacks.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V2OnlyDiscoveryServerCallbacks.java @@ -1,5 +1,6 @@ package io.envoyproxy.controlplane.server; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import java.util.concurrent.CountDownLatch; @@ -40,6 +41,17 @@ public void onV3StreamRequest(long streamId, throw new IllegalStateException("unexpected v3 request in v2 test"); } + @Override + public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) { + throw new IllegalStateException("unexpected delta request"); + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) { + throw new IllegalStateException("unexpected v3 request in v2 test"); + } + @Override public void onStreamResponse(long streamId, DiscoveryRequest request, DiscoveryResponse response) { onStreamResponseLatch.countDown(); diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V2TestSnapshots.java b/server/src/test/java/io/envoyproxy/controlplane/server/V2TestSnapshots.java index 16d05c278..10d6a765a 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V2TestSnapshots.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V2TestSnapshots.java @@ -1,5 +1,6 @@ package io.envoyproxy.controlplane.server; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.TestResources; import io.envoyproxy.controlplane.cache.v2.Snapshot; import io.envoyproxy.envoy.api.v2.Cluster; @@ -28,10 +29,10 @@ static Snapshot createSnapshot( RouteConfiguration route = TestResources.createRoute(routeName, clusterName); return Snapshot.create( - ImmutableList.of(cluster), - ImmutableList.of(endpoint), - ImmutableList.of(listener), - ImmutableList.of(route), + ImmutableList.of(SnapshotResource.create(cluster, version)), + ImmutableList.of(SnapshotResource.create(endpoint, version)), + ImmutableList.of(SnapshotResource.create(listener, version)), + ImmutableList.of(SnapshotResource.create(route, version)), ImmutableList.of(), version); } @@ -79,10 +80,10 @@ private static Snapshot createSnapshotNoEds( RouteConfiguration route = TestResources.createRoute(routeName, clusterName); return Snapshot.create( - ImmutableList.of(cluster), + ImmutableList.of(SnapshotResource.create(cluster, version)), ImmutableList.of(), - ImmutableList.of(listener), - ImmutableList.of(route), + ImmutableList.of(SnapshotResource.create(listener, version)), + ImmutableList.of(SnapshotResource.create(route, version)), ImmutableList.of(), version); } diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java index 22fd770bc..48299e86e 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java @@ -9,9 +9,11 @@ import com.google.protobuf.util.Durations; import io.envoyproxy.controlplane.cache.NodeGroup; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.TestResources; import io.envoyproxy.controlplane.cache.v3.SimpleCache; import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; import io.envoyproxy.envoy.api.v2.core.Node; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; @@ -72,6 +74,17 @@ public void onV3StreamRequest(long streamId, DiscoveryRequest request) { onStreamRequestLatch.countDown(); } + @Override + public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected v2 request in v3 test"); + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected delta request"); + } + @Override public void onStreamResponse(long streamId, io.envoyproxy.envoy.api.v2.DiscoveryRequest request, io.envoyproxy.envoy.api.v2.DiscoveryResponse response) { @@ -188,13 +201,13 @@ private static Snapshot createSnapshotWithNotWorkingCluster(boolean ads, // here we have new version of resources other than CDS. return Snapshot.create( - ImmutableList.of(cluster), + ImmutableList.of(SnapshotResource.create(cluster, "1")), "1", - ImmutableList.of(endpoint), + ImmutableList.of(SnapshotResource.create(endpoint, "2")), "2", - ImmutableList.of(listener), + ImmutableList.of(SnapshotResource.create(listener, "2")), "2", - ImmutableList.of(route), + ImmutableList.of(SnapshotResource.create(route, "2")), "2", ImmutableList.of(), "2"); diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java index 60ae43e41..70c88da01 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java @@ -12,6 +12,9 @@ import com.google.common.collect.Table; import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaResponse; +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; import io.envoyproxy.controlplane.cache.TestResources; @@ -19,6 +22,7 @@ import io.envoyproxy.controlplane.cache.WatchCancelledException; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.server.exception.RequestException; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -1022,6 +1026,16 @@ public Watch createWatch( return watch; } + + @Override + public DeltaWatch createDeltaWatch(DeltaXdsRequest request, String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged) { + throw new IllegalStateException("not implemented"); + } } private static class MockDiscoveryServerCallbacks @@ -1070,6 +1084,17 @@ public void onV3StreamRequest(long streamId, DiscoveryRequest request) { } } + @Override + public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected v2 request in v3 test"); + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected delta request"); + } + @Override public void onStreamResponse(long streamId, io.envoyproxy.envoy.api.v2.DiscoveryRequest request, io.envoyproxy.envoy.api.v2.DiscoveryResponse response) { diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java index e1297a5b7..6d9334e49 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java @@ -1,5 +1,6 @@ package io.envoyproxy.controlplane.server; +import io.envoyproxy.envoy.api.v2.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import java.util.concurrent.CountDownLatch; @@ -40,6 +41,17 @@ public void onV3StreamRequest(long streamId, DiscoveryRequest request) { onStreamRequestLatch.countDown(); } + @Override + public void onV2StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected v2 request in v3 test"); + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected delta request"); + } + @Override public void onStreamResponse(long streamId, io.envoyproxy.envoy.api.v2.DiscoveryRequest request, diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java index c6c27437c..8bdacc604 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java @@ -3,6 +3,7 @@ import static io.envoyproxy.envoy.config.core.v3.ApiVersion.V2; import static io.envoyproxy.envoy.config.core.v3.ApiVersion.V3; +import io.envoyproxy.controlplane.cache.SnapshotResource; import io.envoyproxy.controlplane.cache.TestResources; import io.envoyproxy.controlplane.cache.v3.Snapshot; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -32,10 +33,10 @@ static Snapshot createSnapshot( RouteConfiguration route = TestResources.createRouteV3(routeName, clusterName); return Snapshot.create( - ImmutableList.of(cluster), - ImmutableList.of(endpoint), - ImmutableList.of(listener), - ImmutableList.of(route), + ImmutableList.of(SnapshotResource.create(cluster, version)), + ImmutableList.of(SnapshotResource.create(endpoint, version)), + ImmutableList.of(SnapshotResource.create(listener, version)), + ImmutableList.of(SnapshotResource.create(route, version)), ImmutableList.of(), version); } @@ -84,10 +85,10 @@ private static Snapshot createSnapshotNoEds( RouteConfiguration route = TestResources.createRouteV3(routeName, clusterName); return Snapshot.create( - ImmutableList.of(cluster), + ImmutableList.of(SnapshotResource.create(cluster, version)), ImmutableList.of(), - ImmutableList.of(listener), - ImmutableList.of(route), + ImmutableList.of(SnapshotResource.create(listener, version)), + ImmutableList.of(SnapshotResource.create(route, version)), ImmutableList.of(), version); } From c75d1143ce02bea0a494c8f227b099f86da66939 Mon Sep 17 00:00:00 2001 From: Sebastian Schepens Date: Thu, 22 Oct 2020 18:43:57 -0300 Subject: [PATCH 2/3] fixes Signed-off-by: Sebastian Schepens --- .../controlplane/cache/Resources.java | 28 +++++----- .../controlplane/cache/SimpleCache.java | 54 +++++++++++++++++-- ...dsDeltaDiscoveryRequestStreamObserver.java | 7 ++- ...dsDeltaDiscoveryRequestStreamObserver.java | 7 ++- 4 files changed, 72 insertions(+), 24 deletions(-) diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java index a00806b28..ca375d336 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java @@ -103,19 +103,21 @@ public static class V3 { ROUTE, SECRET); - public static final Map V3_TYPE_URLS_TO_V2 = ImmutableMap.of( - Resources.V3.CLUSTER_TYPE_URL, Resources.V2.CLUSTER_TYPE_URL, - Resources.V3.ENDPOINT_TYPE_URL, Resources.V2.ENDPOINT_TYPE_URL, - Resources.V3.LISTENER_TYPE_URL, Resources.V2.LISTENER_TYPE_URL, - Resources.V3.ROUTE_TYPE_URL, Resources.V2.ROUTE_TYPE_URL, - Resources.V3.SECRET_TYPE_URL, Resources.V2.SECRET_TYPE_URL); - - public static final Map V2_TYPE_URLS_TO_V3 = ImmutableMap.of( - Resources.V2.CLUSTER_TYPE_URL, Resources.V3.CLUSTER_TYPE_URL, - Resources.V2.ENDPOINT_TYPE_URL, Resources.V3.ENDPOINT_TYPE_URL, - Resources.V2.LISTENER_TYPE_URL, Resources.V3.LISTENER_TYPE_URL, - Resources.V2.ROUTE_TYPE_URL, Resources.V3.ROUTE_TYPE_URL, - Resources.V2.SECRET_TYPE_URL, Resources.V3.SECRET_TYPE_URL); + public static final Map V3_TYPE_URLS_TO_V2 = ImmutableMap.builder() + .put(Resources.V3.CLUSTER_TYPE_URL, Resources.V2.CLUSTER_TYPE_URL) + .put(Resources.V3.ENDPOINT_TYPE_URL, Resources.V2.ENDPOINT_TYPE_URL) + .put(Resources.V3.LISTENER_TYPE_URL, Resources.V2.LISTENER_TYPE_URL) + .put(Resources.V3.ROUTE_TYPE_URL, Resources.V2.ROUTE_TYPE_URL) + .put(Resources.V3.SECRET_TYPE_URL, Resources.V2.SECRET_TYPE_URL) + .build(); + + public static final Map V2_TYPE_URLS_TO_V3 = ImmutableMap.builder() + .put(Resources.V2.CLUSTER_TYPE_URL, Resources.V3.CLUSTER_TYPE_URL) + .put(Resources.V2.ENDPOINT_TYPE_URL, Resources.V3.ENDPOINT_TYPE_URL) + .put(Resources.V2.LISTENER_TYPE_URL, Resources.V3.LISTENER_TYPE_URL) + .put(Resources.V2.ROUTE_TYPE_URL, Resources.V3.ROUTE_TYPE_URL) + .put(Resources.V2.SECRET_TYPE_URL, Resources.V3.SECRET_TYPE_URL) + .build(); public static final Map TYPE_URLS_TO_RESOURCE_TYPE = new ImmutableMap.Builder() diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java index def32fd91..64d7fd7c3 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java @@ -364,10 +364,11 @@ public Collection groups() { public synchronized void setSnapshot(T group, U snapshot) { // we take a writeLock to prevent watches from being created while we update the snapshot ConcurrentMap> status; + U previousSnapshot; writeLock.lock(); try { // Update the existing snapshot entry. - snapshots.put(group, snapshot); + previousSnapshot = snapshots.put(group, snapshot); status = statuses.get(group); } finally { writeLock.unlock(); @@ -379,7 +380,7 @@ public synchronized void setSnapshot(T group, U snapshot) { // Responses should be in specific order and typeUrls has a list of resources in the right // order. - respondWithSpecificOrder(group, snapshot, status); + respondWithSpecificOrder(group, previousSnapshot, snapshot, status); } /** @@ -403,7 +404,7 @@ public StatusInfo statusInfo(T group) { @VisibleForTesting protected void respondWithSpecificOrder(T group, - U snapshot, + U previousSnapshot, U snapshot, ConcurrentMap> statusMap) { for (ResourceType resourceType : RESOURCE_TYPES_IN_ORDER) { CacheStatusInfo status = statusMap.get(resourceType); @@ -435,6 +436,53 @@ protected void respondWithSpecificOrder(T group, // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. return false; }); + + Map> previousResources = previousSnapshot == null + ? Collections.emptyMap() + : previousSnapshot.resources(resourceType); + Map> snapshotResources = snapshot.resources(resourceType); + + Map> snapshotChangedResources = snapshotResources.entrySet() + .stream() + .filter(entry -> { + SnapshotResource snapshotResource = previousResources.get(entry.getKey()); + return snapshotResource == null || !snapshotResource.version().equals(entry.getValue().version()); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Set snapshotRemovedResources = previousResources.keySet() + .stream() + .filter(s -> !snapshotResources.containsKey(s)) + .collect(Collectors.toSet()); + + status.deltaWatchesRemoveIf((id, watch) -> { + String version = snapshot.version(watch.request().getResourceType(), Collections.emptyList()); + + if (!watch.version().equals(version)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responding to open watch {}[{}] with new version {}", + id, + String.join(", ", watch.trackedResources().keySet()), + version); + } + + List removedResources = snapshotRemovedResources.stream() + .filter(s -> watch.trackedResources().get(s) != null) + .collect(Collectors.toList()); + + ResponseState responseState = respondDeltaTracked(watch, + snapshotChangedResources, + removedResources, + version, + group); + // Discard the watch if it was responded or cancelled. + // A new watch will be created for future snapshots once envoy ACKs the response. + return ResponseState.RESPONDED.equals(responseState) || ResponseState.CANCELLED.equals(responseState); + } + + // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. + return false; + }); } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java index f5b3cbaaf..8f2b77486 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java @@ -115,10 +115,9 @@ Set pendingResources(String typeUrl) { @Override boolean isWildcard(String typeUrl) { - return typeUrl.equals(Resources.V2.CLUSTER_TYPE_URL) - || typeUrl.equals(Resources.V3.CLUSTER_TYPE_URL) - || typeUrl.equals(Resources.V2.LISTENER_TYPE_URL) - || typeUrl.equals(Resources.V3.LISTENER_TYPE_URL); + Resources.ResourceType resourceType = Resources.TYPE_URLS_TO_RESOURCE_TYPE.get(typeUrl); + return Resources.ResourceType.CLUSTER.equals(resourceType) + || Resources.ResourceType.LISTENER.equals(resourceType); } @Override diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java index da395e515..01a424778 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java @@ -34,10 +34,9 @@ public class XdsDeltaDiscoveryRequestStreamObserver extends DeltaDiscov super(defaultTypeUrl, responseObserver, streamId, executor, discoveryServer); this.trackedResources = new HashMap<>(); this.pendingResources = new HashSet<>(); - this.isWildcard = defaultTypeUrl.equals(Resources.V2.CLUSTER_TYPE_URL) - || defaultTypeUrl.equals(Resources.V3.CLUSTER_TYPE_URL) - || defaultTypeUrl.equals(Resources.V2.LISTENER_TYPE_URL) - || defaultTypeUrl.equals(Resources.V3.LISTENER_TYPE_URL); + Resources.ResourceType resourceType = Resources.TYPE_URLS_TO_RESOURCE_TYPE.get(defaultTypeUrl); + this.isWildcard = Resources.ResourceType.CLUSTER.equals(resourceType) + || Resources.ResourceType.LISTENER.equals(resourceType); this.responses = new ConcurrentHashMap<>(); } From f8bf24d629bb05778e0fb1dfcc7b0e8d65aaeaf5 Mon Sep 17 00:00:00 2001 From: Sebastian Schepens Date: Fri, 23 Oct 2020 03:55:26 -0300 Subject: [PATCH 3/3] method naming Signed-off-by: Sebastian Schepens --- .../controlplane/cache/SimpleCache.java | 54 +++++++++++++------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java index 64d7fd7c3..dafc3c1e9 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java @@ -280,9 +280,14 @@ public DeltaWatch createDeltaWatch( return watch; } } else if (hasClusterChanged && requestResourceType.equals(ResourceType.ENDPOINT)) { - ResponseState responseState = respondDeltaTracked( + Map> snapshotResources = snapshot.resources(request.getResourceType()); + List removedResources = findRemovedResources(watch, + snapshotResources); + Map> changedResources = findChangedResources(watch, snapshotResources); + ResponseState responseState = respondDelta( watch, - snapshot.resources(request.getResourceType()), + changedResources, + removedResources, version, group); if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { @@ -304,8 +309,13 @@ public DeltaWatch createDeltaWatch( } // Otherwise, version is different, the watch may be responded immediately - ResponseState responseState = respondDeltaTracked(watch, - snapshot.resources(request.getResourceType()), + Map> snapshotResources = snapshot.resources(request.getResourceType()); + List removedResources = findRemovedResources(watch, + snapshotResources); + Map> changedResources = findChangedResources(watch, snapshotResources); + ResponseState responseState = respondDelta(watch, + changedResources, + removedResources, version, group); if (responseState.equals(ResponseState.RESPONDED) || responseState.equals(ResponseState.CANCELLED)) { @@ -470,8 +480,10 @@ protected void respondWithSpecificOrder(T group, .filter(s -> watch.trackedResources().get(s) != null) .collect(Collectors.toList()); - ResponseState responseState = respondDeltaTracked(watch, - snapshotChangedResources, + Map> changedResources = findChangedResources(watch, snapshotChangedResources); + + ResponseState responseState = respondDelta(watch, + changedResources, removedResources, version, group); @@ -551,22 +563,30 @@ private boolean respond(Watch watch, U snapshot, T group) { return false; } - /** - * Responds a delta watch using resource version comparison. - * - * @return if the watch has been responded. - */ - private ResponseState respondDeltaTracked(DeltaWatch watch, - Map> snapshotResources, - String version, - T group) { + private List findRemovedResources(DeltaWatch watch, Map> snapshotResources) { // remove resources for which client has a tracked version but do not exist in snapshot - List removedResources = watch.trackedResources().keySet() + return watch.trackedResources().keySet() .stream() .filter(s -> !snapshotResources.containsKey(s)) .collect(Collectors.toList()); + } - return respondDeltaTracked(watch, snapshotResources, removedResources, version, group); + private Map> findChangedResources(DeltaWatch watch, + Map> snapshotResources) { + return snapshotResources.entrySet() + .stream() + .filter(entry -> { + if (watch.pendingResources().contains(entry.getKey())) { + return true; + } + String resourceVersion = watch.trackedResources().get(entry.getKey()); + if (resourceVersion == null) { + // resource is not tracked, should respond it only if watch is wildcard + return watch.isWildcard(); + } + return !entry.getValue().version().equals(resourceVersion); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } private ResponseState respondDeltaTracked(DeltaWatch watch,