diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index ce48948dd..980ac982c 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -83,6 +83,30 @@ const RETRY_CODES: {[key: string]: status} = { export const XDS_CONFIG_KEY = `${experimental.SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX}.xds_config`; export const XDS_CLIENT_KEY = 'grpc.internal.xds_client'; +/** + * Tracks a dynamic subscription to a cluster that is currently or previously + * referenced in a RouteConfiguration. + */ +class ClusterRef { + private refCount = 0; + constructor(private unsubscribe: () => void) {} + + ref() { + this.refCount += 1; + } + + unref() { + this.refCount -= 1; + if (this.refCount <= 0) { + this.unsubscribe(); + } + } + + hasRef() { + return this.refCount > 0; + } +} + class XdsResolver implements Resolver { private listenerResourceName: string | null = null; @@ -93,6 +117,7 @@ class XdsResolver implements Resolver { private xdsConfigWatcher: XdsConfigWatcher; private xdsDependencyManager: XdsDependencyManager | null = null; + private clusterRefs: Map = new Map(); constructor( private target: GrpcUri, @@ -123,11 +148,20 @@ class XdsResolver implements Resolver { } } + private pruneUnusedClusters() { + for (const [cluster, clusterRef] of this.clusterRefs) { + if (!clusterRef.hasRef()) { + this.clusterRefs.delete(cluster); + } + } + } + private async handleXdsConfig(xdsConfig: XdsConfig) { /* We need to load the xxhash API before this function finishes, because * it is invoked in the config selector, which can be called immediately * after this function returns. */ await loadXxhashApi(); + this.pruneUnusedClusters(); const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, xdsConfig.listener.api_listener!.api_listener!.value); const configDefaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout; let defaultTimeout: Duration | undefined = undefined; @@ -312,44 +346,60 @@ class XdsResolver implements Resolver { const routeMatcher = getPredicateForMatcher(route.match!); matchList.push({matcher: routeMatcher, action: routeAction}); } - const configSelector: ConfigSelector = (methodName, metadata, channelId) => { - for (const {matcher, action} of matchList) { - if (matcher.apply(methodName, metadata)) { - const clusterResult = action.getCluster(); - const unrefCluster = this.xdsDependencyManager!.addClusterSubscription(clusterResult.name); - const onCommitted = () => { - unrefCluster(); - } - let hash: string; - if (EXPERIMENTAL_RING_HASH) { - hash = `${action.getHash(metadata, channelId)}`; - } else { - hash = ''; + for (const cluster of allConfigClusters) { + let clusterRef = this.clusterRefs.get(cluster); + if (!clusterRef) { + clusterRef = new ClusterRef(this.xdsDependencyManager!.addClusterSubscription(cluster)); + this.clusterRefs.set(cluster, clusterRef); + } + clusterRef.ref(); + } + const configSelector: ConfigSelector = { + invoke: (methodName, metadata, channelId) => { + for (const {matcher, action} of matchList) { + if (matcher.apply(methodName, metadata)) { + const clusterResult = action.getCluster(); + const clusterRef = this.clusterRefs.get(clusterResult.name)!; + clusterRef.ref(); + const onCommitted = () => { + clusterRef.unref(); + } + let hash: string; + if (EXPERIMENTAL_RING_HASH) { + hash = `${action.getHash(metadata, channelId)}`; + } else { + hash = ''; + } + return { + methodConfig: clusterResult.methodConfig, + onCommitted: onCommitted, + pickInformation: {cluster: clusterResult.name, hash: hash}, + status: status.OK, + dynamicFilterFactories: clusterResult.dynamicFilterFactories + }; } - return { - methodConfig: clusterResult.methodConfig, - onCommitted: onCommitted, - pickInformation: {cluster: clusterResult.name, hash: hash}, - status: status.OK, - dynamicFilterFactories: clusterResult.dynamicFilterFactories - }; + } + return { + methodConfig: {name: []}, + // These fields won't be used here, but they're set because of some TypeScript weirdness + pickInformation: {cluster: '', hash: ''}, + status: status.UNAVAILABLE, + dynamicFilterFactories: [] + }; + }, + unref: () => { + for (const cluster of allConfigClusters) { + this.clusterRefs.get(cluster)?.unref(); } } - return { - methodConfig: {name: []}, - // These fields won't be used here, but they're set because of some TypeScript weirdness - pickInformation: {cluster: '', hash: ''}, - status: status.UNAVAILABLE, - dynamicFilterFactories: [] - }; - }; + } trace('Created ConfigSelector with configuration:'); for (const {matcher, action} of matchList) { trace(matcher.toString()); trace('=> ' + action.toString()); } const clusterConfigMap: {[key: string]: {child_policy: LoadBalancingConfig[]}} = {}; - for (const clusterName of allConfigClusters) { + for (const clusterName of this.clusterRefs.keys()) { clusterConfigMap[clusterName] = {child_policy: [{cds: {cluster: clusterName}}]}; } const lbPolicyConfig = {xds_cluster_manager: {children: clusterConfigMap}}; diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 94b76fa9b..624c0b268 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -377,6 +377,7 @@ export class InternalChannel { 'Address resolution succeeded' ); } + this.configSelector?.unref(); this.configSelector = configSelector; this.currentResolutionError = null; /* We process the queue asynchronously to ensure that the corresponding @@ -568,7 +569,7 @@ export class InternalChannel { if (this.configSelector) { return { type: 'SUCCESS', - config: this.configSelector(method, metadata, this.randomChannelId), + config: this.configSelector.invoke(method, metadata, this.randomChannelId), }; } else { if (this.currentResolutionError) { @@ -790,6 +791,8 @@ export class InternalChannel { } this.subchannelPool.unrefUnusedSubchannels(); + this.configSelector?.unref(); + this.configSelector = null; } getTarget() { diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index 1c84c0490..9cbcff591 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -37,7 +37,8 @@ export interface CallConfig { * https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc */ export interface ConfigSelector { - (methodName: string, metadata: Metadata, channelId: number): CallConfig; + invoke(methodName: string, metadata: Metadata, channelId: number): CallConfig; + unref(): void; } /** diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index d3eeb2369..694613483 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -103,43 +103,46 @@ function findMatchingConfig( function getDefaultConfigSelector( serviceConfig: ServiceConfig | null ): ConfigSelector { - return function defaultConfigSelector( - methodName: string, - metadata: Metadata - ) { - const splitName = methodName.split('/').filter(x => x.length > 0); - const service = splitName[0] ?? ''; - const method = splitName[1] ?? ''; - if (serviceConfig && serviceConfig.methodConfig) { - /* Check for the following in order, and return the first method - * config that matches: - * 1. A name that exactly matches the service and method - * 2. A name with no method set that matches the service - * 3. An empty name - */ - for (const matchLevel of NAME_MATCH_LEVEL_ORDER) { - const matchingConfig = findMatchingConfig( - service, - method, - serviceConfig.methodConfig, - matchLevel - ); - if (matchingConfig) { - return { - methodConfig: matchingConfig, - pickInformation: {}, - status: Status.OK, - dynamicFilterFactories: [], - }; + return { + invoke( + methodName: string, + metadata: Metadata + ) { + const splitName = methodName.split('/').filter(x => x.length > 0); + const service = splitName[0] ?? ''; + const method = splitName[1] ?? ''; + if (serviceConfig && serviceConfig.methodConfig) { + /* Check for the following in order, and return the first method + * config that matches: + * 1. A name that exactly matches the service and method + * 2. A name with no method set that matches the service + * 3. An empty name + */ + for (const matchLevel of NAME_MATCH_LEVEL_ORDER) { + const matchingConfig = findMatchingConfig( + service, + method, + serviceConfig.methodConfig, + matchLevel + ); + if (matchingConfig) { + return { + methodConfig: matchingConfig, + pickInformation: {}, + status: Status.OK, + dynamicFilterFactories: [], + }; + } } } - } - return { - methodConfig: { name: [] }, - pickInformation: {}, - status: Status.OK, - dynamicFilterFactories: [], - }; + return { + methodConfig: { name: [] }, + pickInformation: {}, + status: Status.OK, + dynamicFilterFactories: [], + }; + }, + unref() {} }; } @@ -298,6 +301,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { 'All load balancer options in service config are not compatible', metadata: new Metadata(), }); + configSelector?.unref(); return; } this.childLoadBalancer.updateAddressList(