Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into tracing-2
Browse files Browse the repository at this point in the history
  • Loading branch information
dzane17 committed Feb 1, 2024
2 parents 82843fd + 16c5257 commit 279ef17
Show file tree
Hide file tree
Showing 16 changed files with 217 additions and 167 deletions.
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ Please follow these formatting guidelines:
* Wildcard imports (`import foo.bar.baz.*`) are forbidden and will cause the build to fail.
* If *absolutely* necessary, you can disable formatting for regions of code with the `// tag::NAME` and `// end::NAME` directives, but note that these are intended for use in documentation, so please make it clear what you have done, and only do this where the benefit clearly outweighs the decrease in consistency.
* Note that JavaDoc and block comments i.e. `/* ... */` are not formatted, but line comments i.e `// ...` are.
* There is an implicit rule that negative boolean expressions should use the form `foo == false` instead of `!foo` for better readability of the code. While this isn't strictly enforced, if might get called out in PR reviews as something to change.
* There is an implicit rule that negative boolean expressions should use the form `foo == false` instead of `!foo` for better readability of the code. While this isn't strictly enforced, it might get called out in PR reviews as something to change.

## Adding Dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.MockSecureSettings;
Expand Down Expand Up @@ -188,6 +190,7 @@ protected String requestUniqueId(final HttpExchange exchange) {
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler {

private static final Logger testLogger = LogManager.getLogger(AzureHTTPStatsCollectorHandler.class);
private static final Pattern listPattern = Pattern.compile("GET /[a-zA-Z0-9]+\\??.+");
private static final Pattern getPattern = Pattern.compile("GET /[^?/]+/[^?/]+\\??.*");

Expand All @@ -197,6 +200,7 @@ private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {

@Override
protected void maybeTrack(String request, Headers headers) {
testLogger.info(request, headers);
if (getPattern.matcher(request).matches()) {
trackRequest("GetBlob");
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ public void testIndexing() throws Exception {
* @throws Exception if index creation fail
* @throws UnsupportedOperationException if cluster type is unknown
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7679")
public void testIndexingWithSegRep() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,12 +523,12 @@ public void testRerouteRecovery() throws Exception {

logger.info("--> waiting for recovery to start both on source and target");
final Index index = resolveIndex(INDEX_NAME);
assertBusy(() -> {
assertBusyWithFixedSleepTime(() -> {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
indicesService = internalCluster().getInstance(IndicesService.class, nodeB);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
});
}, TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(500));

logger.info("--> request recoveries");
RecoveryResponse response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,67 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

public void testCancellationDuringGetCheckpointInfo() throws Exception {
cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO);
}

public void testCancellationDuringGetSegments() throws Exception {
cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES);
}

private void cancelDuringReplicaAction(String actionToblock) throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
final SegmentReplicationTargetService targetService = internalCluster().getInstance(
SegmentReplicationTargetService.class,
replicaNode
);
final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
CountDownLatch startCancellationLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);

MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
);
primaryTransportService.addRequestHandlingBehavior(actionToblock, (handler, request, channel, task) -> {
logger.info("action {}", actionToblock);
try {
startCancellationLatch.countDown();
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

// index a doc and trigger replication
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// remove the replica and ensure it is cleaned up.
startCancellationLatch.await();
SegmentReplicationTarget target = targetService.get(replicaShard.shardId());
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);
assertEquals("Replication not closed: " + target.getId(), 0, target.refCount());
assertEquals("Store has a positive refCount", 0, replicaShard.store().refCount());
// stop the replica, this will do additional checks on shutDown to ensure the replica and its store are closed properly
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
latch.countDown();
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ public void testMultipleIndices() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String index_2 = "tst-index-2";
List<String> nodes = new ArrayList<>();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
nodes.add(primaryNode);
createIndex(INDEX_NAME, index_2);

ensureYellowAndNoInitializingShards(INDEX_NAME, index_2);
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
ensureGreen(INDEX_NAME, index_2);

final long numDocs = scaledRandomIntBetween(50, 100);
Expand All @@ -284,44 +284,47 @@ public void testMultipleIndices() throws Exception {
refresh(INDEX_NAME, index_2);
waitForSearchableDocs(INDEX_NAME, numDocs, nodes);
waitForSearchableDocs(index_2, numDocs, nodes);
ensureSearchable(INDEX_NAME, index_2);

final IndexShard index_1_primary = getIndexShard(primaryNode, INDEX_NAME);
final IndexShard index_2_primary = getIndexShard(primaryNode, index_2);

assertTrue(index_1_primary.routingEntry().primary());
assertTrue(index_2_primary.routingEntry().primary());

// test both indices are returned in the response.
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
.indices()
.prepareSegmentReplicationStats()
.execute()
.actionGet();
assertBusy(() -> {
// test both indices are returned in the response.
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.execute()
.actionGet();

Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = segmentReplicationStatsResponse.getReplicationStats();
assertEquals(2, replicationStats.size());
List<SegmentReplicationPerGroupStats> replicationPerGroupStats = replicationStats.get(INDEX_NAME);
assertEquals(1, replicationPerGroupStats.size());
SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_1_primary.shardId());
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = segmentReplicationStatsResponse.getReplicationStats();
assertEquals(2, replicationStats.size());
List<SegmentReplicationPerGroupStats> replicationPerGroupStats = replicationStats.get(INDEX_NAME);
assertEquals(1, replicationPerGroupStats.size());
SegmentReplicationPerGroupStats perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_1_primary.shardId());
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}

replicationPerGroupStats = replicationStats.get(index_2);
assertEquals(1, replicationPerGroupStats.size());
perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_2_primary.shardId());
replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
replicationPerGroupStats = replicationStats.get(index_2);
assertEquals(1, replicationPerGroupStats.size());
perGroupStats = replicationPerGroupStats.get(0);
assertEquals(perGroupStats.getShardId(), index_2_primary.shardId());
replicaStats = perGroupStats.getReplicaStats();
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replica : replicaStats) {
assertNotNull(replica.getCurrentReplicationState());
}
}, 30, TimeUnit.SECONDS);

// test only single index queried.
segmentReplicationStatsResponse = client().admin()
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.setIndices(index_2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void testCloseWhileDeletingIndices() throws Exception {
throw new AssertionError(e);
}
try {
assertAcked(client().admin().indices().prepareDelete(indexToDelete));
assertAcked(client().admin().indices().prepareDelete(indexToDelete).setTimeout("60s"));
} catch (final Exception e) {
assertException(e, indexToDelete);
}
Expand All @@ -301,7 +301,7 @@ public void testCloseWhileDeletingIndices() throws Exception {
throw new AssertionError(e);
}
try {
client().admin().indices().prepareClose(indexToClose).get();
client().admin().indices().prepareClose(indexToClose).setTimeout("60s").get();
} catch (final Exception e) {
assertException(e, indexToClose);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,4 @@ public void testDisconnectsDuringRecovery() {
public void testReplicaRecovery() {

}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9580")
public void testRerouteRecovery() {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,16 @@ private void validateCurrentMetadata() throws Exception {
internalCluster().getClusterManagerName()
);
assertBusy(() -> {
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
ClusterMetadataManifest manifest;
try {
manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
} catch (IllegalStateException e) {
// AssertionError helps us use assertBusy and retry validation if failed due to a race condition.
throw new AssertionError("Error while validating latest cluster metadata", e);
}
ClusterState clusterState = getClusterState();
Metadata currentMetadata = clusterState.metadata();
assertEquals(currentMetadata.indices().size(), manifest.getIndices().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.SlowClusterStateProcessing;

Expand All @@ -33,6 +31,8 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* This class runs tests with remote store + segRep while blocking file downloads
*/
Expand All @@ -59,22 +59,18 @@ public void testCancelReplicationWhileSyncingSegments() throws Exception {
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId());
assertNotNull(segmentReplicationTarget);
assertEquals(SegmentReplicationState.Stage.GET_FILES, segmentReplicationTarget.state().getStage());
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}
Expand All @@ -85,7 +81,6 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {

final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String primaryNode = getNode(dataNodeNames, true);

SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
Expand All @@ -94,22 +89,18 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception {
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId());
assertNotNull(segmentReplicationTarget);
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, segmentReplicationTarget.state().getStage());
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);
assertNull(targetService.get(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}
Expand Down
Loading

0 comments on commit 279ef17

Please sign in to comment.