From a32597f73f0417181dfe23d285165ede5717d4be Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Thu, 11 Feb 2021 14:49:40 -0800 Subject: [PATCH 1/2] Added few optimizations on ThroughputControl stack --- .../ThroughputControlStore.java | 23 ++-- .../ThroughputContainerController.java | 120 +++++++++++------- .../ThroughputControlContainerManager.java | 16 +-- .../ThroughputGroupGlobalController.java | 28 ++-- .../local/ThroughputGroupLocalController.java | 10 +- .../GlobalThroughputRequestController.java | 5 +- .../PkRangesThroughputRequestController.java | 6 +- 7 files changed, 117 insertions(+), 91 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java index bb1cb0d764c3b..4769d1a706739 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java @@ -204,20 +204,17 @@ private Mono createAndInitContainerController(St checkArgument(StringUtils.isNotEmpty(containerLink), "Container link should not be null or empty"); if (this.groupMapByContainer.containsKey(containerLink)) { - return Mono.just(this.groupMapByContainer.get(containerLink)) - .flatMap(groups -> { - ThroughputContainerController containerController = - new ThroughputContainerController( - this.connectionMode, - this.globalEndpointManager, - groups, - this.partitionKeyRangeCache); - - return containerController.init(); - }); + Set groups = + this.groupMapByContainer.get(containerLink); + ThroughputContainerController containerController = + new ThroughputContainerController( + this.connectionMode, + this.globalEndpointManager, + groups, + this.partitionKeyRangeCache); + return containerController.init(); } else { - return Mono.just(new EmptyThroughputContainerController()) - .flatMap(EmptyThroughputContainerController::init); + return Mono.just(new EmptyThroughputContainerController()); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java index 785e189aac358..0598c27b2afd9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java @@ -109,11 +109,11 @@ private ThroughputResolveLevel getThroughputResolveLevel(Set Mono init() { return this.resolveContainerMaxThroughput() - .flatMap(controller -> this.createAndInitializeGroupControllers()) - .doOnSuccess(controller -> { - Schedulers.parallel().schedule(() -> this.refreshContainerMaxThroughputTask(this.cancellationTokenSource.getToken()).subscribe()); - }) - .thenReturn((T) this); + .then(this.createAndInitializeGroupControllers()) + .then(Mono.fromRunnable(() -> { + this.refreshContainerMaxThroughputTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe(); + })) + .thenReturn((T) this); } private Mono resolveDatabaseResourceId() { @@ -145,43 +145,73 @@ private Mono resolveContainerThroughput() { } private Mono resolveContainerMaxThroughput() { - return Mono.just(this.throughputResolveLevel) // TODO: ---> test whether it works without defer - .flatMap(throughputResolveLevel -> { - if (throughputResolveLevel == ThroughputResolveLevel.CONTAINER) { - return this.resolveContainerThroughput() - .onErrorResume(throwable -> { - if (this.isOfferNotConfiguredException(throwable)) { - this.throughputResolveLevel = ThroughputResolveLevel.DATABASE; - } - - return Mono.error(throwable); - }); - } else if (throughputResolveLevel == ThroughputResolveLevel.DATABASE) { - return this.resolveDatabaseThroughput() - .onErrorResume(throwable -> { - if (this.isOfferNotConfiguredException(throwable)) { - this.throughputResolveLevel = ThroughputResolveLevel.CONTAINER; - } - - return Mono.error(throwable); - }); - } - - // All the underlying throughput control groups are using target throughput, - // which is constant value, hence no need to resolve throughput - return Mono.empty(); - }) - .flatMap(throughputResponse -> { - this.updateMaxContainerThroughput(throughputResponse); - return Mono.empty(); - }) - .retryWhen( - // Throughput can be configured on database level or container level - // Retry at most 1 time so we can try on database and container both - RetrySpec.max(1).filter(throwable -> this.isOfferNotConfiguredException(throwable)) - ).thenReturn(this); + if (ThroughputResolveLevel.NONE.equals(throughputResolveLevel)) { + return Mono.empty(); + } + final Mono throughputResponseMono; + if (throughputResolveLevel == ThroughputResolveLevel.CONTAINER) { + throughputResponseMono = this.resolveContainerThroughput() + .onErrorResume(throwable -> { + if (this.isOfferNotConfiguredException(throwable)) { + this.throughputResolveLevel = ThroughputResolveLevel.DATABASE; + return this.resolveDatabaseThroughput(); + } + return Mono.error(throwable); + }); + } else { + throughputResponseMono = this.resolveDatabaseThroughput() + .onErrorResume(throwable -> { + if (this.isOfferNotConfiguredException(throwable)) { + this.throughputResolveLevel = ThroughputResolveLevel.CONTAINER; + return this.resolveContainerThroughput(); + } + return Mono.error(throwable); + }); + } + return throughputResponseMono.flatMap(throughputResponse -> { + this.updateMaxContainerThroughput(throughputResponse); + return Mono.just(this); + }); } +// private Mono resolveContainerMaxThroughput() { +// return Mono.just(this.throughputResolveLevel) // TODO: ---> test whether it works without defer +// .flatMap(throughputResolveLevel -> { +// if (throughputResolveLevel == ThroughputResolveLevel.CONTAINER) { +// return this.resolveContainerThroughput() +// .onErrorResume(throwable -> { +// if (this.isOfferNotConfiguredException(throwable)) { +// this.throughputResolveLevel = ThroughputResolveLevel.DATABASE; +// } +// +// return Mono.error(throwable); +// }); +// } else if (throughputResolveLevel == ThroughputResolveLevel.DATABASE) { +// return this.resolveDatabaseThroughput() +// .onErrorResume(throwable -> { +// if (this.isOfferNotConfiguredException(throwable)) { +// this.throughputResolveLevel = ThroughputResolveLevel.CONTAINER; +// } +// +// return Mono.error(throwable); +// }); +// } +// +// // All the underlying throughput control groups are using target throughput, +// // which is constant value, hence no need to resolve throughput +// return Mono.empty(); +// }) +// .flatMap(throughputResponse -> { +// this.updateMaxContainerThroughput(throughputResponse); +// return Mono.empty(); +// }) +// .retryWhen( +// // Throughput can be configured on database level or container level +// // Retry at most 1 time so we can try on database and container both +// RetrySpec.max(1).filter(throwable -> this.isOfferNotConfiguredException(throwable)) +// ).thenReturn(this); +// } + private Mono resolveThroughputByResourceId(String resourceId) { // Note: for serverless account, when we trying to query offers, // we will get 400/0 with error message: Reading or replacing offers is not supported for serverless accounts. @@ -298,15 +328,15 @@ private Mono createAndInitializeGroupController(T } - private Flux refreshContainerMaxThroughputTask(CancellationToken cancellationToken) { + private Mono refreshContainerMaxThroughputTask(CancellationToken cancellationToken) { checkNotNull(cancellationToken, "Cancellation token can not be null"); if (this.throughputResolveLevel == ThroughputResolveLevel.NONE) { - return Flux.empty(); + return Mono.empty(); } return Mono.delay(DEFAULT_THROUGHPUT_REFRESH_INTERVAL) - .flatMap(t -> this.resolveContainerMaxThroughput()) + .then(this.resolveContainerMaxThroughput()) .flatMapIterable(controller -> this.groups) .flatMap(group -> this.resolveThroughputGroupController(group)) .doOnNext(groupController -> groupController.onContainerMaxThroughputRefresh(this.maxContainerThroughput.get())) @@ -314,8 +344,8 @@ private Flux refreshContainerMaxThroughputTask(CancellationToken cancellat logger.warn("Refresh throughput failed with reason %s", throwable); return Mono.empty(); }) - .then() - .repeat(() -> !cancellationToken.isCancellationRequested()); + .repeat(() -> !cancellationToken.isCancellationRequested()) + .then(); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java index 6d9381b0cd5fa..f90ba27cc5090 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputControlContainerManager.java @@ -8,6 +8,7 @@ import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.throughputControl.config.ThroughputGlobalControlGroup; +import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.PartitionKey; import org.slf4j.Logger; @@ -106,9 +107,8 @@ public Mono getOrCreateConfigItem() { if (!expectedConfigItem.equals(configItem)) { logger.warn( - "Group config using by this client is different than the one in control container, will be ignored. Using following config: {}" + - "targetThroughput: {}, targetThroughputThreshold: {}", - this.configItem.toString()); + "Group config using by this client is different than the one in control container, will be ignored. " + + "Using following config: {}", this.configItem); } return Mono.just(this.configItem); @@ -166,8 +166,8 @@ public Mono replaceOrCreateGroupClientItem(do */ public Mono validateControlContainer() { return this.globalControlContainer.read() - .map(containerResponse -> containerResponse.getProperties()) - .flatMap(containerProperties -> { + .flatMap(containerResponse -> { + CosmosContainerProperties containerProperties = containerResponse.getProperties(); boolean isPartitioned = containerProperties.getPartitionKeyDefinition() != null && containerProperties.getPartitionKeyDefinition().getPaths() != null && @@ -177,9 +177,7 @@ public Mono validateControlContainer() { || !containerProperties.getPartitionKeyDefinition().getPaths().get(0).equals(PARTITION_KEY_PATH))) { return Mono.error(new IllegalArgumentException("The control container must have partition key equal to " + PARTITION_KEY_PATH)); } - - return Mono.empty(); - }) - .thenReturn(this); + return Mono.just(this); + }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputGroupGlobalController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputGroupGlobalController.java index 976b5b9d57b9a..3cc53a3f82dea 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputGroupGlobalController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/global/ThroughputGroupGlobalController.java @@ -53,19 +53,19 @@ public ThroughputGroupGlobalController( @Override @SuppressWarnings("unchecked") public Mono init() { - return this.containerManager.validateControlContainer() - .flatMap(dummy -> this.containerManager.getOrCreateConfigItem()) - .flatMap(dummy -> { - double loadFactor = this.calculateLoadFactor(); - return this.containerManager.createGroupClientItem(loadFactor) - .flatMap(clientItem -> this.calculateClientThroughputShare(loadFactor)); - }) - .flatMap(dummy -> this.resolveRequestController()) - .doOnSuccess(dummy -> { + return this.containerManager + .validateControlContainer() + .then(this.containerManager.getOrCreateConfigItem()) + .then(Mono.just(this.calculateLoadFactor())) + .flatMap(loadFactor -> this.containerManager + .createGroupClientItem(loadFactor) + .flatMap(clientItem -> this.calculateClientThroughputShare(loadFactor))) + .then(this.resolveRequestController()) + .then(Mono.fromRunnable(() -> { this.throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe(); this.calculateClientThroughputShareTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe(); - }) - .thenReturn((T)this); + })) + .thenReturn((T) this); } @Override @@ -82,8 +82,10 @@ public void recordThroughputUsage(double throughputUsage) { private Mono calculateClientThroughputShare(double loadFactor) { return this.containerManager.queryLoadFactorFromAllClients() - .doOnSuccess(totalLoads -> this.clientThroughputShare.set(loadFactor / totalLoads)) - .thenReturn(this); + .flatMap(totalLoads -> { + this.clientThroughputShare.set(loadFactor / totalLoads); + return Mono.just(this); + }); } private double calculateLoadFactor() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/local/ThroughputGroupLocalController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/local/ThroughputGroupLocalController.java index 6ee815589eb99..f7a5fa2a96742 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/local/ThroughputGroupLocalController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/local/ThroughputGroupLocalController.java @@ -29,10 +29,10 @@ public ThroughputGroupLocalController( @SuppressWarnings("unchecked") public Mono init() { return this.resolveRequestController() - .doOnSuccess(dummy -> { - this.throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe(); - }) - .thenReturn((T)this); + .then(Mono.fromRunnable(() -> { + this.throughputUsageCycleRenewTask(this.cancellationTokenSource.getToken()).publishOn(Schedulers.parallel()).subscribe(); + })) + .thenReturn((T) this); } @Override @@ -42,6 +42,6 @@ public double getClientThroughputShare() { @Override public void recordThroughputUsage(double loadFactor) { - return; + // No-op } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java index c18a617222e89..de637b316df1e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java @@ -35,9 +35,8 @@ public Mono init() { return Flux.fromIterable(this.globalEndpointManager.getReadEndpoints()) .flatMap(endpoint -> { requestThrottlerMapByRegion.computeIfAbsent(endpoint, key -> new ThroughputRequestThrottler(this.scheduledThroughput.get())); - return Mono.empty(); - }) - .then(Mono.just((T)this)); + return Mono.just((T)this); + }).single(); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java index 68de324bbf14d..04f31322d8eb0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java @@ -93,11 +93,11 @@ public Mono close() { @SuppressWarnings("unchecked") public Mono init() { return this.getPartitionKeyRanges(RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES) - .doOnSuccess(pkRanges -> { + .flatMap(pkRanges -> { this.pkRanges = pkRanges; this.createRequestThrottlers(); - }) - .then(Mono.just((T)this)); + return Mono.just((T)this); + }); } private void createRequestThrottlers() { From 895a5c3bb5fe237a40690debde3894f73bd2dc43 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Thu, 11 Feb 2021 18:13:53 -0800 Subject: [PATCH 2/2] Adding few optimizations and fixed some typos --- .../ThroughputControlStore.java | 43 ++++++++-------- .../ThroughputContainerController.java | 38 -------------- .../group/ThroughputGroupControllerBase.java | 49 ++++++++++--------- .../GlobalThroughputRequestController.java | 10 ++-- .../PkRangesThroughputRequestController.java | 2 +- 5 files changed, 53 insertions(+), 89 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java index 4769d1a706739..f4138bd063d47 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/ThroughputControlStore.java @@ -166,27 +166,30 @@ private Mono updateControllerAndRetry( return this.shouldRefreshContainerController(collectionLink, request) .flatMap(shouldRefresh -> { - if (shouldRefresh) { - currentContainerController.close().subscribeOn(Schedulers.parallel()).subscribe(); - this.containerControllerCache.refresh(collectionLink, () -> this.createAndInitContainerController(collectionLink)); - return this.resolveContainerController(collectionLink) - .flatMap(updatedContainerController -> { - if (updatedContainerController.canHandleRequest(request)) { - return updatedContainerController.processRequest(request, originalRequestMono) - .doOnError(throwable -> this.handleException(request, updatedContainerController, throwable)); - } else { - // still can not handle the request - logger.warn( - "Can not find container controller to process request {} with collectionRid {} ", - request.getActivityId(), - request.requestContext.resolvedCollectionRid); - - return originalRequestMono; - } - }); + if (!shouldRefresh) { + return originalRequestMono; + } else { + return Mono.fromRunnable(() -> { + currentContainerController.close().subscribeOn(Schedulers.parallel()).subscribe(); + }).then(Mono.defer(() -> { + this.containerControllerCache.refresh(collectionLink, () -> this.createAndInitContainerController(collectionLink)); + return this.resolveContainerController(collectionLink) + .flatMap(updatedContainerController -> { + if (updatedContainerController.canHandleRequest(request)) { + return updatedContainerController.processRequest(request, originalRequestMono) + .doOnError(throwable -> this.handleException(request, updatedContainerController, throwable)); + } else { + // still can not handle the request + logger.warn( + "Can not find container controller to process request {} with collectionRid {} ", + request.getActivityId(), + request.requestContext.resolvedCollectionRid); + + return originalRequestMono; + } + }); + })); } - - return originalRequestMono; }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java index 0598c27b2afd9..4c3e3e5eb7d10 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/container/ThroughputContainerController.java @@ -174,44 +174,6 @@ private Mono resolveContainerMaxThroughput() { }); } -// private Mono resolveContainerMaxThroughput() { -// return Mono.just(this.throughputResolveLevel) // TODO: ---> test whether it works without defer -// .flatMap(throughputResolveLevel -> { -// if (throughputResolveLevel == ThroughputResolveLevel.CONTAINER) { -// return this.resolveContainerThroughput() -// .onErrorResume(throwable -> { -// if (this.isOfferNotConfiguredException(throwable)) { -// this.throughputResolveLevel = ThroughputResolveLevel.DATABASE; -// } -// -// return Mono.error(throwable); -// }); -// } else if (throughputResolveLevel == ThroughputResolveLevel.DATABASE) { -// return this.resolveDatabaseThroughput() -// .onErrorResume(throwable -> { -// if (this.isOfferNotConfiguredException(throwable)) { -// this.throughputResolveLevel = ThroughputResolveLevel.CONTAINER; -// } -// -// return Mono.error(throwable); -// }); -// } -// -// // All the underlying throughput control groups are using target throughput, -// // which is constant value, hence no need to resolve throughput -// return Mono.empty(); -// }) -// .flatMap(throughputResponse -> { -// this.updateMaxContainerThroughput(throughputResponse); -// return Mono.empty(); -// }) -// .retryWhen( -// // Throughput can be configured on database level or container level -// // Retry at most 1 time so we can try on database and container both -// RetrySpec.max(1).filter(throwable -> this.isOfferNotConfiguredException(throwable)) -// ).thenReturn(this); -// } - private Mono resolveThroughputByResourceId(String resourceId) { // Note: for serverless account, when we trying to query offers, // we will get 400/0 with error message: Reading or replacing offers is not supported for serverless accounts. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java index 4dd28138d35f6..cb8b3588dfa05 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/group/ThroughputGroupControllerBase.java @@ -182,26 +182,29 @@ private Mono updateControllerAndRetry( return this.shouldUpdateRequestController(request) .flatMap(shouldUpdate -> { - if (shouldUpdate) { - currentRequestController.close().subscribeOn(Schedulers.parallel()).subscribe(); - this.refreshRequestController(); - return this.resolveRequestController() - .flatMap(updatedController -> { - if (updatedController.canHandleRequest(request)) { - return updatedController.processRequest(request, nextRequestMono) - .doOnError(throwable -> this.handleException(throwable)); - } else { - // If we reach here and still can not handle the request, it should mean the request has staled info - // and the request will fail by server - logger.warn( - "Can not find request controller to handle request {} with pkRangeId {}", - request.getActivityId(), - request.requestContext.resolvedPartitionKeyRange.getId()); - return nextRequestMono; - } - }); - } else { + if (!shouldUpdate) { return nextRequestMono; + } else { + return Mono.fromRunnable(() -> { + currentRequestController.close().subscribeOn(Schedulers.parallel()).subscribe(); + }).then(Mono.defer(() -> { + this.refreshRequestController(); + return this.resolveRequestController() + .flatMap(updatedController -> { + if (updatedController.canHandleRequest(request)) { + return updatedController.processRequest(request, nextRequestMono) + .doOnError(throwable -> this.handleException(throwable)); + } else { + // If we reach here and still can not handle the request, it should mean the request has staled info + // and the request will fail by server + logger.warn( + "Can not find request controller to handle request {} with pkRangeId {}", + request.getActivityId(), + request.requestContext.resolvedPartitionKeyRange.getId()); + return nextRequestMono; + } + }); + })); } }); } @@ -209,13 +212,13 @@ private Mono updateControllerAndRetry( private Mono shouldUpdateRequestController(RxDocumentServiceRequest request) { return this.partitionKeyRangeCache.tryGetRangeByPartitionKeyRangeId( null, request.requestContext.resolvedCollectionRid, request.requestContext.resolvedPartitionKeyRange.getId(), null) - .map(pkRangeHolder -> pkRangeHolder.v) - .flatMap(pkRange -> { - if (pkRange == null) { + .flatMap(pkRangeHolder -> { + if (pkRangeHolder.v == null) { return Mono.just(Boolean.FALSE); } else { return Mono.just(Boolean.TRUE); - }}); + } + }); } protected Mono resolveRequestController() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java index de637b316df1e..c10b7285f1c8b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/GlobalThroughputRequestController.java @@ -46,13 +46,9 @@ public boolean canHandleRequest(RxDocumentServiceRequest request) { @Override public Mono processRequest(RxDocumentServiceRequest request, Mono nextRequestMono) { - return Mono.defer( - () -> Mono.just( - this.requestThrottlerMapByRegion.computeIfAbsent( - this.globalEndpointManager.resolveServiceEndpoint(request), - key -> new ThroughputRequestThrottler(this.scheduledThroughput.get()))) - ) - .flatMap(requestThrottler -> requestThrottler.processRequest(request, nextRequestMono)); + return this.requestThrottlerMapByRegion + .computeIfAbsent(this.globalEndpointManager.resolveServiceEndpoint(request), + key -> new ThroughputRequestThrottler(this.scheduledThroughput.get())).processRequest(request, nextRequestMono); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java index 04f31322d8eb0..2b124205119ff 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/throughputControl/controller/request/PkRangesThroughputRequestController.java @@ -128,7 +128,7 @@ public Mono processRequest(RxDocumentServiceRequest request, Mono next // If we reach here, it means we should find the mapping pkRange ThroughputRequestThrottler requestThrottler = this.getOrCreateRegionRequestThrottlers(this.globalEndpointManager.resolveServiceEndpoint(request)) - .get(resolvedPkRange); + .get(resolvedPkRange.getId()); if (requestThrottler != null) { return requestThrottler.processRequest(request, nextRequestMono);