From 0e0edea84d1b2d40539c192479ff5e5d169552ad Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 9 Oct 2024 20:01:34 +0530 Subject: [PATCH] Add more tests Signed-off-by: Sachin Kale --- .../RemoteFsTimestampAwareTranslog.java | 8 +- .../RemoteFsTimestampAwareTranslogTests.java | 190 +++++++++++++++++- 2 files changed, 189 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 3ccacde22bbfc..db69537a25f75 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -240,6 +240,7 @@ public void onResponse(List blobMetadata) { remoteGenerationDeletionPermits.release(); } } catch (Exception e) { + logger.error("Exception in trimUnreferencedReaders", e); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); } } @@ -441,7 +442,7 @@ protected static void deleteStaleRemotePrimaryTerms( } Optional minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> { try { - return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1(); + return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap, logger).v1(); } catch (IOException e) { return Long.MIN_VALUE; } @@ -482,7 +483,8 @@ protected static Long getMinPrimaryTermInRemote( protected static Tuple getMinMaxPrimaryTermFromMetadataFile( String metadataFile, TranslogTransferManager translogTransferManager, - Map> oldFormatMetadataFilePrimaryTermMap + Map> oldFormatMetadataFilePrimaryTermMap, + Logger logger ) throws IOException { Tuple minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile); if (minMaxPrimaryTermFromFileName != null) { @@ -504,6 +506,8 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( if (primaryTerm.isPresent()) { minPrimaryTem = primaryTerm.get(); } + } else { + logger.warn("No primary term found from GenerationToPrimaryTermMap for file [{}]", metadataFile); } Tuple minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem); oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 73db3314f4d1e..c66cb847ccd66 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -63,6 +63,7 @@ import org.mockito.Mockito; +import static org.opensearch.index.translog.RemoteFsTranslog.REMOTE_DELETION_PERMITS; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED; @@ -1061,15 +1062,14 @@ public void testGetMinMaxTranslogGenerationFromMetadataFile() throws IOException public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { TranslogTransferManager translogTransferManager = mock(TranslogTransferManager.class); - RemoteFsTimestampAwareTranslog translog = (RemoteFsTimestampAwareTranslog) this.translog; - // Fetch generations directly from the filename assertEquals( new Tuple<>(1L, 1008L), RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854774799__9223372036854774799__9223370311919910393__31__9223372036854775106__1__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); assertEquals( @@ -1077,7 +1077,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854775800__9223372036854775800__9223370311919910398__31__9223372036854775803__4__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); assertEquals( @@ -1085,7 +1086,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854775797__9223372036854775800__9223370311919910398__31__9223372036854775803__10__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); @@ -1099,7 +1101,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1", translogTransferManager, - new HashMap<>() + new HashMap<>(), + logger ) ); assertEquals( @@ -1107,7 +1110,8 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", translogTransferManager, - Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L)) + Map.of("metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1", new Tuple<>(4L, 7L)), + logger ) ); @@ -1115,6 +1119,36 @@ public void testGetMinMaxPrimaryTermFromMetadataFile() throws IOException { verify(translogTransferManager, times(0)).readMetadata( "metadata__9223372036438563903__9223372036854775800__9223370311919910398__31__1" ); + + // Older md files with empty GenerationToPrimaryTermMap + md1 = mock(TranslogTransferMetadata.class); + when(md1.getGenerationToPrimaryTermMapper()).thenReturn(Map.of()); + when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1")) + .thenReturn(md1); + assertEquals( + new Tuple<>(-1L, 2L), + RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( + "metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1", + translogTransferManager, + new HashMap<>(), + logger + ) + ); + + // Older md files with empty GenerationToPrimaryTermMap + md1 = mock(TranslogTransferMetadata.class); + when(md1.getGenerationToPrimaryTermMapper()).thenReturn(null); + when(translogTransferManager.readMetadata("metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1")) + .thenReturn(md1); + assertEquals( + new Tuple<>(-1L, 2L), + RemoteFsTimestampAwareTranslog.getMinMaxPrimaryTermFromMetadataFile( + "metadata__9223372036854775805__9223372036854774799__9223370311919910393__31__1", + translogTransferManager, + new HashMap<>(), + logger + ) + ); } public void testDeleteStaleRemotePrimaryTerms() throws IOException { @@ -1332,4 +1366,146 @@ public void testGetMinPrimaryTermInRemoteNotFetched() throws IOException { ); verify(translogTransferManager).listPrimaryTermsInRemote(); } + + public void testTrimUnreferencedReadersStalePinnedTimestamps() throws Exception { + ArrayList ops = new ArrayList<>(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); + + // First reader is created at the init of translog + assertEquals(3, translog.readers.size()); + assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals( + 6, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> { + assertEquals( + 10, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + translog.setMinSeqNoToKeep(6); + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(2, translog.allUploaded().size()); + assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 16, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + assertEquals(3, translog.readers.size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 20, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + } + + public void testTrimUnreferencedReadersNoPermits() throws Exception { + // Acquire the permits so that remote translog deletion will not happen + translog.remoteGenerationDeletionPermits.acquire(REMOTE_DELETION_PERMITS); + + ArrayList ops = new ArrayList<>(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("0", 0, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); + + // First reader is created at the init of translog + assertEquals(3, translog.readers.size()); + assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals( + 6, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> { + assertEquals( + 10, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); + + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(6); + translog.trimUnreferencedReaders(); + + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(2, translog.allUploaded().size()); + assertEquals(7, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 16, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); + + // Fetch pinned timestamps so that it won't be stale + updatePinnedTimstampTask.run(); + translog.trimUnreferencedReaders(); + + assertEquals(3, translog.readers.size()); + assertBusy(() -> { + assertEquals(6, translog.allUploaded().size()); + assertEquals(9, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 20, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + } }