diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/AbstractWatch.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/AbstractWatch.java new file mode 100644 index 000000000..9a90ddcf9 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/AbstractWatch.java @@ -0,0 +1,73 @@ +package io.envoyproxy.controlplane.cache; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; + +public abstract class AbstractWatch { + + private static final AtomicIntegerFieldUpdater isCancelledUpdater = + AtomicIntegerFieldUpdater.newUpdater(AbstractWatch.class, "isCancelled"); + private final V request; + private final Consumer responseConsumer; + private volatile int isCancelled = 0; + private Runnable stop; + + /** + * Construct a watch. + * + * @param request the original request for the watch + * @param responseConsumer handler for outgoing response messages + */ + public AbstractWatch(V request, Consumer responseConsumer) { + this.request = request; + this.responseConsumer = responseConsumer; + } + + /** + * Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel + * may be called multiple times, with each subsequent call being a no-op. + */ + public void cancel() { + if (isCancelledUpdater.compareAndSet(this, 0, 1)) { + if (stop != null) { + stop.run(); + } + } + } + + /** + * Returns boolean indicating whether or not the watch has been cancelled. + */ + public boolean isCancelled() { + return isCancelledUpdater.get(this) == 1; + } + + /** + * Returns the original request for the watch. + */ + public V request() { + return request; + } + + /** + * Sends the given response to the watch's response handler. + * + * @param response the response to be handled + * @throws WatchCancelledException if the watch has already been cancelled + */ + public void respond(T response) throws WatchCancelledException { + if (isCancelled()) { + throw new WatchCancelledException(); + } + + responseConsumer.accept(response); + } + + /** + * Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it + * ensures that this stop callback is only executed once. + */ + public void setStop(Runnable stop) { + this.stop = stop; + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java index 34529c1db..51237aae0 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Cache.java @@ -20,5 +20,5 @@ public interface Cache extends ConfigWatcher { * * @param group the node group whose status is being fetched */ - StatusInfo statusInfo(T group); + StatusInfo statusInfo(T group); } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java index 30b7c454b..cf4d620b7 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfo.java @@ -1,10 +1,5 @@ package io.envoyproxy.controlplane.cache; -import com.google.common.collect.ImmutableSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.BiFunction; import javax.annotation.concurrent.ThreadSafe; /** @@ -12,83 +7,8 @@ * implementations. */ @ThreadSafe -public class CacheStatusInfo implements StatusInfo { - - private final T nodeGroup; - - private final ConcurrentMap watches = new ConcurrentHashMap<>(); - private volatile long lastWatchRequestTime; - +public class CacheStatusInfo extends MutableStatusInfo { public CacheStatusInfo(T nodeGroup) { - this.nodeGroup = nodeGroup; - } - - /** - * {@inheritDoc} - */ - @Override - public long lastWatchRequestTime() { - return lastWatchRequestTime; - } - - /** - * {@inheritDoc} - */ - @Override - public T nodeGroup() { - return nodeGroup; - } - - /** - * {@inheritDoc} - */ - @Override - public int numWatches() { - return watches.size(); - } - - /** - * Removes the given watch from the tracked collection of watches. - * - * @param watchId the ID for the watch that should be removed - */ - public void removeWatch(long watchId) { - watches.remove(watchId); - } - - /** - * Sets the timestamp of the last discovery watch request. - * - * @param lastWatchRequestTime the latest watch request timestamp - */ - public void setLastWatchRequestTime(long lastWatchRequestTime) { - this.lastWatchRequestTime = lastWatchRequestTime; - } - - /** - * Adds the given watch to the tracked collection of watches. - * - * @param watchId the ID for the watch that should be added - * @param watch the watch that should be added - */ - public void setWatch(long watchId, Watch watch) { - watches.put(watchId, watch); - } - - /** - * Returns the set of IDs for all watched currently being tracked. - */ - public Set watchIds() { - return ImmutableSet.copyOf(watches.keySet()); - } - - /** - * Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is - * removed from the tracked collection. If it returns {@code false}, then the watch is not removed. - * - * @param filter the function to execute on each watch - */ - public void watchesRemoveIf(BiFunction filter) { - watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); + super(nodeGroup); } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfoAggregator.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfoAggregator.java new file mode 100644 index 000000000..2331191fa --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/CacheStatusInfoAggregator.java @@ -0,0 +1,77 @@ +package io.envoyproxy.controlplane.cache; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class CacheStatusInfoAggregator { + private final ConcurrentMap>> statuses = + new ConcurrentHashMap<>(); + private final ConcurrentMap>> deltaStatuses = + new ConcurrentHashMap<>(); + + public Collection groups() { + return Stream.concat(statuses.keySet().stream(), deltaStatuses.keySet().stream()).collect(Collectors.toSet()); + } + + public void remove(T group) { + statuses.remove(group); + deltaStatuses.remove(group); + } + + /** + * Returns map of delta status infos for group identifier. + * + * @param group group identifier. + */ + public Map> getDeltaStatus(T group) { + return deltaStatuses.getOrDefault(group, new ConcurrentHashMap<>()); + } + + /** + * Returns map of status infos for group identifier. + * + * @param group group identifier. + */ + public Map> getStatus(T group) { + return statuses.getOrDefault(group, new ConcurrentHashMap<>()); + } + + /** + * Check if statuses for specific group have any watcher. + * + * @param group group identifier. + * @return true if statuses for specific group have any watcher. + */ + public boolean hasStatuses(T group) { + Map> status = getStatus(group); + Map> deltaStatus = getDeltaStatus(group); + return status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() + + deltaStatus.values().stream().mapToLong(DeltaCacheStatusInfo::numWatches).sum() > 0; + } + + /** + * Returns delta status info for group identifier and creates new one if it doesn't exist. + * + * @param group group identifier. + * @param resourceType resource type. + */ + public DeltaCacheStatusInfo getOrAddDeltaStatusInfo(T group, Resources.ResourceType resourceType) { + return deltaStatuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) + .computeIfAbsent(resourceType, s -> new DeltaCacheStatusInfo<>(group)); + } + + /** + * Returns status info for group identifier and creates new one if it doesn't exist. + * + * @param group group identifier. + * @param resourceType resource type. + */ + public CacheStatusInfo getOrAddStatusInfo(T group, Resources.ResourceType resourceType) { + return statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) + .computeIfAbsent(resourceType, s -> new CacheStatusInfo<>(group)); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java index fc94fe8d3..d51b82eb6 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/ConfigWatcher.java @@ -1,5 +1,6 @@ package io.envoyproxy.controlplane.cache; +import java.util.Map; import java.util.Set; import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; @@ -28,4 +29,25 @@ Watch createWatch( Set knownResourceNames, Consumer responseConsumer, boolean hasClusterChanged); + + /** + * Returns a new configuration resource {@link Watch} for the given discovery request. + * + * @param request the discovery request (node, names, etc.) to use to generate the watch + * @param requesterVersion the last version applied by the requester + * @param resourceVersions resources that are already known to the requester + * @param pendingResources resources that the caller is waiting for + * @param isWildcard indicates if the stream is in wildcard mode + * @param responseConsumer the response handler, used to process outgoing response messages + * @param hasClusterChanged indicates if EDS should be sent immediately, even if version has not been changed. + * Supported in ADS mode. + */ + DeltaWatch createDeltaWatch( + DeltaXdsRequest request, + String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged); } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaCacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaCacheStatusInfo.java new file mode 100644 index 000000000..355a7dfdd --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaCacheStatusInfo.java @@ -0,0 +1,8 @@ +package io.envoyproxy.controlplane.cache; + +public class DeltaCacheStatusInfo extends MutableStatusInfo { + + public DeltaCacheStatusInfo(T nodeGroup) { + super(nodeGroup); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaResponse.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaResponse.java new file mode 100644 index 000000000..074dfcf36 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaResponse.java @@ -0,0 +1,41 @@ +package io.envoyproxy.controlplane.cache; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; +import java.util.List; +import java.util.Map; + +/** + * {@code Response} is a data class that contains the response for an assumed configuration type. + */ +@AutoValue +public abstract class DeltaResponse { + + public static DeltaResponse create(DeltaXdsRequest request, + Map> resources, + List removedResources, + String version) { + return new AutoValue_DeltaResponse(request, resources, removedResources, version); + } + + /** + * Returns the original request associated with the response. + */ + public abstract DeltaXdsRequest request(); + + /** + * Returns the resources to include in the response. + */ + public abstract Map> resources(); + + /** + * Returns the removed resources to include in the response. + */ + public abstract List removedResources(); + + /** + * Returns the version of the resources as tracked by the cache for the given type. Envoy responds with this version + * as an acknowledgement. + */ + public abstract String version(); +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java new file mode 100644 index 000000000..390e1fb04 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaWatch.java @@ -0,0 +1,66 @@ +package io.envoyproxy.controlplane.cache; + +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +/** + * {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by + * the xDS server. + */ +public class DeltaWatch extends AbstractWatch { + private final Map resourceVersions; + private final Set pendingResources; + private final boolean isWildcard; + private final String version; + + /** + * Construct a watch. + * + * @param request the original request for the watch + * @param version indicates the stream current version + * @param isWildcard indicates if the stream is in wildcard mode + * @param responseConsumer handler for outgoing response messages + */ + public DeltaWatch(DeltaXdsRequest request, + Map resourceVersions, + Set pendingResources, + String version, + boolean isWildcard, + Consumer responseConsumer) { + super(request, responseConsumer); + this.resourceVersions = resourceVersions; + this.pendingResources = pendingResources; + this.version = version; + this.isWildcard = isWildcard; + } + + /** + * Returns the tracked resources for the watch. + */ + public Map trackedResources() { + return resourceVersions; + } + + /** + * Returns the pending resources for the watch. + */ + public Set pendingResources() { + return pendingResources; + } + + /** + * Returns the stream current version. + */ + public String version() { + return version; + } + + /** + * Indicates if the stream is in wildcard mode. + */ + public boolean isWildcard() { + return isWildcard; + } + +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaXdsRequest.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaXdsRequest.java new file mode 100644 index 000000000..f0686c8c9 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/DeltaXdsRequest.java @@ -0,0 +1,84 @@ +package io.envoyproxy.controlplane.cache; + +import static io.envoyproxy.controlplane.cache.Resources.TYPE_URLS_TO_RESOURCE_TYPE; + +import com.google.auto.value.AutoValue; +import io.envoyproxy.controlplane.cache.Resources.ResourceType; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; + +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * XdsRequest wraps a v3 DiscoveryRequest of and provides common methods as a + * workaround to the proto messages not implementing a common interface that can be used to + * abstract away xDS version. XdsRequest is passed around the codebase through common code, + * however the callers that need the raw request from it have knowledge of whether it is a v2 or + * a v3 request. + */ +@AutoValue +public abstract class DeltaXdsRequest { + public static DeltaXdsRequest create(DeltaDiscoveryRequest discoveryRequest) { + return new AutoValue_DeltaXdsRequest(discoveryRequest); + } + + /** + * Returns he underlying v3 request, or null if this was a v2 request. Callers should have + * knowledge of whether the request was v3 or not. + * + * @return v3 DiscoveryRequest or null + */ + @Nullable + public abstract DeltaDiscoveryRequest v3Request(); + + /** + * Returns the type URL of the v2 or v3 request. + */ + public String getTypeUrl() { + return v3Request().getTypeUrl(); + } + + /** + * Returns the ResourceType of the underlying request. This is useful for accepting requests + * for both v3 resource types and having a key to normalize on the logical resource. + */ + public ResourceType getResourceType() { + return TYPE_URLS_TO_RESOURCE_TYPE.get(v3Request().getTypeUrl()); + } + + /** + * Returns the response nonse from the underlying DiscoveryRequest. + */ + public String getResponseNonce() { + return v3Request().getResponseNonce(); + } + + /** + * Returns the error_detail from the underlying v3 request. + */ + public boolean hasErrorDetail() { + return v3Request().hasErrorDetail(); + } + + /** + * Returns the resource_names_subscribe from the underlying v2 or v3 request. + */ + public List getResourceNamesSubscribeList() { + return v3Request().getResourceNamesSubscribeList(); + } + + /** + * Returns the resource_names_unsubscribe from the underlying v2 or v3 request. + */ + public List getResourceNamesUnsubscribeList() { + return v3Request().getResourceNamesUnsubscribeList(); + } + + /** + * Returns the initial_resource_versions from the underlying v2 or v3 request. + */ + public Map getInitialResourceVersionsMap() { + return v3Request().getInitialResourceVersionsMap(); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java index 1b192764b..682b2e8d2 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/GroupCacheStatusInfo.java @@ -8,10 +8,10 @@ * {@code GroupCacheStatusInfo} provides an implementation of {@link StatusInfo} for a group of {@link CacheStatusInfo}. */ @ThreadSafe -class GroupCacheStatusInfo implements StatusInfo { - private final Collection> statuses; +public class GroupCacheStatusInfo implements StatusInfo { + private final Collection> statuses; - public GroupCacheStatusInfo(Collection> statuses) { + public GroupCacheStatusInfo(Collection> statuses) { this.statuses = new ArrayList<>(statuses); } @@ -20,7 +20,7 @@ public GroupCacheStatusInfo(Collection> statuses) { */ @Override public long lastWatchRequestTime() { - return statuses.stream().mapToLong(CacheStatusInfo::lastWatchRequestTime).max().orElse(0); + return statuses.stream().mapToLong(StatusInfo::lastWatchRequestTime).max().orElse(0); } /** @@ -28,7 +28,7 @@ public long lastWatchRequestTime() { */ @Override public T nodeGroup() { - return statuses.stream().map(CacheStatusInfo::nodeGroup).findFirst().orElse(null); + return statuses.stream().map(StatusInfo::nodeGroup).findFirst().orElse(null); } /** @@ -36,6 +36,7 @@ public T nodeGroup() { */ @Override public int numWatches() { - return statuses.stream().mapToInt(CacheStatusInfo::numWatches).sum(); + return statuses.stream().mapToInt(StatusInfo::numWatches).sum(); } + } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/MutableStatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/MutableStatusInfo.java new file mode 100644 index 000000000..464e06779 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/MutableStatusInfo.java @@ -0,0 +1,85 @@ +package io.envoyproxy.controlplane.cache; + +import com.google.common.collect.ImmutableSet; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiFunction; + +public class MutableStatusInfo> implements StatusInfo { + private final ConcurrentMap watches = new ConcurrentHashMap<>(); + private final T nodeGroup; + private volatile long lastWatchRequestTime; + + protected MutableStatusInfo(T nodeGroup) { + this.nodeGroup = nodeGroup; + } + + /** + * {@inheritDoc} + */ + public long lastWatchRequestTime() { + return lastWatchRequestTime; + } + + /** + * {@inheritDoc} + */ + public T nodeGroup() { + return nodeGroup; + } + + /** + * {@inheritDoc} + */ + public int numWatches() { + return watches.size(); + } + + /** + * Removes the given watch from the tracked collection of watches. + * + * @param watchId the ID for the watch that should be removed + */ + public void removeWatch(long watchId) { + watches.remove(watchId); + } + + + /** + * Sets the timestamp of the last discovery watch request. + * + * @param lastWatchRequestTime the latest watch request timestamp + */ + public void setLastWatchRequestTime(long lastWatchRequestTime) { + this.lastWatchRequestTime = lastWatchRequestTime; + } + + /** + * Adds the given watch to the tracked collection of watches. + * + * @param watchId the ID for the watch that should be added + * @param watch the watch that should be added + */ + public void setWatch(long watchId, V watch) { + watches.put(watchId, watch); + } + + /** + * Returns the set of IDs for all watched currently being tracked. + */ + public Set watchIds() { + return ImmutableSet.copyOf(watches.keySet()); + } + + /** + * Iterate over all tracked watches and execute the given function. If it returns {@code true}, then the watch is + * removed from the tracked collection. If it returns {@code false}, then the watch is not removed. + * + * @param filter the function to execute on each watch + */ + public void watchesRemoveIf(BiFunction filter) { + watches.entrySet().removeIf(entry -> filter.apply(entry.getKey(), entry.getValue())); + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/ResourceMapBuilder.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/ResourceMapBuilder.java new file mode 100644 index 000000000..084ff78dc --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/ResourceMapBuilder.java @@ -0,0 +1,37 @@ +package io.envoyproxy.controlplane.cache; + +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Message; + +class ResourceMapBuilder { + + private final ImmutableMap.Builder> versionedResources = ImmutableMap.builder(); + private final ImmutableMap.Builder resources = ImmutableMap.builder(); + + + ImmutableMap> getVersionedResources() { + return versionedResources.build(); + } + + ImmutableMap getResources() { + return resources.build(); + } + + void put(Object resource) { + if (resource instanceof VersionedResource) { + VersionedResource eCast = (VersionedResource) resource; + versionedResources.put(Resources.getResourceName(eCast.resource()), eCast); + resources.put(Resources.getResourceName(eCast.resource()), eCast.resource()); + } else { + T eCast = (T) resource; + versionedResources.put(Resources.getResourceName(eCast), VersionedResource.create(eCast)); + resources.put(Resources.getResourceName(eCast), eCast); + } + } + + ResourceMapBuilder putAll(ResourceMapBuilder other) { + versionedResources.putAll(other.getVersionedResources()); + resources.putAll(other.getResources()); + return this; + } +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java index deb3e28a9..b405005aa 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Resources.java @@ -148,10 +148,11 @@ public static String getResourceName(Any anyResource) { * * @param resources the resource whose dependencies we are calculating */ - public static Set getResourceReferences(Collection resources) { + public static Set getResourceReferences(Collection> resources) { final ImmutableSet.Builder refs = ImmutableSet.builder(); - for (Message r : resources) { + for (VersionedResource sr : resources) { + Message r = sr.resource(); if (r instanceof ClusterLoadAssignment || r instanceof RouteConfiguration) { // Endpoints have no dependencies. diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java index 2fc80deab..08738e5a5 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SimpleCache.java @@ -4,24 +4,29 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.Resources.ResourceType; + import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.concurrent.GuardedBy; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +51,7 @@ public abstract class SimpleCache implements SnapshotCach @GuardedBy("lock") private final Map snapshots = new HashMap<>(); - private final ConcurrentMap>> statuses = - new ConcurrentHashMap<>(); + private final CacheStatusInfoAggregator statuses = new CacheStatusInfoAggregator<>(); private AtomicLong watchCount = new AtomicLong(); @@ -68,10 +72,8 @@ public boolean clearSnapshot(T group) { // we take a writeLock to prevent watches from being created writeLock.lock(); try { - Map> status = statuses.get(group); - // If we don't know about this group, do nothing. - if (status != null && status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() > 0) { + if (statuses.hasStatuses(group)) { LOGGER.warn("tried to clear snapshot for group with existing watches, group={}", group); return false; @@ -115,8 +117,7 @@ public Watch createWatch( // doesn't conflict readLock.lock(); try { - CacheStatusInfo status = statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>()) - .computeIfAbsent(requestResourceType, s -> new CacheStatusInfo<>(group)); + CacheStatusInfo status = statuses.getOrAddStatusInfo(group, requestResourceType); status.setLastWatchRequestTime(System.currentTimeMillis()); U snapshot = snapshots.get(group); @@ -152,21 +153,7 @@ public Watch createWatch( // If the requested version is up-to-date or missing a response, leave an open watch. if (snapshot == null || request.getVersionInfo().equals(version)) { - long watchId = watchCount.incrementAndGet(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", request.getResourceNamesList()), - group, - request.getVersionInfo()); - } - - status.setWatch(watchId, watch); - - watch.setStop(() -> status.removeWatch(watchId)); - + openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); return watch; } @@ -174,28 +161,121 @@ public Watch createWatch( boolean responded = respond(watch, snapshot, group); if (!responded) { - long watchId = watchCount.incrementAndGet(); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}", - watchId, - request.getTypeUrl(), - String.join(", ", request.getResourceNamesList()), - group, - request.getVersionInfo()); + openWatch(status, watch, request.getTypeUrl(), request.getResourceNamesList(), group, request.getVersionInfo()); + } + + return watch; + } finally { + readLock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public DeltaWatch createDeltaWatch( + DeltaXdsRequest request, + String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged) { + + ResourceType requestResourceType = request.getResourceType(); + Preconditions.checkNotNull(requestResourceType, "unsupported type URL %s", + request.getTypeUrl()); + T group; + group = groups.hash(request.v3Request().getNode()); + + // even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it + // doesn't conflict + readLock.lock(); + try { + DeltaCacheStatusInfo status = statuses.getOrAddDeltaStatusInfo(group, requestResourceType); + status.setLastWatchRequestTime(System.currentTimeMillis()); + + U snapshot = snapshots.get(group); + String version = snapshot == null ? "" : snapshot.version(requestResourceType, Collections.emptyList()); + + DeltaWatch watch = new DeltaWatch(request, + ImmutableMap.copyOf(resourceVersions), + ImmutableSet.copyOf(pendingResources), + requesterVersion, + isWildcard, + responseConsumer); + + // If no snapshot, leave an open watch. + + if (snapshot == null) { + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); + return watch; + } + + // If the requested version is up-to-date or missing a response, leave an open watch. + if (version.equals(requesterVersion)) { + // If the request is not wildcard, we have pending resources and we have them, we should respond immediately. + if (!isWildcard && watch.pendingResources().size() != 0) { + // If any of the pending resources are in the snapshot respond immediately. If not we'll fall back to + // version comparisons. + Map> resources = snapshot.versionedResources(request.getResourceType()); + Map> requestedResources = watch.pendingResources() + .stream() + .filter(resources::containsKey) + .collect(Collectors.toMap(Function.identity(), resources::get)); + ResponseState responseState = respondDelta(watch, + requestedResources, + Collections.emptyList(), + version, + group); + if (responseState.isFinished()) { + return watch; + } + } else if (hasClusterChanged && requestResourceType.equals(ResourceType.ENDPOINT)) { + ResponseState responseState = respondDelta(request, watch, snapshot, version, group); + if (responseState.isFinished()) { + return watch; + } } - status.setWatch(watchId, watch); + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); + + return watch; + } - watch.setStop(() -> status.removeWatch(watchId)); + // Otherwise, version is different, the watch may be responded immediately + ResponseState responseState = respondDelta(request, watch, snapshot, version, group); + if (responseState.isFinished()) { + return watch; } + openWatch(status, watch, request.getTypeUrl(), watch.trackedResources().keySet(), group, requesterVersion); return watch; } finally { readLock.unlock(); } } + private > void openWatch(MutableStatusInfo status, + V watch, + String url, + Collection resources, + T group, + String version) { + long watchId = watchCount.incrementAndGet(); + status.setWatch(watchId, watch); + watch.setStop(() -> status.removeWatch(watchId)); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("open watch {} for {}[{}] from node {} for version {}", + watchId, + url, + String.join(", ", resources), + group, + version); + } + } + /** * {@inheritDoc} */ @@ -215,7 +295,7 @@ public U getSnapshot(T group) { */ @Override public Collection groups() { - return ImmutableSet.copyOf(statuses.keySet()); + return ImmutableSet.copyOf(statuses.groups()); } /** @@ -224,39 +304,47 @@ public Collection groups() { @Override public synchronized void setSnapshot(T group, U snapshot) { // we take a writeLock to prevent watches from being created while we update the snapshot - ConcurrentMap> status; + Map> status; + Map> deltaStatus; + U previousSnapshot; writeLock.lock(); try { // Update the existing snapshot entry. - snapshots.put(group, snapshot); - status = statuses.get(group); + previousSnapshot = snapshots.put(group, snapshot); + status = statuses.getStatus(group); + deltaStatus = statuses.getDeltaStatus(group); } finally { writeLock.unlock(); } - if (status == null) { + if (status.isEmpty() && deltaStatus.isEmpty()) { return; } // Responses should be in specific order and typeUrls has a list of resources in the right // order. - respondWithSpecificOrder(group, snapshot, status); + respondWithSpecificOrder(group, previousSnapshot, snapshot, status, deltaStatus); } /** * {@inheritDoc} */ @Override - public StatusInfo statusInfo(T group) { + public StatusInfo statusInfo(T group) { readLock.lock(); try { - ConcurrentMap> statusMap = statuses.get(group); - if (statusMap == null || statusMap.isEmpty()) { + Map> statusMap = statuses.getStatus(group); + Map> deltaStatusMap = statuses.getDeltaStatus(group); + + if (statusMap.isEmpty() && deltaStatusMap.isEmpty()) { return null; } - return new GroupCacheStatusInfo<>(statusMap.values()); + List> collection = Stream.concat(statusMap.values().stream(), + deltaStatusMap.values().stream()).collect(Collectors.toList()); + + return new GroupCacheStatusInfo<>(collection); } finally { readLock.unlock(); } @@ -264,55 +352,111 @@ public StatusInfo statusInfo(T group) { @VisibleForTesting protected void respondWithSpecificOrder(T group, - U snapshot, - ConcurrentMap> statusMap) { + U previousSnapshot, U snapshot, + Map> statusMap, + Map> deltaStatusMap) { for (ResourceType resourceType : RESOURCE_TYPES_IN_ORDER) { CacheStatusInfo status = statusMap.get(resourceType); - if (status == null) { - continue; - } + if (status != null) { + status.watchesRemoveIf((id, watch) -> { + if (!watch.request().getResourceType().equals(resourceType)) { + return false; + } + String version = snapshot.version(watch.request().getResourceType(), + watch.request().getResourceNamesList()); - status.watchesRemoveIf((id, watch) -> { - if (!watch.request().getResourceType().equals(resourceType)) { - return false; - } - String version = snapshot.version(watch.request().getResourceType(), - watch.request().getResourceNamesList()); - - if (!watch.request().getVersionInfo().equals(version)) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("responding to open watch {}[{}] with new version {}", - id, - String.join(", ", watch.request().getResourceNamesList()), - version); + if (!watch.request().getVersionInfo().equals(version)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responding to open watch {}[{}] with new version {}", + id, + String.join(", ", watch.request().getResourceNamesList()), + version); + } + + respond(watch, snapshot, group); + + // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response. + return true; } - respond(watch, snapshot, group); + // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. + return false; + }); + } - // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response. - return true; - } + DeltaCacheStatusInfo deltaStatus = deltaStatusMap.get(resourceType); + if (deltaStatus != null) { + Map> previousResources = previousSnapshot == null + ? Collections.emptyMap() + : previousSnapshot.versionedResources(resourceType); + Map> snapshotResources = snapshot.versionedResources(resourceType); + + Map> snapshotChangedResources = snapshotResources.entrySet() + .stream() + .filter(entry -> { + VersionedResource versionedResource = previousResources.get(entry.getKey()); + return versionedResource == null || !versionedResource + .version().equals(entry.getValue().version()); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Set snapshotRemovedResources = previousResources.keySet() + .stream() + .filter(s -> !snapshotResources.containsKey(s)) + .collect(Collectors.toSet()); + deltaStatus.watchesRemoveIf((id, watch) -> { + String version = snapshot.version(watch.request().getResourceType(), Collections.emptyList()); + + if (!watch.version().equals(version)) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("responding to open watch {}[{}] with new version {}", + id, + String.join(", ", watch.trackedResources().keySet()), + version); + } + + List removedResources = snapshotRemovedResources.stream() + .filter(s -> watch.trackedResources().get(s) != null) + .collect(Collectors.toList()); + + Map> changedResources = findChangedResources(watch, snapshotChangedResources); + + + ResponseState responseState = respondDelta(watch, + changedResources, + removedResources, + version, + group); + // Discard the watch if it was responded or cancelled. + // A new watch will be created for future snapshots once envoy ACKs the response. + return responseState.isFinished(); + } - // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. - return false; - }); + // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond. + return false; + }); + } } } - private Response createResponse(XdsRequest request, Map resources, - String version) { + private Response createResponse(XdsRequest request, Map> resources, + String version) { Collection filtered = request.getResourceNamesList().isEmpty() - ? resources.values() + ? resources.values().stream() + .map(VersionedResource::resource) + .collect(Collectors.toList()) : request.getResourceNamesList().stream() .map(resources::get) .filter(Objects::nonNull) + .map(VersionedResource::resource) .collect(Collectors.toList()); return Response.create(request, filtered, version); } private boolean respond(Watch watch, U snapshot, T group) { - Map snapshotResources = snapshot.resources(watch.request().getResourceType()); + Map> snapshotResources = + snapshot.versionedResources(watch.request().getResourceType()); if (!watch.request().getResourceNamesList().isEmpty() && watch.ads()) { Collection missingNames = watch.request().getResourceNamesList().stream() @@ -360,4 +504,83 @@ private boolean respond(Watch watch, U snapshot, T group) { return false; } + + private List findRemovedResources(DeltaWatch watch, Map> snapshotResources) { + // remove resources for which client has a tracked version but do not exist in snapshot + return watch.trackedResources().keySet() + .stream() + .filter(s -> !snapshotResources.containsKey(s)) + .collect(Collectors.toList()); + } + + private Map> findChangedResources(DeltaWatch watch, + Map> snapshotResources) { + return snapshotResources.entrySet() + .stream() + .filter(entry -> { + if (watch.pendingResources().contains(entry.getKey())) { + return true; + } + String resourceVersion = watch.trackedResources().get(entry.getKey()); + if (resourceVersion == null) { + // resource is not tracked, should respond it only if watch is wildcard + return watch.isWildcard(); + } + return !entry.getValue().version().equals(resourceVersion); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + + private ResponseState respondDelta(DeltaXdsRequest request, DeltaWatch watch, U snapshot, String version, T group) { + Map> snapshotResources = snapshot.versionedResources(request.getResourceType()); + List removedResources = findRemovedResources(watch, + snapshotResources); + Map> changedResources = findChangedResources(watch, snapshotResources); + return respondDelta(watch, + changedResources, + removedResources, + version, + group); + } + + private ResponseState respondDelta(DeltaWatch watch, + Map> resources, + List removedResources, + String version, + T group) { + if (resources.isEmpty() && removedResources.isEmpty()) { + return ResponseState.UNRESPONDED; + } + + DeltaResponse response = DeltaResponse.create( + watch.request(), + resources, + removedResources, + version); + + try { + watch.respond(response); + return ResponseState.RESPONDED; + } catch (WatchCancelledException e) { + LOGGER.error( + "failed to respond for {} from node {} with version {} because watch was already cancelled", + watch.request().getTypeUrl(), + group, + version); + } + + return ResponseState.CANCELLED; + } + + private enum ResponseState { + RESPONDED, + UNRESPONDED, + CANCELLED; + + private boolean isFinished() { + return this.equals(RESPONDED) || this.equals(CANCELLED); + } + + } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java index 41ec42b7b..bdc82c838 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Snapshot.java @@ -2,9 +2,11 @@ import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.Resources.ResourceType; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.StreamSupport; public abstract class Snapshot { @@ -15,17 +17,17 @@ public abstract class Snapshot { /** * Asserts that all of the given resource names have corresponding values in the given resources collection. * - * @param parentTypeUrl the type of the parent resources (source of the resource name refs) + * @param parentTypeUrl the type of the parent resources (source of the resource name refs) * @param dependencyTypeUrl the type of the given dependent resources - * @param resourceNames the set of dependent resource names that must exist - * @param resources the collection of resources whose names are being checked + * @param resourceNames the set of dependent resource names that must exist + * @param resources the collection of resources whose names are being checked * @throws SnapshotConsistencyException if a name is given that does not exist in the resources collection */ - protected static void ensureAllResourceNamesExist( + protected static void ensureAllResourceNamesExist( String parentTypeUrl, String dependencyTypeUrl, Set resourceNames, - Map resources) throws SnapshotConsistencyException { + Map> resources) throws SnapshotConsistencyException { if (resourceNames.size() != resources.size()) { throw new SnapshotConsistencyException( @@ -49,4 +51,18 @@ protected static void ensureAllResourceNamesExist( } } } + + public abstract Map> versionedResources(ResourceType resourceType); + + private static Iterable getIterableFromIterator(Iterator iterator) { + return () -> iterator; + } + + protected static Iterable> generateSnapshotResourceIterable( + Iterable resources) { + return getIterableFromIterator( + StreamSupport.stream(resources.spliterator(), false) + .map((r) -> VersionedResource.create(r)) + .iterator()); + } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java index 107e173ca..f270ada2a 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/SnapshotResources.java @@ -1,9 +1,7 @@ package io.envoyproxy.controlplane.cache; import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableMap; import com.google.protobuf.Message; - import java.util.List; import java.util.Map; import java.util.stream.Collector; @@ -16,41 +14,54 @@ public abstract class SnapshotResources { * Returns a new {@link SnapshotResources} instance. * * @param resources the resources in this collection - * @param version the version associated with the resources in this collection - * @param the type of resources in this collection + * @param version the version associated with the resources in this collection + * @param the type of resources in this collection */ - public static SnapshotResources create(Iterable resources, String version) { + public static SnapshotResources create( + Iterable resources, + String version) { + ResourceMapBuilder resourcesMapBuilder = createResourcesMap(resources); return new AutoValue_SnapshotResources<>( - resourcesMap(resources), + resourcesMapBuilder.getVersionedResources(), + resourcesMapBuilder.getResources(), (r) -> version ); } + /** * Returns a new {@link SnapshotResources} instance with versions by resource name. * - * @param resources the resources in this collection + * @param resources the resources in this collection * @param versionResolver version resolver for the resources in this collection - * @param the type of resources in this collection + * @param the type of resources in this collection */ public static SnapshotResources create( - Iterable resources, + Iterable> resources, ResourceVersionResolver versionResolver) { + ResourceMapBuilder resourcesMapBuilder = createResourcesMap(resources); return new AutoValue_SnapshotResources<>( - resourcesMap(resources), + resourcesMapBuilder.getVersionedResources(), + resourcesMapBuilder.getResources(), versionResolver); } - private static ImmutableMap resourcesMap(Iterable resources) { + private static ResourceMapBuilder createResourcesMap( + Iterable resources) { + return StreamSupport.stream(resources.spliterator(), false) .collect( Collector.of( - ImmutableMap.Builder::new, - (b, e) -> b.put(Resources.getResourceName(e), e), - (b1, b2) -> b1.putAll(b2.build()), - ImmutableMap.Builder::build)); + ResourceMapBuilder::new, + ResourceMapBuilder::put, + ResourceMapBuilder::putAll)); } + /** + * Returns a map of the resources in this collection, where the key is the name of the resource. + */ + public abstract Map> versionedResources(); + /** * Returns a map of the resources in this collection, where the key is the name of the resource. */ diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java index 2a763a649..aab186b2a 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/StatusInfo.java @@ -6,7 +6,6 @@ * {@code StatusInfo} tracks the state for remote envoy nodes. */ public interface StatusInfo { - /** * Returns the timestamp of the last discovery watch request. */ @@ -22,4 +21,5 @@ public interface StatusInfo { * Returns the number of open watches. */ int numWatches(); + } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/TestResources.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/TestResources.java index 3ee2b5e62..cc877c344 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/TestResources.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/TestResources.java @@ -71,6 +71,7 @@ public static Cluster createCluster(String clusterName) { * @param clusterName name of the new cluster * @param address address to use for the cluster endpoint * @param port port to use for the cluster endpoint + * @param discoveryType service discovery type */ public static Cluster createCluster( String clusterName, String address, int port, Cluster.DiscoveryType discoveryType) { @@ -148,6 +149,7 @@ public static ClusterLoadAssignment createEndpoint(String clusterName, String ad */ public static Listener createListener( boolean ads, + boolean delta, ApiVersion rdsTransportVersion, ApiVersion rdsResourceVersion, String listenerName, @@ -165,7 +167,7 @@ public static Listener createListener( .setApiConfigSource( ApiConfigSource.newBuilder() .setTransportApiVersion(rdsTransportVersion) - .setApiType(ApiConfigSource.ApiType.GRPC) + .setApiType(delta ? ApiConfigSource.ApiType.DELTA_GRPC : ApiConfigSource.ApiType.GRPC) .addGrpcServices( GrpcService.newBuilder() .setEnvoyGrpc( diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/VersionedResource.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/VersionedResource.java new file mode 100644 index 000000000..91b413df2 --- /dev/null +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/VersionedResource.java @@ -0,0 +1,56 @@ +package io.envoyproxy.controlplane.cache; + +import com.google.auto.value.AutoValue; +import com.google.common.hash.Hashing; +import com.google.protobuf.Message; + +import java.nio.charset.StandardCharsets; + +@AutoValue +public abstract class VersionedResource { + + /** + * Returns a new {@link VersionedResource} instance. + * + * @param resource the resource + * @param version the version associated with the resource + * @param the type of resource + */ + public static VersionedResource create(T resource, String version) { + return new AutoValue_VersionedResource<>( + resource, + version + ); + } + + /** + * Returns a new {@link VersionedResource} instance. + * + * @param resource the resource + * @param the type of resource + */ + public static VersionedResource create(T resource) { + return new AutoValue_VersionedResource<>( + resource, + // todo: is this a stable hash? + Hashing.sha256() + .hashString(resourceHashCode(resource), StandardCharsets.UTF_8) + .toString() + ); + } + + private static String resourceHashCode(T resource) { + return resource.getClass() + "@" + resource.hashCode(); + } + + /** + * Returns the resource. + */ + public abstract T resource(); + + /** + * Returns the version associated with the resource. + */ + public abstract String version(); + +} diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java index 76605fac3..69bc6e6e0 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/Watch.java @@ -1,20 +1,13 @@ package io.envoyproxy.controlplane.cache; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Consumer; /** * {@code Watch} is a dedicated stream of configuration resources produced by the configuration cache and consumed by * the xDS server. */ -public class Watch { - private static final AtomicIntegerFieldUpdater isCancelledUpdater = - AtomicIntegerFieldUpdater.newUpdater(Watch.class, "isCancelled"); +public class Watch extends AbstractWatch { private final boolean ads; - private final XdsRequest request; - private final Consumer responseConsumer; - private volatile int isCancelled = 0; - private Runnable stop; /** * Construct a watch. @@ -24,9 +17,8 @@ public class Watch { * @param responseConsumer handler for outgoing response messages */ public Watch(boolean ads, XdsRequest request, Consumer responseConsumer) { + super(request, responseConsumer); this.ads = ads; - this.request = request; - this.responseConsumer = responseConsumer; } /** @@ -36,51 +28,4 @@ public boolean ads() { return ads; } - /** - * Cancel the watch. A watch must be cancelled in order to complete its resource stream and free resources. Cancel - * may be called multiple times, with each subsequent call being a no-op. - */ - public void cancel() { - if (isCancelledUpdater.compareAndSet(this, 0, 1)) { - if (stop != null) { - stop.run(); - } - } - } - - /** - * Returns boolean indicating whether or not the watch has been cancelled. - */ - public boolean isCancelled() { - return isCancelledUpdater.get(this) == 1; - } - - /** - * Returns the original request for the watch. - */ - public XdsRequest request() { - return request; - } - - /** - * Sends the given response to the watch's response handler. - * - * @param response the response to be handled - * @throws WatchCancelledException if the watch has already been cancelled - */ - public void respond(Response response) throws WatchCancelledException { - if (isCancelled()) { - throw new WatchCancelledException(); - } - - responseConsumer.accept(response); - } - - /** - * Sets the callback method to be executed when the watch is cancelled. Even if cancel is executed multiple times, it - * ensures that this stop callback is only executed once. - */ - public void setStop(Runnable stop) { - this.stop = stop; - } } diff --git a/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java b/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java index 26188841b..97f0e0abf 100644 --- a/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java +++ b/cache/src/main/java/io/envoyproxy/controlplane/cache/v3/Snapshot.java @@ -10,6 +10,7 @@ import io.envoyproxy.controlplane.cache.Resources.ResourceType; import io.envoyproxy.controlplane.cache.SnapshotConsistencyException; import io.envoyproxy.controlplane.cache.SnapshotResources; +import io.envoyproxy.controlplane.cache.VersionedResource; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; @@ -21,20 +22,21 @@ import java.util.Set; /** - * {@code Snapshot} is a data class that contains an internally consistent snapshot of v3 xDS - * resources. Snapshots should have distinct versions per node group. + * {@code Snapshot} is a data class that contains an internally consistent snapshot of v3 xDS resources. Snapshots + * should have distinct versions per node group. */ @AutoValue public abstract class Snapshot extends io.envoyproxy.controlplane.cache.Snapshot { + /** - * Returns a new {@link io.envoyproxy.controlplane.cache.v3.Snapshot} instance that is versioned - * uniformly across all resources. + * Returns a new {@link io.envoyproxy.controlplane.cache.v3.Snapshot} instance that is versioned uniformly across all + * resources. * - * @param clusters the cluster resources in this snapshot + * @param clusters the cluster resources in this snapshot * @param endpoints the endpoint resources in this snapshot * @param listeners the listener resources in this snapshot - * @param routes the route resources in this snapshot - * @param version the version associated with all resources in this snapshot + * @param routes the route resources in this snapshot + * @param version the version associated with all resources in this snapshot */ public static Snapshot create( Iterable clusters, @@ -45,25 +47,30 @@ public static Snapshot create( String version) { return new AutoValue_Snapshot( - SnapshotResources.create(clusters, version), - SnapshotResources.create(endpoints, version), - SnapshotResources.create(listeners, version), - SnapshotResources.create(routes, version), - SnapshotResources.create(secrets, version)); + SnapshotResources + .create(generateSnapshotResourceIterable(clusters), version), + SnapshotResources + .create(generateSnapshotResourceIterable(endpoints), version), + SnapshotResources + .create(generateSnapshotResourceIterable(listeners), version), + SnapshotResources + .create(generateSnapshotResourceIterable(routes), version), + SnapshotResources + .create(generateSnapshotResourceIterable(secrets), version)); } /** - * Returns a new {@link io.envoyproxy.controlplane.cache.v3.Snapshot} instance that has separate - * versions for each resource type. + * Returns a new {@link io.envoyproxy.controlplane.cache.v3.Snapshot} instance that has separate versions for each + * resource type. * - * @param clusters the cluster resources in this snapshot - * @param clustersVersion the version of the cluster resources - * @param endpoints the endpoint resources in this snapshot + * @param clusters the cluster resources in this snapshot + * @param clustersVersion the version of the cluster resources + * @param endpoints the endpoint resources in this snapshot * @param endpointsVersion the version of the endpoint resources - * @param listeners the listener resources in this snapshot + * @param listeners the listener resources in this snapshot * @param listenersVersion the version of the listener resources - * @param routes the route resources in this snapshot - * @param routesVersion the version of the route resources + * @param routes the route resources in this snapshot + * @param routesVersion the version of the route resources */ public static Snapshot create( Iterable clusters, @@ -79,11 +86,16 @@ public static Snapshot create( // TODO(snowp): add a builder alternative return new AutoValue_Snapshot( - SnapshotResources.create(clusters, clustersVersion), - SnapshotResources.create(endpoints, endpointsVersion), - SnapshotResources.create(listeners, listenersVersion), - SnapshotResources.create(routes, routesVersion), - SnapshotResources.create(secrets, secretsVersion)); + SnapshotResources.create(generateSnapshotResourceIterable(clusters), + clustersVersion), + SnapshotResources.create(generateSnapshotResourceIterable(endpoints), + endpointsVersion), + SnapshotResources.create(generateSnapshotResourceIterable(listeners), + listenersVersion), + SnapshotResources + .create(generateSnapshotResourceIterable(routes), routesVersion), + SnapshotResources.create(generateSnapshotResourceIterable(secrets), + secretsVersion)); } /** @@ -132,16 +144,16 @@ public static Snapshot createEmpty(String version) { */ public void ensureConsistent() throws SnapshotConsistencyException { Set clusterEndpointRefs = - Resources.getResourceReferences(clusters().resources().values()); + Resources.getResourceReferences(clusters().versionedResources().values()); ensureAllResourceNamesExist(Resources.V3.CLUSTER_TYPE_URL, Resources.V3.ENDPOINT_TYPE_URL, - clusterEndpointRefs, endpoints().resources()); + clusterEndpointRefs, endpoints().versionedResources()); Set listenerRouteRefs = - Resources.getResourceReferences(listeners().resources().values()); + Resources.getResourceReferences(listeners().versionedResources().values()); ensureAllResourceNamesExist(Resources.V3.LISTENER_TYPE_URL, Resources.V3.ROUTE_TYPE_URL, - listenerRouteRefs, routes().resources()); + listenerRouteRefs, routes().versionedResources()); } /** @@ -149,7 +161,7 @@ public void ensureConsistent() throws SnapshotConsistencyException { * * @param typeUrl the type URL of the requested resource type */ - public Map resources(String typeUrl) { + public Map> resources(String typeUrl) { if (Strings.isNullOrEmpty(typeUrl)) { return ImmutableMap.of(); } @@ -159,7 +171,7 @@ public void ensureConsistent() throws SnapshotConsistencyException { return ImmutableMap.of(); } - return resources(resourceType); + return versionedResources(resourceType); } /** @@ -170,15 +182,37 @@ public void ensureConsistent() throws SnapshotConsistencyException { public Map resources(ResourceType resourceType) { switch (resourceType) { case CLUSTER: - return clusters().resources(); + return (Map) clusters().resources(); + case ENDPOINT: + return (Map) endpoints().resources(); + case LISTENER: + return (Map) listeners().resources(); + case ROUTE: + return (Map) routes().resources(); + case SECRET: + return (Map) secrets().resources(); + default: + return ImmutableMap.of(); + } + } + + /** + * Returns the resources with the given type. + * + * @param resourceType the requested resource type + */ + public Map> versionedResources(ResourceType resourceType) { + switch (resourceType) { + case CLUSTER: + return (Map) clusters().versionedResources(); case ENDPOINT: - return endpoints().resources(); + return (Map) endpoints().versionedResources(); case LISTENER: - return listeners().resources(); + return (Map) listeners().versionedResources(); case ROUTE: - return routes().resources(); + return (Map) routes().versionedResources(); case SECRET: - return secrets().resources(); + return (Map) secrets().versionedResources(); default: return ImmutableMap.of(); } @@ -196,9 +230,8 @@ public String version(String typeUrl) { /** * Returns the version in this snapshot for the given resource type. * - * @param typeUrl the type URL of the requested resource type - * @param resourceNames list of requested resource names, - * used to calculate a version for the given resources + * @param typeUrl the type URL of the requested resource type + * @param resourceNames list of requested resource names, used to calculate a version for the given resources */ public String version(String typeUrl, List resourceNames) { if (Strings.isNullOrEmpty(typeUrl)) { @@ -219,9 +252,8 @@ public String version(ResourceType resourceType) { /** * Returns the version in this snapshot for the given resource type. * - * @param resourceType the the requested resource type - * @param resourceNames list of requested resource names, - * used to calculate a version for the given resources + * @param resourceType the the requested resource type + * @param resourceNames list of requested resource names, used to calculate a version for the given resources */ @Override public String version(ResourceType resourceType, List resourceNames) { diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java index 18697f46d..a3c7522f2 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/ResourcesTest.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; @@ -34,18 +35,22 @@ public class ResourcesTest { private static final int ENDPOINT_PORT = ThreadLocalRandom.current().nextInt(10000, 20000); private static final int LISTENER_PORT = ThreadLocalRandom.current().nextInt(20000, 30000); - private static final Cluster CLUSTER = TestResources.createCluster(CLUSTER_NAME); - private static final ClusterLoadAssignment ENDPOINT = - TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT); - private static final Listener LISTENER = - TestResources.createListener(ADS, V3, V3, LISTENER_NAME, LISTENER_PORT, ROUTE_NAME); - private static final RouteConfiguration ROUTE = - TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME); - private static final Secret SECRET = TestResources.createSecret(SECRET_NAME); + private static final VersionedResource CLUSTER = + VersionedResource.create(TestResources.createCluster(CLUSTER_NAME), UUID.randomUUID().toString()); + private static final VersionedResource ENDPOINT = VersionedResource.create( + TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT), UUID.randomUUID().toString()); + private static final VersionedResource LISTENER = VersionedResource.create( + TestResources.createListener(ADS, false, V3, V3, LISTENER_NAME, LISTENER_PORT, ROUTE_NAME), + UUID.randomUUID().toString()); + private static final VersionedResource ROUTE = + VersionedResource.create(TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME), UUID.randomUUID().toString()); + private static final VersionedResource SECRET = VersionedResource.create( + TestResources.createSecret(SECRET_NAME), + UUID.randomUUID().toString()); @Test public void getResourceNameReturnsExpectedNameForValidResourceMessage() { - Map cases = + ImmutableMap, String> cases = ImmutableMap.of( CLUSTER, CLUSTER_NAME, ENDPOINT, CLUSTER_NAME, @@ -55,7 +60,7 @@ public void getResourceNameReturnsExpectedNameForValidResourceMessage() { cases.forEach( (resource, expectedName) -> - assertThat(Resources.getResourceName(resource)).isEqualTo(expectedName)); + assertThat(Resources.getResourceName(resource.resource())).isEqualTo(expectedName)); } @Test @@ -75,24 +80,25 @@ public void getResourceNameAnyThrowsOnBadClass() { @Test public void getResourceReferencesReturnsExpectedReferencesForValidResourceMessages() { String clusterServiceName = "clusterWithServiceName0"; - Cluster clusterWithServiceName = - Cluster.newBuilder() - .setName(CLUSTER_NAME) - .setEdsClusterConfig( - Cluster.EdsClusterConfig.newBuilder().setServiceName(clusterServiceName)) - .setType(Cluster.DiscoveryType.EDS) - .build(); - - Map, Set> cases = - ImmutableMap., Set>builder() - .put(ImmutableList.of(CLUSTER), ImmutableSet.of(CLUSTER_NAME)) - .put(ImmutableList.of(clusterWithServiceName), ImmutableSet.of(clusterServiceName)) - .put(ImmutableList.of(ENDPOINT), ImmutableSet.of()) - .put(ImmutableList.of(LISTENER), ImmutableSet.of(ROUTE_NAME)) - .put(ImmutableList.of(ROUTE), ImmutableSet.of()) + VersionedResource clusterWithServiceName = + VersionedResource.create(Cluster.newBuilder() + .setName(CLUSTER_NAME) + .setEdsClusterConfig( + Cluster.EdsClusterConfig.newBuilder().setServiceName(clusterServiceName)) + .setType(Cluster.DiscoveryType.EDS) + .build(), + UUID.randomUUID().toString()); + + Map>, Set> cases = + ImmutableMap.>, Set>builder() + .put((Collection) ImmutableList.of(CLUSTER), ImmutableSet.of(CLUSTER_NAME)) + .put((Collection) ImmutableList.of(clusterWithServiceName), ImmutableSet.of(clusterServiceName)) + .put((Collection) ImmutableList.of(ENDPOINT), ImmutableSet.of()) + .put((Collection) ImmutableList.of(LISTENER), ImmutableSet.of(ROUTE_NAME)) + .put((Collection)ImmutableList.of(ROUTE), ImmutableSet.of()) .put( - ImmutableList.of(CLUSTER, ENDPOINT, LISTENER, ROUTE), - ImmutableSet.of(CLUSTER_NAME, ROUTE_NAME)) + (Collection) ImmutableList.of(CLUSTER, ENDPOINT, LISTENER, ROUTE), + (Collection) ImmutableSet.of(CLUSTER_NAME, ROUTE_NAME)) .build(); cases.forEach( diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java index 27cb69292..32e7c2383 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/SnapshotResourcesTest.java @@ -16,8 +16,10 @@ public class SnapshotResourcesTest { private static final String CLUSTER0_NAME = "cluster0"; private static final String CLUSTER1_NAME = "cluster1"; - private static final Cluster CLUSTER0 = TestResources.createCluster(CLUSTER0_NAME); - private static final Cluster CLUSTER1 = TestResources.createCluster(CLUSTER1_NAME); + private static final VersionedResource CLUSTER0 = VersionedResource.create( + TestResources.createCluster(CLUSTER0_NAME), UUID.randomUUID().toString()); + private static final VersionedResource CLUSTER1 = VersionedResource.create( + TestResources.createCluster(CLUSTER1_NAME), UUID.randomUUID().toString()); @Test public void createBuildsResourcesMapWithNameAndPopulatesVersion() { @@ -26,8 +28,8 @@ public void createBuildsResourcesMapWithNameAndPopulatesVersion() { SnapshotResources snapshot = SnapshotResources.create(ImmutableList.of(CLUSTER0, CLUSTER1), version); assertThat(snapshot.resources()) - .containsEntry(CLUSTER0_NAME, CLUSTER0) - .containsEntry(CLUSTER1_NAME, CLUSTER1) + .containsEntry(CLUSTER0_NAME, CLUSTER0.resource()) + .containsEntry(CLUSTER1_NAME, CLUSTER1.resource()) .hasSize(2); assertThat(snapshot.version()).isEqualTo(version); diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java index 3894fbd96..a5de127ed 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SimpleCacheTest.java @@ -11,6 +11,7 @@ import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; import io.envoyproxy.controlplane.cache.StatusInfo; +import io.envoyproxy.controlplane.cache.VersionedResource; import io.envoyproxy.controlplane.cache.Watch; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -69,6 +70,23 @@ public class SimpleCacheTest { ImmutableList.of(Secret.newBuilder().setName(SECRET_NAME).build()), VERSION2); + private static void assertThatWatchIsOpenWithNoResponses(WatchAndTracker watchAndTracker) { + assertThat(watchAndTracker.watch.isCancelled()).isFalse(); + Assertions.assertThat(watchAndTracker.tracker.responses).isEmpty(); + } + + private static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTracker, Snapshot snapshot) { + Assertions.assertThat(watchAndTracker.tracker.responses).isNotEmpty(); + + Response response = watchAndTracker.tracker.responses.getFirst(); + + assertThat(response).isNotNull(); + assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); + assertThat(response.resources().toArray(new Message[0])) + .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values() + .stream().map(VersionedResource::resource).collect(Collectors.toList())); + } + @Test public void invalidNamesListShouldReturnWatcherWithNoResponseInAdsMode() { SimpleCache cache = new SimpleCache<>(new SingleNodeGroup()); @@ -420,7 +438,7 @@ public void watchIsLeftOpenIfNotRespondedImmediately() { .setNode(Node.getDefaultInstance()) .setTypeUrl(ROUTE_TYPE_URL) .addAllResourceNames(Collections.singleton(ROUTE_NAME)) - .build()), + .build()), Collections.singleton(ROUTE_NAME), responseTracker); @@ -461,7 +479,8 @@ public void clearSnapshotWithWatches() { .setVersionInfo(SNAPSHOT1.version(CLUSTER_TYPE_URL)) .build()), Collections.emptySet(), - r -> { }); + r -> { + }); // clearSnapshot should fail and the snapshot should be left untouched assertThat(cache.clearSnapshot(SingleNodeGroup.GROUP)).isFalse(); @@ -487,27 +506,12 @@ public void groups() { .setTypeUrl(CLUSTER_TYPE_URL) .build()), Collections.emptySet(), - r -> { }); + r -> { + }); assertThat(cache.groups()).containsExactly(SingleNodeGroup.GROUP); } - private static void assertThatWatchIsOpenWithNoResponses(WatchAndTracker watchAndTracker) { - assertThat(watchAndTracker.watch.isCancelled()).isFalse(); - Assertions.assertThat(watchAndTracker.tracker.responses).isEmpty(); - } - - private static void assertThatWatchReceivesSnapshot(WatchAndTracker watchAndTracker, Snapshot snapshot) { - Assertions.assertThat(watchAndTracker.tracker.responses).isNotEmpty(); - - Response response = watchAndTracker.tracker.responses.getFirst(); - - assertThat(response).isNotNull(); - assertThat(response.version()).isEqualTo(snapshot.version(watchAndTracker.watch.request().getTypeUrl())); - assertThat(response.resources().toArray(new Message[0])) - .containsExactlyElementsOf(snapshot.resources(watchAndTracker.watch.request().getTypeUrl()).values()); - } - private static class ResponseTracker implements Consumer { private final LinkedList responses = new LinkedList<>(); @@ -523,7 +527,8 @@ private static class ResponseOrderTracker implements Consumer { private final LinkedList responseTypes = new LinkedList<>(); - @Override public void accept(Response response) { + @Override + public void accept(Response response) { responseTypes.add(response.request().getTypeUrl()); } } diff --git a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java index af88401d6..d40b02bd6 100644 --- a/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java +++ b/cache/src/test/java/io/envoyproxy/controlplane/cache/v3/SnapshotTest.java @@ -10,20 +10,20 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.collect.ImmutableList; -import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.SnapshotConsistencyException; import io.envoyproxy.controlplane.cache.TestResources; +import io.envoyproxy.controlplane.cache.VersionedResource; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.junit.Test; public class SnapshotTest { + private static final boolean ADS = ThreadLocalRandom.current().nextBoolean(); private static final String CLUSTER_NAME = "cluster0"; private static final String LISTENER_NAME = "listener0"; @@ -37,7 +37,7 @@ public class SnapshotTest { private static final ClusterLoadAssignment ENDPOINT = TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT); private static final Listener - LISTENER = TestResources.createListener(ADS, V3, V3, LISTENER_NAME, LISTENER_PORT, ROUTE_NAME); + LISTENER = TestResources.createListener(ADS, false, V3, V3, LISTENER_NAME, LISTENER_PORT, ROUTE_NAME); private static final RouteConfiguration ROUTE = TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME); private static final Secret SECRET = TestResources.createSecret(SECRET_NAME); @@ -129,20 +129,20 @@ public void resourcesReturnsExpectedResources() { // due to limitations with // generic type constraints. - assertThat((Map) snapshot.resources(CLUSTER_TYPE_URL)) - .containsEntry(CLUSTER_NAME, CLUSTER) + assertThat(snapshot.resources(CLUSTER_TYPE_URL)) + .containsEntry(CLUSTER_NAME, VersionedResource.create(CLUSTER)) .hasSize(1); - assertThat((Map) snapshot.resources(ENDPOINT_TYPE_URL)) - .containsEntry(CLUSTER_NAME, ENDPOINT) + assertThat(snapshot.resources(ENDPOINT_TYPE_URL)) + .containsEntry(CLUSTER_NAME, VersionedResource.create(ENDPOINT)) .hasSize(1); - assertThat((Map) snapshot.resources(LISTENER_TYPE_URL)) - .containsEntry(LISTENER_NAME, LISTENER) + assertThat(snapshot.resources(LISTENER_TYPE_URL)) + .containsEntry(LISTENER_NAME, VersionedResource.create(LISTENER)) .hasSize(1); - assertThat((Map) snapshot.resources(ROUTE_TYPE_URL)) - .containsEntry(ROUTE_NAME, ROUTE) + assertThat(snapshot.resources(ROUTE_TYPE_URL)) + .containsEntry(ROUTE_NAME, VersionedResource.create(ROUTE)) .hasSize(1); String nullString = null; diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java new file mode 100644 index 000000000..58424eda9 --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDeltaDiscoveryRequestStreamObserver.java @@ -0,0 +1,160 @@ +package io.envoyproxy.controlplane.server; + +import static io.envoyproxy.controlplane.server.DiscoveryServer.ANY_TYPE_URL; + +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.Resources; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.function.Supplier; + +/** + * {@code AdsDeltaDiscoveryRequestStreamObserver} is an implementation of {@link DeltaDiscoveryRequestStreamObserver} + * tailored for ADS streams, which handle multiple watches for all TYPE_URLS. + */ + +public class AdsDeltaDiscoveryRequestStreamObserver extends DeltaDiscoveryRequestStreamObserver { + private final ConcurrentMap watches; + private final ConcurrentMap latestVersion; + private final ConcurrentMap> responses; + // tracked and pending resources are always accessed in the observer + // so they are safe from race, no need for concurrent map + private final Map> trackedResourceMap; + private final Map> pendingResourceMap; + + AdsDeltaDiscoveryRequestStreamObserver(StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { + super(ANY_TYPE_URL, responseObserver, streamId, executor, discoveryServer); + + this.watches = new ConcurrentHashMap<>(Resources.V3.TYPE_URLS.size()); + this.latestVersion = new ConcurrentHashMap<>(Resources.V3.TYPE_URLS.size()); + this.responses = new ConcurrentHashMap<>(Resources.V3.TYPE_URLS.size()); + this.trackedResourceMap = new HashMap<>(Resources.V3.TYPE_URLS.size()); + this.pendingResourceMap = new HashMap<>(Resources.V3.TYPE_URLS.size()); + } + + @Override + public void onNext(V request) { + if (discoveryServer.wrapDeltaXdsRequest(request).getTypeUrl().isEmpty()) { + closeWithError( + Status.UNKNOWN + .withDescription(String.format("[%d] type URL is required for ADS", streamId)) + .asRuntimeException()); + + return; + } + + super.onNext(request); + } + + @Override + void cancel() { + watches.values().forEach(DeltaWatch::cancel); + } + + @Override + boolean ads() { + return true; + } + + @Override + void setLatestVersion(String typeUrl, String version) { + latestVersion.put(typeUrl, version); + if (typeUrl.equals(Resources.V3.CLUSTER_TYPE_URL)) { + hasClusterChanged = true; + } else if (typeUrl.equals(Resources.V3.ENDPOINT_TYPE_URL)) { + hasClusterChanged = false; + } + } + + @Override + String latestVersion(String typeUrl) { + return latestVersion.get(typeUrl); + } + + @Override + void setResponse(String typeUrl, String nonce, LatestDeltaDiscoveryResponse response) { + responses.computeIfAbsent(typeUrl, s -> new ConcurrentHashMap<>()) + .put(nonce, response); + } + + @Override + LatestDeltaDiscoveryResponse clearResponse(String typeUrl, String nonce) { + return responses.computeIfAbsent(typeUrl, s -> new ConcurrentHashMap<>()) + .remove(nonce); + } + + @Override + int responseCount(String typeUrl) { + return responses.computeIfAbsent(typeUrl, s -> new ConcurrentHashMap<>()) + .size(); + } + + @Override + Map resourceVersions(String typeUrl) { + return trackedResourceMap.getOrDefault(typeUrl, Collections.emptyMap()); + } + + @Override + Set pendingResources(String typeUrl) { + return pendingResourceMap.getOrDefault(typeUrl, Collections.emptySet()); + } + + @Override + boolean isWildcard(String typeUrl) { + Resources.ResourceType resourceType = Resources.TYPE_URLS_TO_RESOURCE_TYPE.get(typeUrl); + return Resources.ResourceType.CLUSTER.equals(resourceType) + || Resources.ResourceType.LISTENER.equals(resourceType); + } + + @Override + void updateTrackedResources(String typeUrl, + Map resourcesVersions, + List removedResources) { + + Map trackedResources = trackedResourceMap.computeIfAbsent(typeUrl, s -> new HashMap<>()); + Set pendingResources = pendingResourceMap.computeIfAbsent(typeUrl, s -> new HashSet<>()); + resourcesVersions.forEach((k, v) -> { + trackedResources.put(k, v); + pendingResources.remove(k); + }); + removedResources.forEach(trackedResources::remove); + } + + @Override + void updateSubscriptions(String typeUrl, + List resourceNamesSubscribe, + List resourceNamesUnsubscribe) { + + Map trackedResources = trackedResourceMap.computeIfAbsent(typeUrl, s -> new HashMap<>()); + Set pendingResources = pendingResourceMap.computeIfAbsent(typeUrl, s -> new HashSet<>()); + // unsubscribe first + resourceNamesUnsubscribe.forEach(s -> { + trackedResources.remove(s); + pendingResources.remove(s); + }); + pendingResources.addAll(resourceNamesSubscribe); + } + + @Override + void computeWatch(String typeUrl, Supplier watchCreator) { + watches.compute(typeUrl, (s, watch) -> { + if (watch != null) { + watch.cancel(); + } + + return watchCreator.get(); + }); + } +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java index 02ec584c3..869361b4b 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/AdsDiscoveryRequestStreamObserver.java @@ -22,19 +22,16 @@ public class AdsDiscoveryRequestStreamObserver extends DiscoveryRequestStr private final ConcurrentMap watches; private final ConcurrentMap latestResponse; private final ConcurrentMap> ackedResources; - private final DiscoveryServer discoveryServer; - AdsDiscoveryRequestStreamObserver( - StreamObserver responseObserver, - long streamId, - Executor executor, - DiscoveryServer discoveryServer) { + AdsDiscoveryRequestStreamObserver(StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { super(ANY_TYPE_URL, responseObserver, streamId, executor, discoveryServer); this.watches = new ConcurrentHashMap<>(Resources.V3.TYPE_URLS.size()); this.latestResponse = new ConcurrentHashMap<>(Resources.V3.TYPE_URLS.size()); this.ackedResources = new ConcurrentHashMap<>(Resources.V3.TYPE_URLS.size()); - this.discoveryServer = discoveryServer; } @Override diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java new file mode 100644 index 000000000..1bb59a7df --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DeltaDiscoveryRequestStreamObserver.java @@ -0,0 +1,246 @@ +package io.envoyproxy.controlplane.server; + +import io.envoyproxy.controlplane.cache.DeltaResponse; +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; +import io.envoyproxy.controlplane.cache.Resources; +import io.envoyproxy.controlplane.server.exception.RequestException; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@code DeltaDiscoveryRequestStreamObserver} provides the base implementation for Delta XDS stream handling. + * The proto message types are abstracted so it can be used with different xDS versions. + */ +public abstract class DeltaDiscoveryRequestStreamObserver implements StreamObserver { + private static final AtomicLongFieldUpdater streamNonceUpdater = + AtomicLongFieldUpdater.newUpdater(DeltaDiscoveryRequestStreamObserver.class, "streamNonce"); + private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class); + + final long streamId; + private final String defaultTypeUrl; + private final StreamObserver responseObserver; + private final Executor executor; + volatile boolean hasClusterChanged; + final DiscoveryServer discoveryServer; + private volatile long streamNonce; + private volatile boolean isClosing; + + DeltaDiscoveryRequestStreamObserver(String defaultTypeUrl, + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { + this.defaultTypeUrl = defaultTypeUrl; + this.responseObserver = responseObserver; + this.streamId = streamId; + this.executor = executor; + this.streamNonce = 0; + this.discoveryServer = discoveryServer; + this.hasClusterChanged = false; + } + + @Override + public void onNext(V rawRequest) { + DeltaXdsRequest request = discoveryServer.wrapDeltaXdsRequest(rawRequest); + String requestTypeUrl = request.getTypeUrl().isEmpty() ? defaultTypeUrl : request.getTypeUrl(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[{}] request {}[{}] with nonce {} from versions {}", + streamId, + requestTypeUrl, + String.join(", ", request.getResourceNamesSubscribeList()), + request.getResponseNonce(), + request.getInitialResourceVersionsMap()); + } + + try { + discoveryServer.runStreamDeltaRequestCallbacks(streamId, rawRequest); + } catch (RequestException e) { + closeWithError(e); + return; + } + + final String version; + if (latestVersion(requestTypeUrl) == null) { + version = ""; + } else { + version = latestVersion(requestTypeUrl); + } + + // always update subscriptions + updateSubscriptions(requestTypeUrl, + request.getResourceNamesSubscribeList(), + request.getResourceNamesUnsubscribeList()); + + if (!request.getResponseNonce().isEmpty()) { + // envoy is replying to a response we sent, get and clear respective response + LatestDeltaDiscoveryResponse response = clearResponse(requestTypeUrl, request.getResponseNonce()); + if (!request.hasErrorDetail()) { + // if envoy has acked, update tracked resources + // from the corresponding response + updateTrackedResources(requestTypeUrl, + response.resourceVersions(), + response.removedResources()); + } + } + + // if nonce is empty, envoy is only requesting new resources or this is a new connection, + // in either case we have already updated the subscriptions, + // but we should only create watches when there's no pending ack + // this tries to ensure we don't have two outstanding responses + if (responseCount(requestTypeUrl) == 0) { + computeWatch(requestTypeUrl, () -> discoveryServer.configWatcher.createDeltaWatch( + request, + version, + resourceVersions(requestTypeUrl), + pendingResources(requestTypeUrl), + isWildcard(requestTypeUrl), + r -> executor.execute(() -> send(r, requestTypeUrl)), + hasClusterChanged + )); + } + } + + @Override + public void onError(Throwable t) { + if (!Status.fromThrowable(t).getCode().equals(Status.CANCELLED.getCode())) { + LOGGER.error("[{}] stream closed with error", streamId, t); + } + + try { + discoveryServer.callbacks.forEach(cb -> cb.onStreamCloseWithError(streamId, defaultTypeUrl, t)); + closeWithError(Status.fromThrowable(t).asException()); + } finally { + cancel(); + } + } + + @Override + public void onCompleted() { + LOGGER.debug("[{}] stream closed", streamId); + + try { + discoveryServer.callbacks.forEach(cb -> cb.onStreamClose(streamId, defaultTypeUrl)); + synchronized (responseObserver) { + if (!isClosing) { + isClosing = true; + responseObserver.onCompleted(); + } + } + } finally { + cancel(); + } + } + + void onCancelled() { + LOGGER.info("[{}] stream cancelled", streamId); + cancel(); + } + + void closeWithError(Throwable exception) { + synchronized (responseObserver) { + if (!isClosing) { + isClosing = true; + responseObserver.onError(exception); + } + } + cancel(); + } + + private void send(DeltaResponse response, String typeUrl) { + String nonce = Long.toString(streamNonceUpdater.getAndIncrement(this)); + List resources = response.resources() + .entrySet() + .stream() + .map(entry -> discoveryServer.makeResource(entry.getKey(), + entry.getValue().version(), + discoveryServer.protoResourcesSerializer.serialize( + entry.getValue().resource(), + Resources.getResourceApiVersion(typeUrl)))) + .collect(Collectors.toList()); + + X discoveryResponse = discoveryServer.makeDeltaResponse( + typeUrl, + response.version(), + nonce, + resources, + response.removedResources() + ); + + LOGGER.debug("[{}] response {} with nonce {} version {}", streamId, typeUrl, nonce, + response.version()); + + discoveryServer.runStreamDeltaResponseCallbacks(streamId, response.request(), discoveryResponse); + + // Store the latest response *before* we send the response. This ensures that by the time the request + // is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions + // which may see the incoming request arrive before the map is updated, failing the nonce check erroneously. + setResponse( + typeUrl, + nonce, + LatestDeltaDiscoveryResponse.create( + nonce, + response.version(), + response.resources() + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().version())), + response.removedResources() + ) + ); + setLatestVersion(typeUrl, response.version()); + + synchronized (responseObserver) { + if (!isClosing) { + try { + responseObserver.onNext(discoveryResponse); + } catch (StatusRuntimeException e) { + if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) { + throw e; + } + } + } + } + } + + abstract void cancel(); + + abstract boolean ads(); + + abstract void setLatestVersion(String typeUrl, String version); + + abstract String latestVersion(String typeUrl); + + abstract void setResponse(String typeUrl, String nonce, LatestDeltaDiscoveryResponse response); + + abstract LatestDeltaDiscoveryResponse clearResponse(String typeUrl, String nonce); + + abstract int responseCount(String typeUrl); + + abstract Map resourceVersions(String typeUrl); + + abstract Set pendingResources(String typeUrl); + + abstract boolean isWildcard(String typeUrl); + + abstract void updateTrackedResources(String typeUrl, + Map resourcesVersions, + List removedResources); + + abstract void updateSubscriptions(String typeUrl, + List resourceNamesSubscribe, + List resourceNamesUnsubscribe); + + abstract void computeWatch(String typeUrl, Supplier watchCreator); +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java index 49df750e9..9788ceaea 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.java @@ -30,19 +30,19 @@ public abstract class DiscoveryRequestStreamObserver implements StreamObse private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class); final long streamId; - volatile boolean hasClusterChanged; + final DiscoveryServer discoveryServer; private final String defaultTypeUrl; private final StreamObserver responseObserver; private final Executor executor; - private final DiscoveryServer discoveryServer; + volatile boolean hasClusterChanged; private volatile long streamNonce; private volatile boolean isClosing; DiscoveryRequestStreamObserver(String defaultTypeUrl, - StreamObserver responseObserver, - long streamId, - Executor executor, - DiscoveryServer discoveryServer) { + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { this.defaultTypeUrl = defaultTypeUrl; this.responseObserver = responseObserver; this.streamId = streamId; diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java index ca028882d..f2efff981 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServer.java @@ -3,6 +3,7 @@ import com.google.common.base.Preconditions; import com.google.protobuf.Any; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer; import io.grpc.stub.ServerCallStreamObserver; @@ -14,7 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class DiscoveryServer { +public abstract class DiscoveryServer { static final String ANY_TYPE_URL = ""; private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class); final List callbacks; @@ -31,9 +32,9 @@ public abstract class DiscoveryServer { * @param protoResourcesSerializer serializer of proto buffer messages */ protected DiscoveryServer(List callbacks, - ConfigWatcher configWatcher, - ExecutorGroup executorGroup, - ProtoResourcesSerializer protoResourcesSerializer) { + ConfigWatcher configWatcher, + ExecutorGroup executorGroup, + ProtoResourcesSerializer protoResourcesSerializer) { Preconditions.checkNotNull(executorGroup, "executorGroup cannot be null"); Preconditions.checkNotNull(callbacks, "callbacks cannot be null"); Preconditions.checkNotNull(configWatcher, "configWatcher cannot be null"); @@ -47,13 +48,24 @@ protected DiscoveryServer(List callbacks, protected abstract XdsRequest wrapXdsRequest(T request); + protected abstract DeltaXdsRequest wrapDeltaXdsRequest(V request); + protected abstract U makeResponse(String version, Collection resources, String typeUrl, - String nonce); + String nonce); + + public abstract X makeDeltaResponse(String typeUrl, String version, String nonce, List resources, + List removedResources); + + protected abstract Y makeResource(String name, String version, Any resource); protected abstract void runStreamRequestCallbacks(long streamId, T request); + protected abstract void runStreamDeltaRequestCallbacks(long streamId, V request); + protected abstract void runStreamResponseCallbacks(long streamId, XdsRequest request, U response); + protected abstract void runStreamDeltaResponseCallbacks(long streamId, DeltaXdsRequest request, X response); + StreamObserver createRequestHandler( StreamObserver responseObserver, boolean ads, @@ -88,4 +100,41 @@ StreamObserver createRequestHandler( return requestStreamObserver; } + + StreamObserver createDeltaRequestHandler( + StreamObserver responseObserver, + boolean ads, + String defaultTypeUrl) { + + long streamId = streamCount.getAndIncrement(); + Executor executor = executorGroup.next(); + + LOGGER.debug("[{}] open stream from {}", streamId, defaultTypeUrl); + + callbacks.forEach(cb -> cb.onStreamOpen(streamId, defaultTypeUrl)); + + final DeltaDiscoveryRequestStreamObserver requestStreamObserver; + if (ads) { + requestStreamObserver = new AdsDeltaDiscoveryRequestStreamObserver<>( + responseObserver, + streamId, + executor, + this + ); + } else { + requestStreamObserver = new XdsDeltaDiscoveryRequestStreamObserver<>( + defaultTypeUrl, + responseObserver, + streamId, + executor, + this + ); + } + + if (responseObserver instanceof ServerCallStreamObserver) { + ((ServerCallStreamObserver) responseObserver).setOnCancelHandler(requestStreamObserver::onCancelled); + } + + return requestStreamObserver; + } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java index 2d13e47cd..e2cc05245 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/DiscoveryServerCallbacks.java @@ -1,6 +1,8 @@ package io.envoyproxy.controlplane.server; import io.envoyproxy.controlplane.server.exception.RequestException; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; @@ -56,6 +58,19 @@ default void onStreamOpen(long streamId, String typeUrl) { */ void onV3StreamRequest(long streamId, DiscoveryRequest request); + /** + * {@code onV3StreamRequest} is called for each {@link io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest} + * that is received on the stream. + * + * @param streamId an ID for this stream that is only unique to this discovery server instance + * @param request the discovery request sent by the envoy instance + * + * @throws RequestException optionally can throw {@link RequestException} with custom status. That status + * will be returned to the client and the stream will be closed with error. + */ + void onV3StreamDeltaRequest(long streamId, + DeltaDiscoveryRequest request); + /** * {@code onV3StreamResponse} is called just before each * {@link DiscoveryResponse} that is sent on the stream. @@ -66,4 +81,17 @@ default void onStreamOpen(long streamId, String typeUrl) { */ default void onV3StreamResponse(long streamId, DiscoveryRequest request, DiscoveryResponse response) { } + + /** + * {@code onV3StreamResponse} is called just before each + * {@link io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse} that is sent on the stream. + * + * @param streamId an ID for this stream that is only unique to this discovery server instance + * @param request the discovery request sent by the envoy instance + * @param response the discovery response sent by the discovery server + */ + default void onV3StreamDeltaResponse(long streamId, + DeltaDiscoveryRequest request, + DeltaDiscoveryResponse response) { + } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/LatestDeltaDiscoveryResponse.java b/server/src/main/java/io/envoyproxy/controlplane/server/LatestDeltaDiscoveryResponse.java new file mode 100644 index 000000000..d657cdd72 --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/LatestDeltaDiscoveryResponse.java @@ -0,0 +1,26 @@ +package io.envoyproxy.controlplane.server; + +import com.google.auto.value.AutoValue; +import java.util.List; +import java.util.Map; + +/** + * Class introduces optimization which store only required data during next request. + */ +@AutoValue +public abstract class LatestDeltaDiscoveryResponse { + static LatestDeltaDiscoveryResponse create(String nonce, + String version, + Map resourceVersions, + List removedResources) { + return new AutoValue_LatestDeltaDiscoveryResponse(nonce, version, resourceVersions, removedResources); + } + + abstract String nonce(); + + abstract String version(); + + abstract Map resourceVersions(); + + abstract List removedResources(); +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java b/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java index 601c9a059..56347eaaf 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/V3DiscoveryServer.java @@ -9,25 +9,30 @@ import com.google.common.base.Preconditions; import com.google.protobuf.Any; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.XdsRequest; import io.envoyproxy.controlplane.server.serializer.DefaultProtoResourcesSerializer; import io.envoyproxy.controlplane.server.serializer.ProtoResourcesSerializer; import io.envoyproxy.envoy.service.cluster.v3.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceImplBase; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryResponse; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import io.envoyproxy.envoy.service.discovery.v3.Resource; import io.grpc.stub.StreamObserver; import java.util.Collection; import java.util.Collections; import java.util.List; -public class V3DiscoveryServer extends DiscoveryServer { +public class V3DiscoveryServer extends DiscoveryServer { public V3DiscoveryServer(ConfigWatcher configWatcher) { this(Collections.emptyList(), configWatcher); } public V3DiscoveryServer(DiscoveryServerCallbacks callbacks, - ConfigWatcher configWatcher) { + ConfigWatcher configWatcher) { this(Collections.singletonList(callbacks), configWatcher); } @@ -39,7 +44,9 @@ public V3DiscoveryServer( } public V3DiscoveryServer(List callbacks, - ConfigWatcher configWatcher, ExecutorGroup executorGroup, ProtoResourcesSerializer protoResourcesSerializer) { + ConfigWatcher configWatcher, + ExecutorGroup executorGroup, + ProtoResourcesSerializer protoResourcesSerializer) { super(callbacks, configWatcher, executorGroup, protoResourcesSerializer); } @@ -54,6 +61,13 @@ public StreamObserver streamAggregatedResources( return createRequestHandler(responseObserver, true, ANY_TYPE_URL); } + + @Override + public StreamObserver deltaAggregatedResources( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, true, ANY_TYPE_URL); + } }; } @@ -68,6 +82,13 @@ public StreamObserver streamClusters( return createRequestHandler(responseObserver, false, Resources.V3.CLUSTER_TYPE_URL); } + + @Override + public StreamObserver deltaClusters( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.CLUSTER_TYPE_URL); + } }; } @@ -82,6 +103,13 @@ public StreamObserver streamEndpoints( return createRequestHandler(responseObserver, false, Resources.V3.ENDPOINT_TYPE_URL); } + + @Override + public StreamObserver deltaEndpoints( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.ENDPOINT_TYPE_URL); + } }; } @@ -96,6 +124,13 @@ public StreamObserver streamListeners( return createRequestHandler(responseObserver, false, Resources.V3.LISTENER_TYPE_URL); } + + @Override + public StreamObserver deltaListeners( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.LISTENER_TYPE_URL); + } }; } @@ -110,6 +145,13 @@ public StreamObserver streamRoutes( return createRequestHandler(responseObserver, false, Resources.V3.ROUTE_TYPE_URL); } + + @Override + public StreamObserver deltaRoutes( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.ROUTE_TYPE_URL); + } }; } @@ -121,8 +163,16 @@ public SecretDiscoveryServiceImplBase getSecretDiscoveryServiceImpl() { @Override public StreamObserver streamSecrets( StreamObserver responseObserver) { + return createRequestHandler(responseObserver, false, Resources.V3.SECRET_TYPE_URL); } + + @Override + public StreamObserver deltaSecrets( + StreamObserver responseObserver) { + + return createDeltaRequestHandler(responseObserver, false, Resources.V3.SECRET_TYPE_URL); + } }; } @@ -131,15 +181,26 @@ protected XdsRequest wrapXdsRequest(DiscoveryRequest request) { return XdsRequest.create(request); } + @Override + protected DeltaXdsRequest wrapDeltaXdsRequest(DeltaDiscoveryRequest request) { + return DeltaXdsRequest.create(request); + } + @Override protected void runStreamRequestCallbacks(long streamId, DiscoveryRequest discoveryRequest) { callbacks.forEach( cb -> cb.onV3StreamRequest(streamId, discoveryRequest)); } + @Override + protected void runStreamDeltaRequestCallbacks(long streamId, DeltaDiscoveryRequest request) { + callbacks.forEach( + cb -> cb.onV3StreamDeltaRequest(streamId, request)); + } + @Override protected void runStreamResponseCallbacks(long streamId, XdsRequest request, - DiscoveryResponse discoveryResponse) { + DiscoveryResponse discoveryResponse) { Preconditions.checkArgument(request.v3Request() != null); callbacks.forEach( cb -> cb.onV3StreamResponse(streamId, @@ -147,10 +208,20 @@ protected void runStreamResponseCallbacks(long streamId, XdsRequest request, discoveryResponse)); } + @Override + protected void runStreamDeltaResponseCallbacks(long streamId, DeltaXdsRequest request, + DeltaDiscoveryResponse response) { + Preconditions.checkArgument(request.v3Request() != null); + callbacks.forEach( + cb -> cb.onV3StreamDeltaResponse(streamId, + request.v3Request(), + response)); + } + @Override protected DiscoveryResponse makeResponse(String version, Collection resources, - String typeUrl, - String nonce) { + String typeUrl, + String nonce) { return DiscoveryResponse.newBuilder() .setNonce(nonce) .setVersionInfo(version) @@ -158,4 +229,26 @@ protected DiscoveryResponse makeResponse(String version, Collection resourc .setTypeUrl(typeUrl) .build(); } + + @Override + public DeltaDiscoveryResponse makeDeltaResponse(String typeUrl, String version, String nonce, + List resources, + List removedResources) { + return DeltaDiscoveryResponse.newBuilder() + .setTypeUrl(typeUrl) + .setSystemVersionInfo(version) + .setNonce(nonce) + .addAllResources(resources) + .addAllRemovedResources(removedResources) + .build(); + } + + @Override + protected Resource makeResource(String name, String version, Any resource) { + return Resource.newBuilder() + .setName(name) + .setVersion(version) + .setResource(resource) + .build(); + } } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java new file mode 100644 index 000000000..01a424778 --- /dev/null +++ b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDeltaDiscoveryRequestStreamObserver.java @@ -0,0 +1,125 @@ +package io.envoyproxy.controlplane.server; + +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.Resources; +import io.grpc.stub.StreamObserver; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.function.Supplier; + +/** + * {@code XdsDeltaDiscoveryRequestStreamObserver} is a lightweight implementation of + * {@link DeltaDiscoveryRequestStreamObserver} tailored for non-ADS streams which handle a single watch. + */ +public class XdsDeltaDiscoveryRequestStreamObserver extends DeltaDiscoveryRequestStreamObserver { + // tracked is only used in the same thread so it need not be volatile + private final Map trackedResources; + private final Set pendingResources; + private final boolean isWildcard; + private final ConcurrentMap responses; + private volatile DeltaWatch watch; + private volatile String latestVersion; + + XdsDeltaDiscoveryRequestStreamObserver(String defaultTypeUrl, + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { + super(defaultTypeUrl, responseObserver, streamId, executor, discoveryServer); + this.trackedResources = new HashMap<>(); + this.pendingResources = new HashSet<>(); + Resources.ResourceType resourceType = Resources.TYPE_URLS_TO_RESOURCE_TYPE.get(defaultTypeUrl); + this.isWildcard = Resources.ResourceType.CLUSTER.equals(resourceType) + || Resources.ResourceType.LISTENER.equals(resourceType); + this.responses = new ConcurrentHashMap<>(); + } + + @Override + void cancel() { + if (watch != null) { + watch.cancel(); + } + } + + @Override + boolean ads() { + return false; + } + + @Override + void setLatestVersion(String typeUrl, String version) { + latestVersion = version; + } + + @Override + String latestVersion(String typeUrl) { + return latestVersion; + } + + @Override + void setResponse(String typeUrl, String nonce, LatestDeltaDiscoveryResponse response) { + responses.put(nonce, response); + } + + @Override + LatestDeltaDiscoveryResponse clearResponse(String typeUrl, String nonce) { + return responses.remove(nonce); + } + + @Override + int responseCount(String typeUrl) { + return responses.size(); + } + + @Override + Map resourceVersions(String typeUrl) { + return trackedResources; + } + + @Override + Set pendingResources(String typeUrl) { + return pendingResources; + } + + @Override + boolean isWildcard(String typeUrl) { + return isWildcard; + } + + @Override + void updateTrackedResources(String typeUrl, + Map resourcesVersions, + List removedResources) { + + resourcesVersions.forEach((k, v) -> { + trackedResources.put(k, v); + pendingResources.remove(k); + }); + removedResources.forEach(trackedResources::remove); + } + + @Override + void updateSubscriptions(String typeUrl, + List resourceNamesSubscribe, + List resourceNamesUnsubscribe) { + + // unsubscribe first + resourceNamesUnsubscribe.forEach(s -> { + trackedResources.remove(s); + pendingResources.remove(s); + }); + pendingResources.addAll(resourceNamesSubscribe); + } + + @Override + void computeWatch(String typeUrl, Supplier watchCreator) { + cancel(); + watch = watchCreator.get(); + } +} diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java index 26c3929d0..403198975 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/XdsDiscoveryRequestStreamObserver.java @@ -18,10 +18,10 @@ public class XdsDiscoveryRequestStreamObserver extends DiscoveryRequestStr private Set ackedResources; XdsDiscoveryRequestStreamObserver(String defaultTypeUrl, - StreamObserver responseObserver, - long streamId, - Executor executor, - DiscoveryServer discoveryServer) { + StreamObserver responseObserver, + long streamId, + Executor executor, + DiscoveryServer discoveryServer) { super(defaultTypeUrl, responseObserver, streamId, executor, discoveryServer); this.ackedResources = Collections.emptySet(); } diff --git a/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java b/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java index 123cf2622..f0ef24270 100644 --- a/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java +++ b/server/src/main/java/io/envoyproxy/controlplane/server/callback/SnapshotCollectingCallback.java @@ -6,6 +6,7 @@ import io.envoyproxy.controlplane.cache.SnapshotCache; import io.envoyproxy.controlplane.cache.v3.Snapshot; import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import java.time.Clock; @@ -43,11 +44,6 @@ */ public class SnapshotCollectingCallback implements DiscoveryServerCallbacks { - private static class SnapshotState { - int streamCount; - Instant lastSeen; - } - private final SnapshotCache snapshotCache; private final NodeGroup nodeGroup; private final Clock clock; @@ -59,16 +55,16 @@ private static class SnapshotState { /** * Creates the callback. * - * @param snapshotCache the cache to evict snapshots from - * @param nodeGroup the node group used to map requests to groups - * @param clock system clock - * @param collectorCallbacks the callbacks to invoke when snapshot is collected - * @param collectAfterMillis how long a snapshot must be referenced for before being collected + * @param snapshotCache the cache to evict snapshots from + * @param nodeGroup the node group used to map requests to groups + * @param clock system clock + * @param collectorCallbacks the callbacks to invoke when snapshot is collected + * @param collectAfterMillis how long a snapshot must be referenced for before being collected * @param collectionIntervalMillis how often the collection background action should run */ public SnapshotCollectingCallback(SnapshotCache snapshotCache, - NodeGroup nodeGroup, Clock clock, Set> collectorCallbacks, - long collectAfterMillis, long collectionIntervalMillis) { + NodeGroup nodeGroup, Clock clock, Set> collectorCallbacks, + long collectAfterMillis, long collectionIntervalMillis) { this.snapshotCache = snapshotCache; this.nodeGroup = nodeGroup; this.clock = clock; @@ -80,12 +76,18 @@ public SnapshotCollectingCallback(SnapshotCache snapshotCache, collectionIntervalMillis, TimeUnit.MILLISECONDS); } - @Override public synchronized void onV3StreamRequest(long streamId, DiscoveryRequest request) { T groupIdentifier = nodeGroup.hash(request.getNode()); updateState(streamId, groupIdentifier); } + @Override + public void onV3StreamDeltaRequest(long streamId, + DeltaDiscoveryRequest request) { + T groupIdentifier = nodeGroup.hash(request.getNode()); + updateState(streamId, groupIdentifier); + } + private void updateState(long streamId, T groupIdentifier) { SnapshotState snapshotState = this.snapshotStates.computeIfAbsent(groupIdentifier, x -> new SnapshotState()); @@ -96,11 +98,13 @@ private void updateState(long streamId, T groupIdentifier) { } } - @Override public void onStreamClose(long streamId, String typeUrl) { + @Override + public void onStreamClose(long streamId, String typeUrl) { onStreamCloseHelper(streamId); } - @Override public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) { + @Override + public void onStreamCloseWithError(long streamId, String typeUrl, Throwable error) { onStreamCloseHelper(streamId); } @@ -139,4 +143,9 @@ private synchronized void onStreamCloseHelper(long streamId) { snapshotState.streamCount--; snapshotState.lastSeen = clock.instant(); } + + private static class SnapshotState { + int streamCount; + Instant lastSeen; + } } diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DeltaDiscoveryServerCallbacks.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DeltaDiscoveryServerCallbacks.java new file mode 100644 index 000000000..fdb008056 --- /dev/null +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DeltaDiscoveryServerCallbacks.java @@ -0,0 +1,75 @@ +package io.envoyproxy.controlplane.server; + +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; +import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V3DeltaDiscoveryServerCallbacks implements DiscoveryServerCallbacks { + + private static final Logger LOGGER = LoggerFactory + .getLogger(V3DeltaDiscoveryServerCallbacks.class); + + private final CountDownLatch onStreamOpenLatch; + private final CountDownLatch onStreamRequestLatch; + private StringBuffer nonce; + private StringBuffer errorDetail; + private ConcurrentHashMap resourceToNonceMap; + + + /** + * Returns an implementation of DiscoveryServerCallbacks that throws if it sees a nod-delta v3 request, + * and counts down on provided latches in response to certain events. + * + * @param onStreamOpenLatch latch to call countDown() on when a v3 stream is opened. + * @param onStreamRequestLatch latch to call countDown() on when a v3 request is seen. + */ + public V3DeltaDiscoveryServerCallbacks(CountDownLatch onStreamOpenLatch, + CountDownLatch onStreamRequestLatch, + StringBuffer nonce, + StringBuffer errorDetail, + ConcurrentHashMap resourceToNonceMap + ) { + this.onStreamOpenLatch = onStreamOpenLatch; + this.onStreamRequestLatch = onStreamRequestLatch; + this.nonce = nonce; + this.errorDetail = errorDetail; + this.resourceToNonceMap = resourceToNonceMap; + } + + @Override + public void onStreamOpen(long streamId, String typeUrl) { + LOGGER.info("onStreamOpen called"); + onStreamOpenLatch.countDown(); + } + + @Override + public void onV3StreamRequest(long streamId, DiscoveryRequest request) { + LOGGER.error("request={}", request); + throw new IllegalStateException("Unexpected stream request"); + + } + + @Override + public void onV3StreamDeltaRequest(long streamId, + DeltaDiscoveryRequest request) { + LOGGER.info("Got a v3StreamDeltaRequest"); + errorDetail.append(request.getErrorDetail().getMessage()); + StringBuffer resourceNonce = resourceToNonceMap + .getOrDefault(request.getTypeUrl(), new StringBuffer()); + resourceNonce.append(request.getResponseNonce()); + resourceToNonceMap.put(request.getTypeUrl(), resourceNonce); + nonce.append(request.getResponseNonce()); + onStreamRequestLatch.countDown(); + } + + @Override + public void onV3StreamResponse(long streamId, DiscoveryRequest request, + DiscoveryResponse response) { + LOGGER.info("Got a v3StreamResponse"); + } +} + diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsDeltaResourcesIT.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsDeltaResourcesIT.java new file mode 100644 index 000000000..7bc4c4036 --- /dev/null +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsDeltaResourcesIT.java @@ -0,0 +1,182 @@ +package io.envoyproxy.controlplane.server; + +import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.containsString; + +import io.envoyproxy.controlplane.cache.Resources.V3; +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.grpc.netty.NettyServerBuilder; +import io.restassured.http.ContentType; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; + +public class V3DiscoveryServerAdsDeltaResourcesIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(V3DiscoveryServerAdsDeltaResourcesIT.class); + + private static final String CONFIG = "envoy/ads.v3.delta.config.yaml"; + private static final String GROUP = "key"; + private static final Integer LISTENER_PORT = 10000; + + private static final CountDownLatch onStreamOpenLatch = new CountDownLatch(1); + private static final CountDownLatch onStreamRequestLatch = new CountDownLatch(1); + + private static ConcurrentHashMap resourceToNonceMap = new ConcurrentHashMap(); + private static StringBuffer nonce = new StringBuffer(); + private static StringBuffer errorDetails = new StringBuffer(); + + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + private static final NettyGrpcServerRule ADS = new NettyGrpcServerRule() { + @Override + protected void configureServerBuilder(NettyServerBuilder builder) { + + final DiscoveryServerCallbacks callbacks = + new V3DeltaDiscoveryServerCallbacks(onStreamOpenLatch, onStreamRequestLatch, nonce, + errorDetails, resourceToNonceMap); + + Snapshot snapshot = V3TestSnapshots.createSnapshot(true, + true, + "upstream", + UPSTREAM.ipAddress(), + EchoContainer.PORT, + "listener0", + LISTENER_PORT, + "route0", + "1"); + LOGGER.info("snapshot={}", snapshot); + cache.setSnapshot( + GROUP, + snapshot + ); + + V3DiscoveryServer server = new V3DiscoveryServer(callbacks, cache); + + builder.addService(server.getAggregatedDiscoveryServiceImpl()); + } + }; + + private static final Network NETWORK = Network.newNetwork(); + + private static final EnvoyContainer ENVOY = new EnvoyContainer(CONFIG, () -> ADS.getServer().getPort()) + .withExposedPorts(LISTENER_PORT) + .withNetwork(NETWORK); + + private static final EchoContainer UPSTREAM = new EchoContainer() + .withNetwork(NETWORK) + .withNetworkAliases("upstream"); + + @ClassRule + public static final RuleChain RULES = RuleChain.outerRule(UPSTREAM) + .around(ADS) + .around(ENVOY); + + @Test + public void validateTestRequestToEchoServerViaEnvoy() throws InterruptedException { + assertThat(onStreamOpenLatch.await(15, TimeUnit.SECONDS)).isTrue() + .overridingErrorMessage("failed to open ADS stream"); + + assertThat(onStreamRequestLatch.await(15, TimeUnit.SECONDS)).isTrue() + .overridingErrorMessage("failed to receive ADS request"); + + // there is no onStreamResponseLatch because V3DiscoveryServer doesn't call the callbacks + // when responding to a delta request + + String baseUri = String + .format("http://%s:%d", ENVOY.getContainerIpAddress(), ENVOY.getMappedPort(LISTENER_PORT)); + + await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( + () -> given().baseUri(baseUri).contentType(ContentType.TEXT) + .when().get("/") + .then().statusCode(200) + .and().body(containsString(UPSTREAM.response))); + + // basically the nonces will count up from 0 to 3 as envoy receives more resources + // and check that no messages have been sent to errorDetails + // here just check that the nonceMap contains each of the resources we expect + // as it's not guaranteed what order they'll be received in + assertThat(nonce.toString()).isEqualTo("0123"); + assertThat(resourceToNonceMap.containsKey(V3.CLUSTER_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.containsKey(V3.ROUTE_TYPE_URL)).isTrue(); + assertThat(errorDetails.toString()).isEqualTo(""); + + // now write a new snapshot, with the only change being an update + // to the listener name, wait for a few seconds for envoy to pick it up, and + // check that the nonce envoy most recently ACK'd is "4" + Snapshot snapshot = V3TestSnapshots.createSnapshot(true, + true, + "upstream", + UPSTREAM.ipAddress(), + EchoContainer.PORT, + "listener1", + LISTENER_PORT, + "route0", + "2"); + LOGGER.info("snapshot={}", snapshot); + cache.setSnapshot( + GROUP, + snapshot + ); + + await().atMost(5, TimeUnit.SECONDS).pollDelay(2, TimeUnit.SECONDS).untilAsserted( + () -> { + assertThat(nonce.toString()).isEqualTo("01234"); + assertThat(errorDetails.toString()).isEqualTo(""); + assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue(); + // we know that the most recent update was to the listener, so check + // that it received the most recent nonce + assertThat(resourceToNonceMap.get(V3.LISTENER_TYPE_URL).toString()).contains("4"); + } + ); + + // now increment the version but keep all the underlying resources the same. This should not + // trigger any updates, so the nonces should remain constant to above. + snapshot = V3TestSnapshots.createSnapshot(true, + true, + "upstream", + UPSTREAM.ipAddress(), + EchoContainer.PORT, + "listener1", + LISTENER_PORT, + "route0", + "3"); + LOGGER.info("snapshot={}", snapshot); + cache.setSnapshot( + GROUP, + snapshot + ); + + // wait 2 seconds before we start checking this + await().atMost(5, TimeUnit.SECONDS).pollDelay(2, TimeUnit.SECONDS).untilAsserted( + () -> { + + LOGGER.info("lastWatchRequestTime={}", cache.statusInfo(GROUP)); + assertThat(nonce.toString()).isEqualTo("01234"); + assertThat(errorDetails.toString()).isEqualTo(""); + assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue(); + // we know that the most recent update was to the listener, so check + // that it received the most recent nonce + assertThat(resourceToNonceMap.get(V3.LISTENER_TYPE_URL).toString()).contains("4"); + } + ); + } + + @AfterClass + public static void after() throws Exception { + ENVOY.close(); + UPSTREAM.close(); + NETWORK.close(); + } +} diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsIT.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsIT.java index 7e32fd165..98061ef28 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsIT.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsIT.java @@ -42,6 +42,7 @@ protected void configureServerBuilder(NettyServerBuilder builder) { GROUP, createSnapshot( true, + false, "upstream", UPSTREAM.ipAddress(), EchoContainer.PORT, diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java index 693155586..26eeceb69 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerAdsWarmingClusterIT.java @@ -19,6 +19,7 @@ import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.extensions.upstreams.http.v3.HttpProtocolOptions; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.netty.NettyServerBuilder; @@ -60,6 +61,12 @@ public void onV3StreamRequest(long streamId, DiscoveryRequest request) { onStreamRequestLatch.countDown(); } + @Override + public void onV3StreamDeltaRequest(long streamId, + DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected delta request"); + } + @Override public void onV3StreamResponse(long streamId, DiscoveryRequest request, DiscoveryResponse response) { @@ -131,6 +138,7 @@ private static void createSnapshotWithWorkingClusterWithTheSameEdsVersion() { cache.setSnapshot( GROUP, createSnapshot(true, + false, "upstream", UPSTREAM.ipAddress(), EchoContainer.PORT, @@ -173,7 +181,7 @@ private static Snapshot createSnapshotWithNotWorkingCluster(boolean ads, .build(); ClusterLoadAssignment endpoint = TestResources.createEndpoint(clusterName, endpointAddress, endpointPort); - Listener listener = TestResources.createListener(ads, V3, V3, listenerName, + Listener listener = TestResources.createListener(ads, false, V3, V3, listenerName, listenerPort, routeName); RouteConfiguration route = TestResources.createRoute(routeName, clusterName); diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java index a64242e0b..4b80e9d06 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerTest.java @@ -12,6 +12,9 @@ import com.google.common.collect.Table; import com.google.protobuf.Message; import io.envoyproxy.controlplane.cache.ConfigWatcher; +import io.envoyproxy.controlplane.cache.DeltaResponse; +import io.envoyproxy.controlplane.cache.DeltaWatch; +import io.envoyproxy.controlplane.cache.DeltaXdsRequest; import io.envoyproxy.controlplane.cache.Resources; import io.envoyproxy.controlplane.cache.Response; import io.envoyproxy.controlplane.cache.TestResources; @@ -29,6 +32,7 @@ import io.envoyproxy.envoy.service.cluster.v3.ClusterDiscoveryServiceGrpc.ClusterDiscoveryServiceStub; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.envoyproxy.envoy.service.endpoint.v3.EndpointDiscoveryServiceGrpc; @@ -86,7 +90,7 @@ public class V3DiscoveryServerTest { private static final ClusterLoadAssignment ENDPOINT = TestResources.createEndpoint(CLUSTER_NAME, ENDPOINT_PORT); private static final Listener - LISTENER = TestResources.createListener(ADS, V3, V3, LISTENER_NAME, LISTENER_PORT, + LISTENER = TestResources.createListener(ADS, false, V3, V3, LISTENER_NAME, LISTENER_PORT, ROUTE_NAME); private static final RouteConfiguration ROUTE = TestResources.createRoute(ROUTE_NAME, CLUSTER_NAME); @@ -983,7 +987,7 @@ private static class MockConfigWatcher implements ConfigWatcher { public Watch createWatch( boolean ads, XdsRequest request, - Set knownResources, + Set knownResourceNames, Consumer responseConsumer, boolean hasClusterChanged) { @@ -1015,13 +1019,23 @@ public Watch createWatch( watch.cancel(); } else { Set expectedKnown = expectedKnownResources.get(request.getTypeUrl()); - if (expectedKnown != null && !expectedKnown.equals(knownResources)) { + if (expectedKnown != null && !expectedKnown.equals(knownResourceNames)) { fail("unexpected known resources after sending all responses"); } } return watch; } + + @Override + public DeltaWatch createDeltaWatch(DeltaXdsRequest request, String requesterVersion, + Map resourceVersions, + Set pendingResources, + boolean isWildcard, + Consumer responseConsumer, + boolean hasClusterChanged) { + throw new IllegalStateException("not implemented"); + } } private static class MockDiscoveryServerCallbacks @@ -1064,6 +1078,12 @@ public void onV3StreamRequest(long streamId, DiscoveryRequest request) { } } + @Override + public void onV3StreamDeltaRequest(long streamId, + DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected delta request"); + } + @Override public void onV3StreamResponse(long streamId, DiscoveryRequest request, DiscoveryResponse response) { diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerXdsDeltaResourcesIT.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerXdsDeltaResourcesIT.java new file mode 100644 index 000000000..15527f30b --- /dev/null +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerXdsDeltaResourcesIT.java @@ -0,0 +1,187 @@ +package io.envoyproxy.controlplane.server; + +import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.containsString; + +import io.envoyproxy.controlplane.cache.Resources.V3; +import io.envoyproxy.controlplane.cache.v3.SimpleCache; +import io.envoyproxy.controlplane.cache.v3.Snapshot; +import io.grpc.netty.NettyServerBuilder; +import io.restassured.http.ContentType; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.AfterClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; + +public class V3DiscoveryServerXdsDeltaResourcesIT { + + private static final Logger LOGGER = LoggerFactory + .getLogger(V3DiscoveryServerXdsDeltaResourcesIT.class); + + private static final String CONFIG = "envoy/xds.v3.delta.config.yaml"; + private static final String GROUP = "key"; + private static final Integer LISTENER_PORT = 10000; + + private static final CountDownLatch onStreamOpenLatch = new CountDownLatch(1); + private static final CountDownLatch onStreamRequestLatch = new CountDownLatch(1); + + private static ConcurrentHashMap resourceToNonceMap = new ConcurrentHashMap(); + private static StringBuffer nonce = new StringBuffer(); + private static StringBuffer errorDetails = new StringBuffer(); + + private static final SimpleCache cache = new SimpleCache<>(node -> GROUP); + + private static final NettyGrpcServerRule XDS = new NettyGrpcServerRule() { + @Override + protected void configureServerBuilder(NettyServerBuilder builder) { + + final DiscoveryServerCallbacks callbacks = + new V3DeltaDiscoveryServerCallbacks(onStreamOpenLatch, onStreamRequestLatch, nonce, + errorDetails, resourceToNonceMap); + + Snapshot snapshot = V3TestSnapshots.createSnapshotNoEds(false, + true, + "upstream", + UPSTREAM.ipAddress(), + EchoContainer.PORT, + "listener0", + LISTENER_PORT, + "route0", + "1"); + LOGGER.info("snapshot={}", snapshot); + cache.setSnapshot( + GROUP, + snapshot + ); + + V3DiscoveryServer server = new V3DiscoveryServer(callbacks, cache); + + builder.addService(server.getRouteDiscoveryServiceImpl()); + builder.addService(server.getListenerDiscoveryServiceImpl()); + builder.addService(server.getEndpointDiscoveryServiceImpl()); + builder.addService(server.getClusterDiscoveryServiceImpl()); + builder.addService(server.getSecretDiscoveryServiceImpl()); + } + }; + + private static final Network NETWORK = Network.newNetwork(); + + private static final EnvoyContainer ENVOY = new EnvoyContainer(CONFIG, + () -> XDS.getServer().getPort()) + .withExposedPorts(LISTENER_PORT) + .withNetwork(NETWORK); + + private static final EchoContainer UPSTREAM = new EchoContainer() + .withNetwork(NETWORK) + .withNetworkAliases("upstream"); + + @ClassRule + public static final RuleChain RULES = RuleChain.outerRule(UPSTREAM) + .around(XDS) + .around(ENVOY); + + @Test + public void validateTestRequestToEchoServerViaEnvoy() throws InterruptedException { + assertThat(onStreamOpenLatch.await(15, TimeUnit.SECONDS)).isTrue() + .overridingErrorMessage("failed to open ADS stream"); + + assertThat(onStreamRequestLatch.await(15, TimeUnit.SECONDS)).isTrue() + .overridingErrorMessage("failed to receive ADS request"); + + // there is no onStreamResponseLatch because V3DiscoveryServer doesn't call the callbacks + // when responding to a delta request + + String baseUri = String + .format("http://%s:%d", ENVOY.getContainerIpAddress(), ENVOY.getMappedPort(LISTENER_PORT)); + + await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( + () -> given().baseUri(baseUri).contentType(ContentType.TEXT) + .when().get("/") + .then().statusCode(200) + .and().body(containsString(UPSTREAM.response))); + + // we'll get three 0 nonces as this relates to the first version of resource state + // and check that errorDetails is empty + assertThat(errorDetails.toString()).isEqualTo(""); + assertThat(resourceToNonceMap.containsKey(V3.CLUSTER_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.get(V3.CLUSTER_TYPE_URL).toString()).isEqualTo("0"); + assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.get(V3.LISTENER_TYPE_URL).toString()).isEqualTo("0"); + assertThat(resourceToNonceMap.containsKey(V3.ROUTE_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.get(V3.ROUTE_TYPE_URL).toString()).isEqualTo("0"); + + // now write a new snapshot, with the only change being an update + // to the listener name, wait for a few seconds for envoy to pick it up, and + // check that the nonce envoy most recently ACK'd is "1" + Snapshot snapshot = V3TestSnapshots.createSnapshotNoEds(false, + true, + "upstream", + UPSTREAM.ipAddress(), + EchoContainer.PORT, + "listener1", + LISTENER_PORT, + "route0", + "2"); + LOGGER.info("snapshot={}", snapshot); + cache.setSnapshot( + GROUP, + snapshot + ); + + // after the update, we've changed listener1, so will get a new nonce + await().atMost(3, TimeUnit.SECONDS).untilAsserted( + () -> { + assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.get(V3.LISTENER_TYPE_URL).toString()).isEqualTo("01"); + assertThat(errorDetails.toString()).isEqualTo(""); + } + ); + + // now write a new snapshot, with no changes to the params we pass in + // but update version. This being a Delta request, this version doesn't + // really matter, and Envoy should not receive a spontaneous update + // because the hash of the resources will be the same + snapshot = V3TestSnapshots.createSnapshotNoEds(false, + true, + "upstream", + UPSTREAM.ipAddress(), + EchoContainer.PORT, + "listener1", + LISTENER_PORT, + "route0", + "2"); + LOGGER.info("snapshot={}", snapshot); + cache.setSnapshot( + GROUP, + snapshot + ); + + // delay polling by 2 seconds to check that no upsates are received + await().pollDelay(2, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).untilAsserted( + () -> { + assertThat(resourceToNonceMap.containsKey(V3.CLUSTER_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.get(V3.CLUSTER_TYPE_URL).toString()).isEqualTo("0"); + assertThat(resourceToNonceMap.containsKey(V3.LISTENER_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.get(V3.LISTENER_TYPE_URL).toString()).isEqualTo("01"); + assertThat(resourceToNonceMap.containsKey(V3.ROUTE_TYPE_URL)).isTrue(); + assertThat(resourceToNonceMap.get(V3.ROUTE_TYPE_URL).toString()).isEqualTo("0"); + assertThat(errorDetails.toString()).isEqualTo(""); + } + ); + } + + @AfterClass + public static void after() throws Exception { + ENVOY.close(); + UPSTREAM.close(); + NETWORK.close(); + } +} diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerXdsIT.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerXdsIT.java index 5c66f77bf..c6cb148c9 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerXdsIT.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3DiscoveryServerXdsIT.java @@ -11,6 +11,7 @@ import io.restassured.http.ContentType; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.RuleChain; @@ -38,6 +39,7 @@ protected void configureServerBuilder(NettyServerBuilder builder) { cache.setSnapshot( GROUP, createSnapshotNoEds(false, + false, "upstream", "upstream", EchoContainer.PORT, @@ -90,4 +92,11 @@ public void validateTestRequestToEchoServerViaEnvoy() throws InterruptedExceptio .then().statusCode(200) .and().body(containsString(UPSTREAM.response))); } + + @After + public void after() throws Exception { + ENVOY.stop(); + UPSTREAM.stop(); + NETWORK.close(); + } } diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java index 7bd9960c6..123088911 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3OnlyDiscoveryServerCallbacks.java @@ -1,5 +1,6 @@ package io.envoyproxy.controlplane.server; +import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import java.util.concurrent.CountDownLatch; @@ -34,6 +35,12 @@ public void onV3StreamRequest(long streamId, DiscoveryRequest request) { onStreamRequestLatch.countDown(); } + @Override + public void onV3StreamDeltaRequest(long streamId, + DeltaDiscoveryRequest request) { + throw new IllegalStateException("Unexpected delta request"); + } + @Override public void onV3StreamResponse(long streamId, DiscoveryRequest request, DiscoveryResponse response) { diff --git a/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java b/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java index 390b96892..44980eea2 100644 --- a/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java +++ b/server/src/test/java/io/envoyproxy/controlplane/server/V3TestSnapshots.java @@ -15,6 +15,7 @@ class V3TestSnapshots { static Snapshot createSnapshot( boolean ads, + boolean delta, String clusterName, String endpointAddress, int endpointPort, @@ -24,10 +25,10 @@ static Snapshot createSnapshot( String version) { Cluster cluster = TestResources.createCluster(clusterName); - ClusterLoadAssignment endpoint = - TestResources.createEndpoint(clusterName, endpointAddress, endpointPort); - Listener listener = - TestResources.createListener(ads, V3, V3, listenerName, listenerPort, routeName); + ClusterLoadAssignment + endpoint = TestResources.createEndpoint(clusterName, endpointAddress, endpointPort); + Listener listener = TestResources.createListener(ads, delta, V3, V3, listenerName, + listenerPort, routeName); RouteConfiguration route = TestResources.createRoute(routeName, clusterName); return Snapshot.create( @@ -41,6 +42,7 @@ static Snapshot createSnapshot( static Snapshot createSnapshotNoEds( boolean ads, + boolean delta, String clusterName, String endpointAddress, int endpointPort, @@ -48,21 +50,13 @@ static Snapshot createSnapshotNoEds( int listenerPort, String routeName, String version) { - return createSnapshotNoEds( - ads, - V3, - V3, - clusterName, - endpointAddress, - endpointPort, - listenerName, - listenerPort, - routeName, - version); + return createSnapshotNoEds(ads, delta, V3, V3, clusterName, endpointAddress, + endpointPort, listenerName, listenerPort, routeName, version); } private static Snapshot createSnapshotNoEds( boolean ads, + boolean delta, ApiVersion rdsTransportVersion, ApiVersion rdsResourceVersion, String clusterName, @@ -73,12 +67,11 @@ private static Snapshot createSnapshotNoEds( String routeName, String version) { - Cluster cluster = - TestResources.createCluster( - clusterName, endpointAddress, endpointPort, Cluster.DiscoveryType.STRICT_DNS); - Listener listener = - TestResources.createListener( - ads, rdsTransportVersion, rdsResourceVersion, listenerName, listenerPort, routeName); + Cluster cluster = TestResources.createCluster(clusterName, endpointAddress, + endpointPort, Cluster.DiscoveryType.STRICT_DNS); + Listener listener = TestResources + .createListener(ads, delta, rdsTransportVersion, rdsResourceVersion, + listenerName, listenerPort, routeName); RouteConfiguration route = TestResources.createRoute(routeName, clusterName); return Snapshot.create( diff --git a/server/src/test/resources/envoy/ads.v2.config.yaml b/server/src/test/resources/envoy/ads.v2.config.yaml deleted file mode 100644 index 1acd636f4..000000000 --- a/server/src/test/resources/envoy/ads.v2.config.yaml +++ /dev/null @@ -1,31 +0,0 @@ -admin: - access_log_path: /dev/null - address: - socket_address: { address: 0.0.0.0, port_value: 9901 } -dynamic_resources: - ads_config: - api_type: GRPC - grpc_services: - envoy_grpc: - cluster_name: ads_cluster - cds_config: - ads: {} - lds_config: - ads: {} -node: - cluster: test-cluster - id: test-id -static_resources: - clusters: - - connect_timeout: 1s - load_assignment: - cluster_name: ads_server - endpoints: - - lb_endpoints: - - endpoint: - address: - socket_address: - address: HOST_IP - port_value: HOST_PORT - http2_protocol_options: {} - name: ads_cluster diff --git a/server/src/test/resources/envoy/ads.v3.delta.config.yaml b/server/src/test/resources/envoy/ads.v3.delta.config.yaml new file mode 100644 index 000000000..cfced3abc --- /dev/null +++ b/server/src/test/resources/envoy/ads.v3.delta.config.yaml @@ -0,0 +1,35 @@ +admin: + access_log_path: /dev/null + address: + socket_address: { address: 0.0.0.0, port_value: 9901 } +dynamic_resources: + ads_config: + api_type: DELTA_GRPC + grpc_services: + envoy_grpc: + cluster_name: ads_cluster + transport_api_version: V3 + cds_config: + ads: { } + resource_api_version: V3 + lds_config: + ads: { } + resource_api_version: V3 +node: + cluster: test-cluster + id: test-id +static_resources: + clusters: + - connect_timeout: 1s + type: STRICT_DNS + load_assignment: + cluster_name: ads_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: host.docker.internal + port_value: HOST_PORT + http2_protocol_options: {} + name: ads_cluster diff --git a/server/src/test/resources/envoy/xds.v3.delta.config.yaml b/server/src/test/resources/envoy/xds.v3.delta.config.yaml new file mode 100644 index 000000000..95bf086b2 --- /dev/null +++ b/server/src/test/resources/envoy/xds.v3.delta.config.yaml @@ -0,0 +1,39 @@ +admin: + access_log_path: /dev/null + address: + socket_address: { address: 0.0.0.0, port_value: 9901 } +dynamic_resources: + cds_config: + api_config_source: + api_type: DELTA_GRPC + grpc_services: + envoy_grpc: + cluster_name: xds_cluster + transport_api_version: V3 + resource_api_version: V3 + lds_config: + api_config_source: + api_type: DELTA_GRPC + grpc_services: + envoy_grpc: + cluster_name: xds_cluster + transport_api_version: V3 + resource_api_version: V3 +node: + cluster: test-cluster + id: test-id +static_resources: + clusters: + - connect_timeout: 1s + type: STRICT_DNS + load_assignment: + cluster_name: xds_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: host.docker.internal + port_value: HOST_PORT + http2_protocol_options: {} + name: xds_cluster