Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Oct 10, 2024
1 parent acf209f commit 0e0edea
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
remoteGenerationDeletionPermits.release();
}
} catch (Exception e) {
logger.error("Exception in trimUnreferencedReaders", e);
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
}
Expand Down Expand Up @@ -441,7 +442,7 @@ protected static void deleteStaleRemotePrimaryTerms(
}
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> {
try {
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap, logger).v1();
} catch (IOException e) {
return Long.MIN_VALUE;
}
Expand Down Expand Up @@ -482,7 +483,8 @@ protected static Long getMinPrimaryTermInRemote(
protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
String metadataFile,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
Logger logger
) throws IOException {
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
if (minMaxPrimaryTermFromFileName != null) {
Expand All @@ -504,6 +506,8 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
if (primaryTerm.isPresent()) {
minPrimaryTem = primaryTerm.get();
}
} else {
logger.warn("No primary term found from GenerationToPrimaryTermMap for file [{}]", metadataFile);
}
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem);
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import org.mockito.Mockito;

import static org.opensearch.index.translog.RemoteFsTranslog.REMOTE_DELETION_PERMITS;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
Expand Down Expand Up @@ -1061,31 +1062,32 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException
public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException {
TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class);

RemoteFsTimestampAwareTranslog translog = (RemoteFsTimestampAwareTranslog) this.translog;

// Fetch generations directly from the filename
assertEquals(
new Tuple<>(1L, 1008L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854774799__9223372036854774799__9223370311919910393__31__9223372036854775106__1__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);
assertEquals(
new Tuple<>(4L, 7L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775800__9223372036854775800__9223370311919910398__31__9223372036854775803__4__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);
assertEquals(
new Tuple<>(10L, 10L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775797__9223372036854775800__9223370311919910398__31__9223372036854775803__10__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);

Expand All @@ -1099,22 +1101,54 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException {
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1",
translogTransferManager,
new HashMap<>()
new HashMap<>(),
logger
)
);
assertEquals(
new Tuple<>(4L, 7L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1",
translogTransferManager,
Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L))
Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L)),
logger
)
);

verify(translogTransferManager).readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1");
verify(translogTransferManager, times(0)).readMetadata(
"metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1"
);

// Older md files with empty GenerationToPrimaryTermMap
md1 = mock(TranslogTransferMetadata.class);
when(md1.getGenerationToPrimaryTermMapper()).thenReturn(Map.of());
when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1"))
.thenReturn(md1);
assertEquals(
new Tuple<>(-1L, 2L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1",
translogTransferManager,
new HashMap<>(),
logger
)
);

// Older md files with empty GenerationToPrimaryTermMap
md1 = mock(TranslogTransferMetadata.class);
when(md1.getGenerationToPrimaryTermMapper()).thenReturn(null);
when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1"))
.thenReturn(md1);
assertEquals(
new Tuple<>(-1L, 2L),
RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile(
"metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1",
translogTransferManager,
new HashMap<>(),
logger
)
);
}

public void testDeleteStaleRemotePrimaryTerms() throws IOException {
Expand Down Expand Up @@ -1332,4 +1366,146 @@ public void testGetMinPrimaryTermInRemoteNotFetched() throws IOException {
);
verify(translogTransferManager).listPrimaryTermsInRemote();
}

public void testTrimUnreferencedReadersStalePinnedTimestamps() throws Exception {
ArrayList<Translog.Operation> ops = new ArrayList<>();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));

// First reader is created at the init of translog
assertEquals(3, translog.readers.size());
assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> {
assertEquals(
10,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

translog.setMinSeqNoToKeep(3);
translog.trimUnreferencedReaders();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.setMinSeqNoToKeep(6);
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

assertEquals(1, translog.readers.size());
assertBusy(() -> {
assertEquals(2, translog.allUploaded().size());
assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
16,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));

assertEquals(3, translog.readers.size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
20,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);
}

public void testTrimUnreferencedReadersNoPermits() throws Exception {
// Acquire the permits so that remote translog deletion will not happen
translog.remoteGenerationDeletionPermits.acquire(REMOTE_DELETION_PERMITS);

ArrayList<Translog.Operation> ops = new ArrayList<>();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));

// First reader is created at the init of translog
assertEquals(3, translog.readers.size());
assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(
6,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 }));

assertBusy(() -> {
assertEquals(
10,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
// Fetch pinned timestamps so that it won't be stale
updatePinnedTimstampTask.run();
translog.setMinSeqNoToKeep(3);
translog.trimUnreferencedReaders();

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 }));

// Fetch pinned timestamps so that it won't be stale
updatePinnedTimstampTask.run();
translog.setMinSeqNoToKeep(6);
translog.trimUnreferencedReaders();

assertEquals(1, translog.readers.size());
assertBusy(() -> {
assertEquals(2, translog.allUploaded().size());
assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
16,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 }));
addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 }));

// Fetch pinned timestamps so that it won't be stale
updatePinnedTimstampTask.run();
translog.trimUnreferencedReaders();

assertEquals(3, translog.readers.size());
assertBusy(() -> {
assertEquals(6, translog.allUploaded().size());
assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());
assertEquals(
20,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
}, 30, TimeUnit.SECONDS);
}
}

0 comments on commit 0e0edea

Please sign in to comment.