Skip to content

Commit

Permalink
Merge pull request #1 from xinlian12/excludeRegion
Browse files Browse the repository at this point in the history
ExcludeRegions
  • Loading branch information
mbhaskar authored Jun 19, 2023
2 parents e7cbded + 74dab78 commit 69896bd
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.DatabaseAccount;
import com.azure.cosmos.implementation.DatabaseAccountLocation;
import com.azure.cosmos.implementation.GlobalEndpointManager;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RxDocumentClientImpl;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.implementation.throughputControl.TestItem;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.rx.TestSuiteBase;
import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper;
import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType;
import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders;
import com.azure.cosmos.test.faultinjection.FaultInjectionRule;
import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class ExcludeRegionTests extends TestSuiteBase {
private static final int TIMEOUT = 60000;
private CosmosAsyncClient clientWithPreferredRegions;
private CosmosAsyncContainer cosmosAsyncContainer;
private List<String> preferredRegionList;

@BeforeClass(groups = {"multi-master"}, timeOut = SETUP_TIMEOUT)
public void beforeClass() {
CosmosAsyncClient dummyClient = null;
try {
dummyClient = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.contentResponseOnWriteEnabled(true)
.directMode()
.buildAsyncClient();

this.preferredRegionList = this.getPreferredRegionList(dummyClient);
this.clientWithPreferredRegions =
new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.contentResponseOnWriteEnabled(true)
.preferredRegions(this.preferredRegionList)
.buildAsyncClient();

this.cosmosAsyncContainer = getSharedSinglePartitionCosmosContainer(this.clientWithPreferredRegions);
} finally {
safeClose(dummyClient);
}
}

@AfterClass(groups = {"multi-master"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
safeClose(this.clientWithPreferredRegions);
}

@DataProvider(name = "operationTypeArgProvider")
public static Object[][] operationTypeArgProvider() {
return new Object[][]{
{ OperationType.Read },
{ OperationType.Replace },
{ OperationType.Create },
{ OperationType.Delete },
{ OperationType.Query },
{ OperationType.Patch }
};
}

@DataProvider(name = "faultInjectionArgProvider")
public static Object[][] faultInjectionArgProvider() {
return new Object[][]{
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM },
{ OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM },
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM },
{ OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM },
{ OperationType.Query, FaultInjectionOperationType.QUERY_ITEM },
{ OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM }
};
}

@Test(groups = {"multi-master"}, dataProvider = "operationTypeArgProvider", timeOut = TIMEOUT)
public void excludeRegionTest_SkipFirstPreferredRegion(OperationType operationType) {
TestItem createdItem = TestItem.createNewItem();
this.cosmosAsyncContainer.createItem(createdItem).block();

CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(cosmosAsyncContainer, operationType, createdItem, null);
assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(1);
assertThat(cosmosDiagnostics.getContactedRegionNames()).containsAll(this.preferredRegionList.subList(0, 1));

// now exclude the first preferred region
cosmosDiagnostics =
this.performDocumentOperation(
cosmosAsyncContainer,
operationType,
createdItem,
Arrays.asList(this.preferredRegionList.get(0)));

assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(1);
assertThat(cosmosDiagnostics.getContactedRegionNames()).containsAll(this.preferredRegionList.subList(1, 2));
}

@Test(groups = {"multi-master"}, dataProvider = "faultInjectionArgProvider", timeOut = TIMEOUT)
public void excludeRegionTest_readSessionNotAvailable(
OperationType operationType,
FaultInjectionOperationType faultInjectionOperationType) {
TestItem createdItem = TestItem.createNewItem();
this.cosmosAsyncContainer.createItem(createdItem).block();

// TODO: reduce the max wait time
// Configure 404/1002 fault injection rule, validate the request should succeed in other region
FaultInjectionRule serverErrorRule = new FaultInjectionRuleBuilder("excludeRegionTest-" + operationType)
.condition(
new FaultInjectionConditionBuilder()
.region(this.preferredRegionList.get(0))
.operationType(faultInjectionOperationType)
.build())
.result(
FaultInjectionResultBuilders
.getResultBuilder(FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE)
.build()
).build();

CosmosFaultInjectionHelper.configureFaultInjectionRules(this.cosmosAsyncContainer, Arrays.asList(serverErrorRule)).block();
try {
CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(cosmosAsyncContainer, operationType, createdItem, null);
assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(2);
assertThat(cosmosDiagnostics.getContactedRegionNames().containsAll(this.preferredRegionList.subList(0, 2)));
} catch (CosmosException e) {
fail("Request should succeeded in other regions");
}

// now exclude all the other regions except the first preferred region
try {
CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(
cosmosAsyncContainer,
operationType,
createdItem,
this.preferredRegionList.subList(1, this.preferredRegionList.size()));

fail("Request should have failed");
} catch (CosmosException exception) {
CosmosDiagnostics cosmosDiagnostics = exception.getDiagnostics();
assertThat(cosmosDiagnostics.getContactedRegionNames().size()).isEqualTo(1);
assertThat(cosmosDiagnostics.getContactedRegionNames().containsAll(this.preferredRegionList.subList(0, 1)));
}
}

private List<String> getPreferredRegionList(CosmosAsyncClient client) {
assertThat(client).isNotNull();

AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(client);
RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) asyncDocumentClient;
GlobalEndpointManager globalEndpointManager =
ReflectionUtils.getGlobalEndpointManager(rxDocumentClient);
DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount();

Iterator<DatabaseAccountLocation> locationIterator = databaseAccount.getWritableLocations().iterator();
List<String> preferredRegionList = new ArrayList<>();

while (locationIterator.hasNext()) {
DatabaseAccountLocation accountLocation = locationIterator.next();
preferredRegionList.add(accountLocation.getName());
}

return preferredRegionList;
}

private CosmosDiagnostics performDocumentOperation(
CosmosAsyncContainer cosmosAsyncContainer,
OperationType operationType,
TestItem createdItem,
List<String> excludeRegions) {
if (operationType == OperationType.Query) {
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
String query = String.format("SELECT * from c where c.id = '%s'", createdItem.getId());
queryRequestOptions.setExcludedRegions(excludeRegions);
FeedResponse<TestItem> itemFeedResponse =
cosmosAsyncContainer.queryItems(query, queryRequestOptions, TestItem.class).byPage().blockFirst();

return itemFeedResponse.getCosmosDiagnostics();
}

if (operationType == OperationType.Read
|| operationType == OperationType.Delete
|| operationType == OperationType.Replace
|| operationType == OperationType.Create
|| operationType == OperationType.Patch
|| operationType == OperationType.Upsert) {

CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
cosmosItemRequestOptions.setExcludedRegions(excludeRegions);

if (operationType == OperationType.Read) {

return cosmosAsyncContainer.readItem(
createdItem.getId(),
new PartitionKey(createdItem.getId()),
cosmosItemRequestOptions,
TestItem.class).block().getDiagnostics();
}

if (operationType == OperationType.Replace) {
return cosmosAsyncContainer.replaceItem(
createdItem,
createdItem.getId(),
new PartitionKey(createdItem.getId()),
cosmosItemRequestOptions).block().getDiagnostics();
}

if (operationType == OperationType.Delete) {
TestItem toBeDeletedItem = TestItem.createNewItem();
cosmosAsyncContainer.createItem(toBeDeletedItem).block();
return cosmosAsyncContainer.deleteItem(toBeDeletedItem, cosmosItemRequestOptions).block().getDiagnostics();
}

if (operationType == OperationType.Create) {
return cosmosAsyncContainer.createItem(TestItem.createNewItem(), cosmosItemRequestOptions).block().getDiagnostics();
}

if (operationType == OperationType.Upsert) {
return cosmosAsyncContainer.upsertItem(TestItem.createNewItem(), cosmosItemRequestOptions).block().getDiagnostics();
}

if (operationType == OperationType.Patch) {
CosmosPatchOperations patchOperations =
CosmosPatchOperations
.create()
.add("newPath", "newPath");

CosmosPatchItemRequestOptions patchItemRequestOptions = new CosmosPatchItemRequestOptions();
patchItemRequestOptions.setExcludedRegions(excludeRegions);
return cosmosAsyncContainer
.patchItem(createdItem.getId(), new PartitionKey(createdItem.getId()), patchOperations, patchItemRequestOptions, TestItem.class)
.block().getDiagnostics();
}
}

throw new IllegalArgumentException("The operation type is not supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
if (clientException != null &&
Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.NOTFOUND) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)) {
return Mono.just(this.shouldRetryOnSessionNotAvailable());
return Mono.just(this.shouldRetryOnSessionNotAvailable(this.request));
}

// This is for gateway mode, collection recreate scenario is not handled there
Expand Down Expand Up @@ -213,15 +213,17 @@ private Mono<ShouldRetryResult> shouldRetryAddressRefresh() {
return Mono.just(ShouldRetryResult.retryAfter(retryDelay));
}

private ShouldRetryResult shouldRetryOnSessionNotAvailable() {
private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequest request) {
this.sessionTokenRetryCount++;

if (!this.enableEndpointDiscovery) {
// if endpoint discovery is disabled, the request cannot be retried anywhere else
return ShouldRetryResult.noRetry();
} else {
if (this.canUseMultipleWriteLocations) {
UnmodifiableList<URI> endpoints = this.isReadRequest ? this.globalEndpointManager.getReadEndpoints() : this.globalEndpointManager.getWriteEndpoints();
UnmodifiableList<URI> endpoints =
this.isReadRequest ?
this.globalEndpointManager.getApplicableReadEndpoints(request) : this.globalEndpointManager.getApplicableWriteEndpoints(request);

if (this.sessionTokenRetryCount > endpoints.size()) {
// When use multiple write locations is true and the request has been tried
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ public UnmodifiableList<URI> getWriteEndpoints() {
return this.locationCache.getWriteEndpoints();
}

public UnmodifiableList<URI> getApplicableReadEndpoints(RxDocumentServiceRequest request) {
// readonly
return this.locationCache.getApplicableReadEndpoints(request);
}

public UnmodifiableList<URI> getApplicableWriteEndpoints(RxDocumentServiceRequest request) {
//readonly
return this.locationCache.getApplicableWriteEndpoints(request);
}

public List<URI> getAvailableReadEndpoints() {
return this.locationCache.getAvailableReadEndpoints();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,8 @@ private Mono<RxDocumentServiceRequest> getCreateDocumentRequest(DocumentClientRe
request.setNonIdempotentWriteRetriesEnabled(true);
}

request.requestContext.setExcludeRegions(options.getExcludeRegions());

if (requestRetryPolicy != null) {
requestRetryPolicy.onBeforeSendRequest(request);
}
Expand Down Expand Up @@ -1942,8 +1944,12 @@ public Mono<ResourceResponse<Document>> createDocument(String collectionLink, Ob
return ObservableHelper.inlineIfPossibleAsObs(() -> createDocumentInternal(collectionLink, document, options, disableAutomaticIdGeneration, finalRetryPolicyInstance), requestRetryPolicy);
}

private Mono<ResourceResponse<Document>> createDocumentInternal(String collectionLink, Object document,
RequestOptions options, boolean disableAutomaticIdGeneration, DocumentClientRetryPolicy requestRetryPolicy) {
private Mono<ResourceResponse<Document>> createDocumentInternal(
String collectionLink,
Object document,
RequestOptions options,
boolean disableAutomaticIdGeneration,
DocumentClientRetryPolicy requestRetryPolicy) {
try {
logger.debug("Creating a Document. collectionLink: [{}]", collectionLink);

Expand Down Expand Up @@ -2112,6 +2118,9 @@ private Mono<ResourceResponse<Document>> replaceDocumentInternal(String document
if (options != null && options.getNonIdempotentWriteRetriesEnabled()) {
request.setNonIdempotentWriteRetriesEnabled(true);
}

request.requestContext.setExcludeRegions(options.getExcludeRegions());

if (retryPolicyInstance != null) {
retryPolicyInstance.onBeforeSendRequest(request);
}
Expand Down Expand Up @@ -2180,6 +2189,7 @@ private Mono<ResourceResponse<Document>> patchDocumentInternal(String documentLi
if (options != null && options.getNonIdempotentWriteRetriesEnabled()) {
request.setNonIdempotentWriteRetriesEnabled(true);
}
request.requestContext.setExcludeRegions(options.getExcludeRegions());

if (retryPolicyInstance != null) {
retryPolicyInstance.onBeforeSendRequest(request);
Expand Down Expand Up @@ -2235,6 +2245,8 @@ private Mono<ResourceResponse<Document>> deleteDocumentInternal(String documentL
request.setNonIdempotentWriteRetriesEnabled(true);
}

request.requestContext.setExcludeRegions(options.getExcludeRegions());

if (retryPolicyInstance != null) {
retryPolicyInstance.onBeforeSendRequest(request);
}
Expand Down Expand Up @@ -2307,6 +2319,8 @@ private Mono<ResourceResponse<Document>> readDocumentInternal(String documentLin
Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.Document, OperationType.Read);
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
OperationType.Read, ResourceType.Document, path, requestHeaders, options);
request.requestContext.setExcludeRegions(options.getExcludeRegions());

if (retryPolicyInstance != null) {
retryPolicyInstance.onBeforeSendRequest(request);
}
Expand All @@ -2316,7 +2330,6 @@ private Mono<ResourceResponse<Document>> readDocumentInternal(String documentLin
Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, null, options, collectionObs);
CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig = getEndToEndOperationLatencyPolicyConfig(options);
request.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig);
request.requestContext.setExcludeRegions(options.getExcludeRegions());
return requestObs.flatMap(req -> {
Mono<ResourceResponse<Document>> resourceResponseMono = this.read(request, retryPolicyInstance).map(serviceResponse -> toResourceResponse(serviceResponse, Document.class));
return getRxDocumentServiceResponseMonoWithE2ETimeout(request, endToEndPolicyConfig, resourceResponseMono);
Expand Down
Loading

0 comments on commit 69896bd

Please sign in to comment.