Skip to content

Commit

Permalink
Merge pull request #4 from xinlian12/ProactiveConnectionManagementFor…
Browse files Browse the repository at this point in the history
…BrokenConnections

Proactive connection management for broken connections
  • Loading branch information
jeet1995 authored Apr 19, 2023
2 parents a286a23 + 44a7c7d commit cc9d003
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ public interface CosmosContainerIdentityAccessor {
public static final class CosmosContainerProactiveInitConfigHelper {

private static final AtomicReference<Boolean> cosmosContainerProactiveInitConfigClassLoaded = new AtomicReference<>(false);
private static final AtomicReference<CosmosContainerProactiveInitConfigHelper.CosmosContainerProactiveInitConfigAccessor> accessor = new AtomicReference<>();
private static final AtomicReference<CosmosContainerProactiveInitConfigAccessor> accessor = new AtomicReference<>();

private CosmosContainerProactiveInitConfigHelper() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,113 +101,118 @@ public Flux<Void> submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiv
// some use path with stripped leading "/",
// TODO: ideally it should have been consistent across
return Flux.fromIterable(proactiveContainerInitConfig.getCosmosContainerIdentities())
.publishOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC)
.flatMap(cosmosContainerIdentity ->
this
.collectionCache
.resolveByNameAsync(
null,
ImplementationBridgeHelpers
.CosmosContainerIdentityHelper
.getCosmosContainerIdentityAccessor()
.getContainerLink(cosmosContainerIdentity),
null)
.flatMapMany(collection -> {
if (collection == null) {
logger.warn("Can not find the collection, no connections will be opened");
return Flux.empty();
}
.publishOn(CosmosSchedulers.OPEN_CONNECTIONS_BOUNDED_ELASTIC)
.flatMap(cosmosContainerIdentity -> {
return this
.collectionCache
.resolveByNameAsync(
null,
ImplementationBridgeHelpers
.CosmosContainerIdentityHelper
.getCosmosContainerIdentityAccessor()
.getContainerLink(cosmosContainerIdentity),
null)
.flatMapMany(collection -> {
if (collection == null) {
logger.warn("Can not find the collection, no connections will be opened");
return Flux.empty();
}

return this.routingMapProvider.tryGetOverlappingRangesAsync(
null,
collection.getResourceId(),
PartitionKeyInternalHelper.FullRange,
true,
null)
.flatMap(valueHolder -> {
return this.routingMapProvider.tryGetOverlappingRangesAsync(
null,
collection.getResourceId(),
PartitionKeyInternalHelper.FullRange,
true,
null)
.flatMap(valueHolder -> {

String containerLink = ImplementationBridgeHelpers
.CosmosContainerIdentityHelper
.getCosmosContainerIdentityAccessor()
.getContainerLink(cosmosContainerIdentity);
String containerLink = ImplementationBridgeHelpers
.CosmosContainerIdentityHelper
.getCosmosContainerIdentityAccessor()
.getContainerLink(cosmosContainerIdentity);

if (valueHolder == null || valueHolder.v == null || valueHolder.v.size() == 0) {
logger.warn(
"There is no pkRanges found for collection {}, no connections will be opened",
collection.getResourceId());
return Mono.just(new ImmutablePair<>(containerLink, new ArrayList<PartitionKeyRangeIdentity>()));
}
if (valueHolder == null || valueHolder.v == null || valueHolder.v.size() == 0) {
logger.warn(
"There is no pkRanges found for collection {}, no connections will be opened",
collection.getResourceId());
return Mono.just(new ImmutablePair<>(containerLink, new ArrayList<PartitionKeyRangeIdentity>()));
}

List<PartitionKeyRangeIdentity> pkrs = valueHolder.v
.stream()
.map(pkRange -> new PartitionKeyRangeIdentity(collection.getResourceId(), pkRange.getId()))
.collect(Collectors.toList());
List<PartitionKeyRangeIdentity> pkrs = valueHolder.v
.stream()
.map(pkRange -> new PartitionKeyRangeIdentity(collection.getResourceId(), pkRange.getId()))
.collect(Collectors.toList());

return Mono.just(new ImmutablePair<String, List<PartitionKeyRangeIdentity>>(containerLink, pkrs));
})
.flatMapMany(containerLinkToPkrs -> this.resolveAddressesPerCollection(
containerLinkToPkrs.left,
collection,
containerLinkToPkrs.right,
proactiveContainerInitConfig)
).flatMap(collectionToAddresses -> {
ImmutablePair<String, DocumentCollection> containerLinkToCollection
= collectionToAddresses.left;
AddressInformation addressInformation =
collectionToAddresses.right;
return Mono.just(new ImmutablePair<String, List<PartitionKeyRangeIdentity>>(containerLink, pkrs));
})
.flatMapMany(containerLinkToPkrs -> {
if (proactiveContainerInitConfig.getProactiveConnectionRegionsCount() > 0) {
return Flux.fromStream(this.endpointManager.getReadEndpoints().stream())
.take(proactiveContainerInitConfig.getProactiveConnectionRegionsCount())
.flatMap(readEndpoint -> {
if (this.addressCacheByEndpoint.containsKey(readEndpoint)) {
EndpointCache endpointCache = this.addressCacheByEndpoint.get(readEndpoint);
return this.resolveAddressesPerCollection(
endpointCache,
containerLinkToPkrs.left,
collection,
containerLinkToPkrs.right)
.flatMap(collectionToAddresses -> {
ImmutablePair<String, DocumentCollection> containerLinkToCollection
= collectionToAddresses.left;
AddressInformation addressInformation =
collectionToAddresses.right;

Map<String, Integer> containerLinkToMinConnectionsMap = ImplementationBridgeHelpers
.CosmosContainerProactiveInitConfigHelper
.getCosmosContainerIdentityAccessor()
.getContainerLinkToMinConnectionsMap(proactiveContainerInitConfig);
Map<String, Integer> containerLinkToMinConnectionsMap = ImplementationBridgeHelpers
.CosmosContainerProactiveInitConfigHelper
.getCosmosContainerIdentityAccessor()
.getContainerLinkToMinConnectionsMap(proactiveContainerInitConfig);

int connectionsPerEndpointCountForContainer = containerLinkToMinConnectionsMap
.getOrDefault(
int connectionsPerEndpointCountForContainer = containerLinkToMinConnectionsMap
.getOrDefault(
containerLinkToCollection.left,
Configs.getMinConnectionPoolSizePerEndpoint()
);
);

return this.submitOpenConnectionInternal(
endpointCache,
addressInformation,
containerLinkToCollection.getRight(),
connectionsPerEndpointCountForContainer).then(); // TODO: figure out return type
});
}

return this.submitOpenConnectionInternal(
addressInformation,
containerLinkToCollection.getRight(),
connectionsPerEndpointCountForContainer)
.then(); // TODO: change to return response
});
}), Configs.getCPUCnt(), Configs.getCPUCnt());
return Flux.empty();
});
}

return Flux.empty();
});
});
});
}

private Flux<ImmutablePair<ImmutablePair<String, DocumentCollection>, AddressInformation>> resolveAddressesPerCollection(
EndpointCache endpointCache,
String containerLink,
DocumentCollection collection,
List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities,
CosmosContainerProactiveInitConfig proactiveContainerInitConfig) {
if (proactiveContainerInitConfig.getProactiveConnectionRegionsCount() > 0) {
return Flux.fromStream(this.endpointManager.getReadEndpoints().stream())
.take(proactiveContainerInitConfig.getProactiveConnectionRegionsCount())
.flatMap(readEndpoint -> {
if (this.addressCacheByEndpoint.containsKey(readEndpoint)) {
return this.addressCacheByEndpoint.get(readEndpoint)
.addressCache
.resolveAddressesAndInitCaches(
containerLink,
collection,
partitionKeyRangeIdentities);
}
return Flux.empty();
}, 1);
}

return Flux.empty();
List<PartitionKeyRangeIdentity> partitionKeyRangeIdentities) {
return endpointCache
.addressCache
.resolveAddressesAndInitCaches(
containerLink,
collection,
partitionKeyRangeIdentities
);
}

private Mono<OpenConnectionResponse> submitOpenConnectionInternal(
EndpointCache endpointCache,
AddressInformation address,
DocumentCollection documentCollection,
int connectionPerEndpointCount) {

EndpointCache endpointCacheCache = this.addressCacheByEndpoint.get(this.endpointManager.getReadEndpoints().stream().findFirst().get());

return endpointCacheCache.addressCache.submitOpenConnectionTask(address, documentCollection, connectionPerEndpointCount);
return endpointCache.addressCache.submitOpenConnectionTask(address, documentCollection, connectionPerEndpointCount);
}

@Override
Expand Down

0 comments on commit cc9d003

Please sign in to comment.