Cleanup ReplicationServerTest and AckedCopyTest (#776)
aprudhomme authored Oct 29, 2024
1 parent 47f4f9b commit 9c5b2a0
Showing 3 changed files with 167 additions and 478 deletions.
@@ -520,6 +520,10 @@ private void updateDocuments(
IndexState indexState,
ShardState shardState)
throws IOException {
if (shardState.isReplica()) {
throw new IllegalStateException(
"Adding documents to an index on a replica node is not supported");
}
for (Document nextDoc : documents) {
nextDoc = handleFacets(indexState, shardState, nextDoc);
shardState.writer.updateDocument(idFieldDef.getTerm(nextDoc), nextDoc);
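
The guard added above makes document writes against a replica fail fast instead of touching the replica's IndexWriter. As a purely illustrative sketch (not part of this commit), such a guard could be exercised with the TestServer helpers that appear in the AckedCopyTest changes below; the class name and the exact error type surfaced to the client are assumptions.

// Hypothetical sketch only: drives the new replica write guard through the
// TestServer helpers used in the cleaned-up tests further down.
package com.yelp.nrtsearch.server.grpc;

import com.yelp.nrtsearch.server.config.IndexStartConfig;
import java.io.IOException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ReplicaWriteGuardSketch {
  @Rule public final TemporaryFolder folder = new TemporaryFolder();

  @After
  public void cleanup() {
    TestServer.cleanupAll();
  }

  @Test
  public void addDocsOnReplicaIsRejected() throws IOException {
    // Start a primary with a simple index and seed it with two documents.
    TestServer primary =
        TestServer.builder(folder)
            .withAutoStartConfig(
                true, Mode.PRIMARY, 0, IndexStartConfig.IndexDataLocationType.LOCAL)
            .build();
    primary.createSimpleIndex("test_index");
    primary.startPrimaryIndex("test_index", -1, null);
    primary.addSimpleDocs("test_index", 1, 2);

    // Start a replica pointed at the primary's replication port.
    TestServer replica =
        TestServer.builder(folder)
            .withAutoStartConfig(
                true,
                Mode.REPLICA,
                primary.getReplicationPort(),
                IndexStartConfig.IndexDataLocationType.LOCAL)
            .build();
    replica.registerWithPrimary("test_index");

    try {
      // With the new guard, the replica should reject the write; the
      // IllegalStateException is assumed to surface as a runtime gRPC error.
      replica.addSimpleDocs("test_index", 3, 4);
      Assert.fail("expected the replica to reject document writes");
    } catch (RuntimeException expected) {
      // expected: adding documents on a replica node is not supported
    }
  }
}
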
src/test/java/com/yelp/nrtsearch/server/grpc/AckedCopyTest.java (47 additions, 143 deletions)
@@ -15,18 +15,9 @@
*/
package com.yelp.nrtsearch.server.grpc;

import static com.yelp.nrtsearch.server.grpc.GrpcServer.rmDir;
import static com.yelp.nrtsearch.server.grpc.ReplicationServerTest.validateSearchResults;

import com.amazonaws.services.s3.AmazonS3;
import com.yelp.nrtsearch.server.config.NrtsearchConfig;
import com.yelp.nrtsearch.server.remote.RemoteBackend;
import com.yelp.nrtsearch.server.remote.s3.S3Backend;
import com.yelp.nrtsearch.server.utils.LuceneServerTestConfigurationFactory;
import com.yelp.nrtsearch.test_utils.AmazonS3Provider;
import com.yelp.nrtsearch.server.config.IndexStartConfig;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import java.nio.file.Paths;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
@@ -44,167 +35,80 @@ public class AckedCopyTest {
*/
@Rule public final TemporaryFolder folder = new TemporaryFolder();

@Rule public final AmazonS3Provider s3Provider = new AmazonS3Provider(BUCKET_NAME);

private GrpcServer luceneServerPrimary;
private GrpcServer replicationServerPrimary;

private GrpcServer luceneServerSecondary;
private GrpcServer replicationServerSecondary;

private static final String BUCKET_NAME = "acked-copy-unittest";
private RemoteBackend remoteBackend;
private AmazonS3 s3;

@After
public void tearDown() throws IOException {
luceneServerPrimary.getGlobalState().close();
luceneServerSecondary.getGlobalState().close();
rmDir(Paths.get(luceneServerPrimary.getIndexDir()).getParent());
rmDir(Paths.get(luceneServerSecondary.getIndexDir()).getParent());
}

public void setUp(int chunkSize, int ackEvery, int maxInFlight) throws IOException {
s3 = s3Provider.getAmazonS3();

String extraConfig =
String.join(
"\n",
"FileCopyConfig:",
" ackedCopy: true",
" chunkSize: " + chunkSize,
" ackEvery: " + ackEvery,
" maxInFlight: " + maxInFlight);

// set up primary servers
String testIndex = "test_index";
NrtsearchConfig luceneServerPrimaryConfiguration =
LuceneServerTestConfigurationFactory.getConfig(Mode.PRIMARY, folder.getRoot(), extraConfig);
remoteBackend = new S3Backend(luceneServerPrimaryConfiguration, s3);
luceneServerPrimary =
new GrpcServer(
grpcCleanup,
luceneServerPrimaryConfiguration,
folder,
null,
luceneServerPrimaryConfiguration.getIndexDir(),
testIndex,
luceneServerPrimaryConfiguration.getPort(),
remoteBackend);
replicationServerPrimary =
new GrpcServer(
grpcCleanup,
luceneServerPrimaryConfiguration,
folder,
luceneServerPrimary.getGlobalState(),
luceneServerPrimaryConfiguration.getIndexDir(),
testIndex,
luceneServerPrimaryConfiguration.getReplicationPort(),
remoteBackend);
luceneServerPrimary
.getGlobalState()
.replicationStarted(luceneServerPrimaryConfiguration.getReplicationPort());
// set up secondary servers
NrtsearchConfig luceneServerSecondaryConfiguration =
LuceneServerTestConfigurationFactory.getConfig(Mode.REPLICA, folder.getRoot(), extraConfig);

luceneServerSecondary =
new GrpcServer(
grpcCleanup,
luceneServerSecondaryConfiguration,
folder,
null,
luceneServerSecondaryConfiguration.getIndexDir(),
testIndex,
luceneServerSecondaryConfiguration.getPort(),
remoteBackend);
replicationServerSecondary =
new GrpcServer(
grpcCleanup,
luceneServerSecondaryConfiguration,
folder,
luceneServerSecondary.getGlobalState(),
luceneServerSecondaryConfiguration.getIndexDir(),
testIndex,
luceneServerSecondaryConfiguration.getReplicationPort(),
remoteBackend);
luceneServerSecondary
.getGlobalState()
.replicationStarted(luceneServerSecondaryConfiguration.getReplicationPort());
public void cleanup() {
TestServer.cleanupAll();
}

@Test
public void ackAllLimit1() throws IOException, InterruptedException {
setUp(2, 1, 1);
testReplication();
testReplication(2, 1, 1);
}

@Test
public void ack2Limit2() throws IOException, InterruptedException {
setUp(2, 2, 2);
testReplication();
testReplication(2, 2, 2);
}

@Test
public void ack2Limit4() throws IOException, InterruptedException {
setUp(2, 2, 4);
testReplication();
testReplication(2, 2, 4);
}

@Test
public void ack2Limit2LargeChunk() throws IOException, InterruptedException {
setUp(1024, 2, 2);
testReplication();
testReplication(1024, 2, 2);
}

private void testReplication() throws IOException, InterruptedException {
private void testReplication(int chunkSize, int ackEvery, int maxInFlight)
throws IOException, InterruptedException {
String extraConfig =
String.join(
"\n",
"FileCopyConfig:",
" ackedCopy: true",
" chunkSize: " + chunkSize,
" ackEvery: " + ackEvery,
" maxInFlight: " + maxInFlight);

// index 2 documents to primary
GrpcServer.TestServer testServerPrimary =
new GrpcServer.TestServer(luceneServerPrimary, true, Mode.PRIMARY);
testServerPrimary.addDocuments();
TestServer testServerPrimary =
TestServer.builder(folder)
.withAutoStartConfig(
true, Mode.PRIMARY, 0, IndexStartConfig.IndexDataLocationType.LOCAL)
.withAdditionalConfig(extraConfig)
.build();
testServerPrimary.createSimpleIndex("test_index");
testServerPrimary.startPrimaryIndex("test_index", -1, null);
testServerPrimary.addSimpleDocs("test_index", 1, 2);

// refresh (also sends NRTPoint to replicas, but none started at this point)
luceneServerPrimary
.getBlockingStub()
.refresh(RefreshRequest.newBuilder().setIndexName("test_index").build());
testServerPrimary.refresh("test_index");
testServerPrimary.verifySimpleDocIds("test_index", 1, 2);

// startIndex replica
GrpcServer.TestServer testServerReplica =
new GrpcServer.TestServer(luceneServerSecondary, true, Mode.REPLICA);
TestServer testServerReplica =
TestServer.builder(folder)
.withAutoStartConfig(
true,
Mode.REPLICA,
testServerPrimary.getReplicationPort(),
IndexStartConfig.IndexDataLocationType.LOCAL)
.withAdditionalConfig(extraConfig)
.build();
testServerReplica.registerWithPrimary("test_index");

// add 2 more docs to primary
testServerPrimary.addDocuments();
testServerPrimary.addSimpleDocs("test_index", 3, 4);

// publish new NRT point (retrieve the current searcher version on primary)
SearcherVersion searcherVersionPrimary =
replicationServerPrimary
.getReplicationServerBlockingStub()
.writeNRTPoint(IndexName.newBuilder().setIndexName("test_index").build());
testServerPrimary.refresh("test_index");

// primary should show 4 hits now
SearchResponse searchResponsePrimary =
luceneServerPrimary
.getBlockingStub()
.search(
SearchRequest.newBuilder()
.setIndexName(luceneServerPrimary.getTestIndex())
.setStartHit(0)
.setTopHits(10)
.setVersion(searcherVersionPrimary.getVersion())
.addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES)
.build());
testServerPrimary.verifySimpleDocs("test_index", 4);

// replica should too!
SearchResponse searchResponseSecondary =
luceneServerSecondary
.getBlockingStub()
.search(
SearchRequest.newBuilder()
.setIndexName(luceneServerSecondary.getTestIndex())
.setStartHit(0)
.setTopHits(10)
.setVersion(searcherVersionPrimary.getVersion())
.addAllRetrieveFields(NrtsearchServerTest.RETRIEVED_VALUES)
.build());

validateSearchResults(searchResponsePrimary);
validateSearchResults(searchResponseSecondary);
testServerReplica.waitForReplication("test_index");
testServerReplica.verifySimpleDocIds("test_index", 1, 2, 3, 4);
}
}
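
For reference, the extraConfig block assembled at the top of the new testReplication is plain YAML appended to the server configuration via withAdditionalConfig. A minimal standalone sketch (illustration only, class name hypothetical) of what ackAllLimit1's parameters (chunkSize 2, ackEvery 1, maxInFlight 1) produce:

// Illustration only: reproduces the extraConfig string that
// testReplication(2, 1, 1) builds and passes to withAdditionalConfig(...).
public class FileCopyConfigExample {
  public static void main(String[] args) {
    int chunkSize = 2;
    int ackEvery = 1;
    int maxInFlight = 1;
    String extraConfig =
        String.join(
            "\n",
            "FileCopyConfig:",
            "  ackedCopy: true",
            "  chunkSize: " + chunkSize,
            "  ackEvery: " + ackEvery,
            "  maxInFlight: " + maxInFlight);
    // Prints:
    // FileCopyConfig:
    //   ackedCopy: true
    //   chunkSize: 2
    //   ackEvery: 1
    //   maxInFlight: 1
    System.out.println(extraConfig);
  }
}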