Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReplicaValidationForOpenConnectionAndInitCaches #4

Open
wants to merge 20 commits into
base: replicaValidationBeforeUse
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.net.MalformedURLException;
import java.net.URI;
Expand All @@ -61,6 +60,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -106,6 +106,7 @@ public class GatewayAddressCache implements IAddressCache {
private IOpenConnectionsHandler openConnectionsHandler;
private final ConnectionPolicy connectionPolicy;
private final boolean replicaAddressValidationEnabled;
private final List<Uri.HealthStatus> replicaValidationScopes;

public GatewayAddressCache(
DiagnosticsClientContext clientContext,
Expand Down Expand Up @@ -164,6 +165,10 @@ public GatewayAddressCache(
this.openConnectionsHandler = openConnectionsHandler;
this.connectionPolicy = connectionPolicy;
this.replicaAddressValidationEnabled = Configs.isReplicaAddressValidationEnabled();
this.replicaValidationScopes = new ArrayList<>();
if (this.replicaAddressValidationEnabled) {
this.replicaValidationScopes.add(Uri.HealthStatus.UnhealthyPending);
}
}

public GatewayAddressCache(
Expand Down Expand Up @@ -898,7 +903,15 @@ private void validateReplicaAddresses(AddressInformation[] addresses) {
Arrays
.stream(addresses)
.map(address -> address.getPhysicalUri())
.filter(addressUri -> addressUri.getHealthStatus() == Uri.HealthStatus.UnhealthyPending)
.filter(addressUri -> this.replicaValidationScopes.contains(addressUri.getHealthStatus()))
.sorted(new Comparator<Uri>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my 2 cents: I am not a fan of these stream-based or LINQ-style operations like sorted, where it is unclear how the functional requirement is implemented.

Does sorted force the use of an ordered data structure? Or does it use an unstructured data structure and then apply the ordering after all results have been cached - like you would do when sorting the List you get back without this operator?

Perf-wise the latter would probably be better in this case.

Not blocking - and I don't have time to look into what .sorted actually is doing - just raising this as a general issue I have when these operators obfuscate the non-functional side-effects.

@Override
public int compare(Uri o1, Uri o2) {
    // Generally, an unhealthyPending replica has more chances to fail the request compared to unknown replica
    // and we will want to validate replicas with the highest chance to fail the request first.
    // Arguments are passed as (o2, o1) so that higher priority values sort first (descending order).
    // Integer.compare is used instead of subtraction so the result can never overflow,
    // regardless of which priority values the enum is given in the future.
    return Integer.compare(
        o2.getHealthStatus().getPriority(),
        o1.getHealthStatus().getPriority());
}
})
.collect(Collectors.toList());

if (addressesNeedToValidation.size() > 0) {
Expand Down Expand Up @@ -965,6 +978,10 @@ public Flux<OpenConnectionResponse> openConnectionsAndInitCaches(
JavaStreamUtils.toString(partitionKeyRangeIdentities, ","));
}

if (this.replicaAddressValidationEnabled) {
this.replicaValidationScopes.add(Uri.HealthStatus.Unknown);
}

List<Flux<List<Address>>> tasks = new ArrayList<>();
int batchSize = GatewayAddressCache.DefaultBatchSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ public String toString() {
* </p>
*/
public enum HealthStatus {
Connected(0),
Unknown(1),
UnhealthyPending(2),
Unhealthy(3);
Connected(100),
Unknown(200),
UnhealthyPending(300),
Unhealthy(400);

private int priority;
HealthStatus(int priority) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static com.azure.cosmos.implementation.directconnectivity.Uri.HealthStatus.Connected;
import static com.azure.cosmos.implementation.directconnectivity.Uri.HealthStatus.UnhealthyPending;
import static com.azure.cosmos.implementation.directconnectivity.Uri.HealthStatus.Unknown;
import static org.assertj.core.api.Assertions.assertThat;

public class GatewayAddressCacheTest extends TestSuiteBase {
Expand Down Expand Up @@ -105,9 +106,11 @@ public Object[][] protocolProvider() {
@DataProvider(name = "replicaValidationArgsProvider")
public Object[][] replicaValidationArgsProvider() {
return new Object[][]{
// replica validation is enabled
{ false },
{ true },
// replicaValidationIsEnabled, openConnectionsAndInitCaches
{ false, false },
{ false, true },
{ true, false },
{ true, true },
};
}

Expand Down Expand Up @@ -928,7 +931,7 @@ public Mono<List<Address>> answer(InvocationOnMock invocationOnMock) throws Thro

@SuppressWarnings("unchecked")
@Test(groups = { "direct" }, dataProvider = "replicaValidationArgsProvider", timeOut = TIMEOUT)
public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnabled) throws Exception {
public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnabled, boolean openConnectionAndInitCaches) throws Exception {
Configs configs = ConfigsBuilder.instance().withProtocol(Protocol.TCP).build();
URI serviceEndpoint = new URI(TestConfigurations.HOST);
IAuthorizationTokenProvider authorizationTokenProvider = (RxDocumentClientImpl) client;
Expand Down Expand Up @@ -966,8 +969,15 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable
new Database(),
new HashMap<>());

if (openConnectionAndInitCaches) {
List<PartitionKeyRangeIdentity> pkriList = Arrays.asList(new PartitionKeyRangeIdentity("0"));
cache.openConnectionsAndInitCaches(createdCollection, pkriList).blockLast();
Mockito.clearInvocations(openConnectionsHandlerMock);
httpClientWrapper.capturedRequests.clear();
}

PartitionKeyRangeIdentity partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(createdCollection.getResourceId(), "0");
boolean forceRefreshPartitionAddresses = false;
boolean forceRefreshPartitionAddresses = true;

Mono<Utils.ValueHolder<AddressInformation[]>> addressesInfosFromCacheObs =
cache.tryGetAddresses(req, partitionKeyRangeIdentity, forceRefreshPartitionAddresses);
Expand All @@ -982,21 +992,38 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable
if (replicaValidationEnabled) {
ArgumentCaptor<List<Uri>> openConnectionArguments = ArgumentCaptor.forClass(List.class);

// Open connection will only be called for unhealthyPending status address
Mockito.verify(openConnectionsHandlerMock, Mockito.times(0)).openConnections(openConnectionArguments.capture());
if (openConnectionAndInitCaches) {
// If openConnectionAndInitCaches is called, then replica validation will also include for unknown status
Mockito.verify(openConnectionsHandlerMock, Mockito.times(1)).openConnections(openConnectionArguments.capture());
assertThat(openConnectionArguments.getValue()).hasSize(addressInfosFromCache.size());
} else {
// Open connection will only be called for unhealthyPending status address
Mockito.verify(openConnectionsHandlerMock, Mockito.times(0)).openConnections(openConnectionArguments.capture());
}
} else {
Mockito.verify(openConnectionsHandlerMock, Mockito.never()).openConnections(Mockito.any());
}

// Mark one of the uri as unhealthy, others as connected
// and then force refresh the addresses again, make sure the health status of the uri is reserved
httpClientWrapper.capturedRequests.clear();
Mockito.clearInvocations(openConnectionsHandlerMock);
for (AddressInformation address : addressInfosFromCache) {
address.getPhysicalUri().setConnected();

// Mark one of the uri as unhealthy, one as unknown, others as connected
// and then force refresh the addresses again, make sure the health status of the uri is reserved
assertThat(addressInfosFromCache.size()).isGreaterThan(2);
Uri unknownAddressUri = null;
Uri unhealthyAddressUri = null;
for (int i = 0; i < addressInfosFromCache.size(); i++) {
if (i == 0) {
unknownAddressUri = addressInfosFromCache.get(0).getPhysicalUri();
continue;
}
if (i == 1) {
unhealthyAddressUri = addressInfosFromCache.get(1).getPhysicalUri();
unhealthyAddressUri.setUnhealthy();
} else {
addressInfosFromCache.get(i).getPhysicalUri().setConnected();
}
}
Uri unhealthyAddressUri = addressInfosFromCache.get(0).getPhysicalUri();
unhealthyAddressUri.setUnhealthy();

ArrayList<AddressInformation> refreshedAddresses =
Lists.newArrayList(getSuccessResult(cache.tryGetAddresses(req, partitionKeyRangeIdentity, true), TIMEOUT).v);
Expand All @@ -1007,9 +1034,12 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable

// validate connected status will be reserved
// validate unhealthy status will change into unhealthyPending status
// validate openConnection will only be called for addresses not in connected status
// Validate openConnection will be called for addresses in unhealthyPending status
// Validate openConnection will be called for addresses in unknown status if openConnectionAndInitCaches is called
for (AddressInformation addressInformation : refreshedAddresses) {
if (addressInformation.getPhysicalUri().equals(unhealthyAddressUri)) {
if (addressInformation.getPhysicalUri().equals(unknownAddressUri)) {
assertThat(addressInformation.getPhysicalUri().getHealthStatus()).isEqualTo(Unknown);
} else if (addressInformation.getPhysicalUri().equals(unhealthyAddressUri)) {
assertThat(addressInformation.getPhysicalUri().getHealthStatus()).isEqualTo(UnhealthyPending);
} else {
assertThat(addressInformation.getPhysicalUri().getHealthStatus()).isEqualTo(Connected);
Expand All @@ -1019,8 +1049,12 @@ public void tryGetAddress_replicaValidationTests(boolean replicaValidationEnable
if (replicaValidationEnabled) {
ArgumentCaptor<List<Uri>> openConnectionArguments = ArgumentCaptor.forClass(List.class);
Mockito.verify(openConnectionsHandlerMock, Mockito.times(1)).openConnections(openConnectionArguments.capture());
if (openConnectionAndInitCaches) {
assertThat(openConnectionArguments.getValue()).containsExactlyElementsOf(Arrays.asList(unhealthyAddressUri, unknownAddressUri));
} else {
assertThat(openConnectionArguments.getValue()).containsExactly(unhealthyAddressUri);
}

assertThat(openConnectionArguments.getValue()).hasSize(1).containsExactly(unhealthyAddressUri);
} else {
Mockito.verify(openConnectionsHandlerMock, Mockito.never()).openConnections(Mockito.any());
}
Expand Down Expand Up @@ -1125,7 +1159,7 @@ public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMet
AddressInformation address1 = new AddressInformation(true, true, "rntbd://127.0.0.1:1", Protocol.TCP);
address1.getPhysicalUri().setConnected();

// remain in unknwon status
// remain in unknown status
AddressInformation address2 = new AddressInformation(true, false, "rntbd://127.0.0.1:2", Protocol.TCP);

// unhealthy status
Expand All @@ -1137,14 +1171,19 @@ public void validateReplicaAddressesTests() throws URISyntaxException, NoSuchMet
AtomicReference<Uri.HealthStatus> healthStatus = ReflectionUtils.getHealthStatus(address4.getPhysicalUri());
healthStatus.set(UnhealthyPending);

// Set the replica validation scope
List<Uri.HealthStatus> replicaValidationScopes = ReflectionUtils.getReplicaValidationScopes(cache);
replicaValidationScopes.add(Unknown);
replicaValidationScopes.add(UnhealthyPending);

validateReplicaAddressesMethod.invoke(cache, new Object[]{ new AddressInformation[]{ address1, address2, address3, address4 }}) ;

// Validate openConnection will only be called for address in unhealthyPending status
ArgumentCaptor<List<Uri>> openConnectionArguments = ArgumentCaptor.forClass(List.class);
Mockito.verify(openConnectionsHandlerMock, Mockito.times(1)).openConnections(openConnectionArguments.capture());

assertThat(openConnectionArguments.getValue()).hasSize(1).containsExactlyElementsOf(
Arrays.asList(address4)
assertThat(openConnectionArguments.getValue()).containsExactlyElementsOf(
Arrays.asList(address4, address2)
.stream()
.map(addressInformation -> addressInformation.getPhysicalUri())
.collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,4 +384,9 @@ public static void setClientTelemetryMetadataHttpClient(ClientTelemetry clientTe
public static AtomicReference<Uri.HealthStatus> getHealthStatus(Uri uri) {
return get(AtomicReference.class, uri, "healthStatus");
}

/**
 * Reads the {@code replicaValidationScopes} field of the given cache through the
 * reflective {@code get} helper.
 *
 * @param gatewayAddressCache the cache instance whose field is read
 * @return the value of the {@code replicaValidationScopes} field
 */
@SuppressWarnings("unchecked")
public static List<Uri.HealthStatus> getReplicaValidationScopes(GatewayAddressCache gatewayAddressCache) {
    // The helper yields a raw List; the unchecked conversion is suppressed on the method.
    final List<Uri.HealthStatus> scopes = get(List.class, gatewayAddressCache, "replicaValidationScopes");
    return scopes;
}
}