diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 22e9888540f1e..44696c2a676da 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -1407,7 +1407,7 @@ public interface CosmosContainerIdentityAccessor { public static final class CosmosContainerProactiveInitConfigHelper { private static final AtomicReference cosmosContainerProactiveInitConfigClassLoaded = new AtomicReference<>(false); - private static final AtomicReference accessor = new AtomicReference<>(); + private static final AtomicReference accessor = new AtomicReference<>(); private CosmosContainerProactiveInitConfigHelper() {} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java index ea52c758eb74f..cb9b9a02d476f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java @@ -101,113 +101,118 @@ public Flux submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiv // some use path with stripped leading "/", // TODO: ideally it should have been consistent across return Flux.fromIterable(proactiveContainerInitConfig.getCosmosContainerIdentities()) - .publishOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC) - .flatMap(cosmosContainerIdentity -> - this - .collectionCache - .resolveByNameAsync( - null, - ImplementationBridgeHelpers - .CosmosContainerIdentityHelper - .getCosmosContainerIdentityAccessor() - .getContainerLink(cosmosContainerIdentity), - null) - .flatMapMany(collection -> { - if (collection == null) { - logger.warn("Can not find the collection, no connections will be opened"); - return Flux.empty(); - } + .publishOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC) + .flatMap(cosmosContainerIdentity -> { + return this + .collectionCache + .resolveByNameAsync( + null, + ImplementationBridgeHelpers + .CosmosContainerIdentityHelper + .getCosmosContainerIdentityAccessor() + .getContainerLink(cosmosContainerIdentity), + null) + .flatMapMany(collection -> { + if (collection == null) { + logger.warn("Can not find the collection, no connections will be opened"); + return Flux.empty(); + } - return this.routingMapProvider.tryGetOverlappingRangesAsync( - null, - collection.getResourceId(), - PartitionKeyInternalHelper.FullRange, - true, - null) - .flatMap(valueHolder -> { + return this.routingMapProvider.tryGetOverlappingRangesAsync( + null, + collection.getResourceId(), + PartitionKeyInternalHelper.FullRange, + true, + null) + .flatMap(valueHolder -> { - String containerLink = ImplementationBridgeHelpers - .CosmosContainerIdentityHelper - .getCosmosContainerIdentityAccessor() - .getContainerLink(cosmosContainerIdentity); + String containerLink = ImplementationBridgeHelpers + .CosmosContainerIdentityHelper + .getCosmosContainerIdentityAccessor() + .getContainerLink(cosmosContainerIdentity); - if (valueHolder == null || valueHolder.v == null || valueHolder.v.size() == 0) { - logger.warn( - "There is no pkRanges found for collection {}, no connections will be opened", - collection.getResourceId()); - return Mono.just(new ImmutablePair<>(containerLink, new ArrayList())); - } + if (valueHolder == null || valueHolder.v == null || valueHolder.v.size() == 0) { + logger.warn( + "There is no pkRanges found for collection {}, no connections will be opened", + collection.getResourceId()); + return Mono.just(new ImmutablePair<>(containerLink, new ArrayList())); + } - List pkrs = valueHolder.v - .stream() - .map(pkRange -> new PartitionKeyRangeIdentity(collection.getResourceId(), pkRange.getId())) - .collect(Collectors.toList()); + List pkrs = valueHolder.v + .stream() + .map(pkRange -> new PartitionKeyRangeIdentity(collection.getResourceId(), pkRange.getId())) + .collect(Collectors.toList()); - return Mono.just(new ImmutablePair>(containerLink, pkrs)); - }) - .flatMapMany(containerLinkToPkrs -> this.resolveAddressesPerCollection( - containerLinkToPkrs.left, - collection, - containerLinkToPkrs.right, - proactiveContainerInitConfig) - ).flatMap(collectionToAddresses -> { - ImmutablePair containerLinkToCollection - = collectionToAddresses.left; - AddressInformation addressInformation = - collectionToAddresses.right; + return Mono.just(new ImmutablePair>(containerLink, pkrs)); + }) + .flatMapMany(containerLinkToPkrs -> { + if (proactiveContainerInitConfig.getProactiveConnectionRegionsCount() > 0) { + return Flux.fromStream(this.endpointManager.getReadEndpoints().stream()) + .take(proactiveContainerInitConfig.getProactiveConnectionRegionsCount()) + .flatMap(readEndpoint -> { + if (this.addressCacheByEndpoint.containsKey(readEndpoint)) { + EndpointCache endpointCache = this.addressCacheByEndpoint.get(readEndpoint); + return this.resolveAddressesPerCollection( + endpointCache, + containerLinkToPkrs.left, + collection, + containerLinkToPkrs.right) + .flatMap(collectionToAddresses -> { + ImmutablePair containerLinkToCollection + = collectionToAddresses.left; + AddressInformation addressInformation = + collectionToAddresses.right; - Map containerLinkToMinConnectionsMap = ImplementationBridgeHelpers - .CosmosContainerProactiveInitConfigHelper - .getCosmosContainerIdentityAccessor() - .getContainerLinkToMinConnectionsMap(proactiveContainerInitConfig); + Map containerLinkToMinConnectionsMap = ImplementationBridgeHelpers + .CosmosContainerProactiveInitConfigHelper + .getCosmosContainerIdentityAccessor() + .getContainerLinkToMinConnectionsMap(proactiveContainerInitConfig); - int connectionsPerEndpointCountForContainer = containerLinkToMinConnectionsMap - .getOrDefault( + int connectionsPerEndpointCountForContainer = containerLinkToMinConnectionsMap + .getOrDefault( containerLinkToCollection.left, Configs.getMinConnectionPoolSizePerEndpoint() - ); + ); + + return this.submitOpenConnectionInternal( + endpointCache, + addressInformation, + containerLinkToCollection.getRight(), + connectionsPerEndpointCountForContainer).then(); // TODO: figure out return type + }); + } - return this.submitOpenConnectionInternal( - addressInformation, - containerLinkToCollection.getRight(), - connectionsPerEndpointCountForContainer) - .then(); // TODO: change to return response - }); - }), Configs.getCPUCnt(), Configs.getCPUCnt()); + return Flux.empty(); + }); + } + + return Flux.empty(); + }); + }); + }); } private Flux, AddressInformation>> resolveAddressesPerCollection( + EndpointCache endpointCache, String containerLink, DocumentCollection collection, - List partitionKeyRangeIdentities, - CosmosContainerProactiveInitConfig proactiveContainerInitConfig) { - if (proactiveContainerInitConfig.getProactiveConnectionRegionsCount() > 0) { - return Flux.fromStream(this.endpointManager.getReadEndpoints().stream()) - .take(proactiveContainerInitConfig.getProactiveConnectionRegionsCount()) - .flatMap(readEndpoint -> { - if (this.addressCacheByEndpoint.containsKey(readEndpoint)) { - return this.addressCacheByEndpoint.get(readEndpoint) - .addressCache - .resolveAddressesAndInitCaches( - containerLink, - collection, - partitionKeyRangeIdentities); - } - return Flux.empty(); - }, 1); - } - - return Flux.empty(); + List partitionKeyRangeIdentities) { + return endpointCache + .addressCache + .resolveAddressesAndInitCaches( + containerLink, + collection, + partitionKeyRangeIdentities + ); } private Mono submitOpenConnectionInternal( + EndpointCache endpointCache, AddressInformation address, DocumentCollection documentCollection, int connectionPerEndpointCount) { - EndpointCache endpointCacheCache = this.addressCacheByEndpoint.get(this.endpointManager.getReadEndpoints().stream().findFirst().get()); - - return endpointCacheCache.addressCache.submitOpenConnectionTask(address, documentCollection, connectionPerEndpointCount); + return endpointCache.addressCache.submitOpenConnectionTask(address, documentCollection, connectionPerEndpointCount); } @Override