From 751a110efe6ecf43c0aa6662160fa3766d881936 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 30 Nov 2023 16:13:51 -0500 Subject: [PATCH] fixes bug with bulk import RPC failure and retry (#4000) The bulk import code had a bug where if an exception was thrown in a certain place in the tablet code that it would mark the bulk import as complete. After the exception, other code would retry and see the bulk import was complete even though it was not. This commit changes the behavior to only note success when there is no exception. In addition test were added that recreated this bug. These test set a constraint on the metadata table that cause bulk import writes to fail. --- .../accumulo/tserver/tablet/Tablet.java | 10 +- .../accumulo/test/functional/BulkNewIT.java | 147 ++++++++++++++++++ .../accumulo/test/functional/BulkOldIT.java | 51 ++++++ 3 files changed, 203 insertions(+), 5 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 175f3a1f276..3f7eebe95fe 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1790,6 +1790,11 @@ > getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIME var storedTabletFile = getDatafileManager().importMapFiles(tid, entries, setTime); lastMapFileImportTime = System.currentTimeMillis(); + synchronized (this) { + // only mark the bulk import a success if no exception was thrown + bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet()); + } + if (isSplitPossible()) { getTabletServer().executeSplit(this); } else { @@ -1804,11 +1809,6 @@ > getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIME "Likely bug in code, always expect to remove something. Please open an Accumulo issue."); } - try { - bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet()); - } catch (Exception ex) { - log.info(ex.toString(), ex); - } tabletServer.removeBulkImportState(files); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index afa0afc677b..a8aebd2e251 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -18,10 +18,13 @@ */ package org.apache.accumulo.test.functional; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -37,18 +40,23 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -56,20 +64,30 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.LoadPlan.RangeType; +import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.constraints.Constraint; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.MemoryUnit; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.constraints.MetadataConstraints; +import org.apache.accumulo.server.constraints.SystemEnvironment; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -77,6 +95,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -506,6 +525,43 @@ public void testEndOfFirstTablet() throws Exception { } } + @Test + public void testExceptionInMetadataUpdate() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + // after setting this up, bulk imports should never succeed on a tablet server + setupBulkConstraint(getPrincipal(), c); + + String dir = getDir("/testExceptionInMetadataUpdate-"); + + String h1 = writeData(dir + "/f1.", aconf, 0, 333); + + var executor = Executors.newSingleThreadExecutor(); + // With the constraint configured that makes tservers throw an exception on bulk import, the + // bulk import should never succeed. So run the bulk import in another thread. + var future = executor.submit(() -> { + c.tableOperations().importDirectory(dir).to(tableName).load(); + return null; + }); + + Thread.sleep(10000); + + // the bulk import should not be done + assertFalse(future.isDone()); + + // remove the constraint which should allow the bulk import running in the background thread + // to complete + removeBulkConstraint(getPrincipal(), c); + + // wait for the future to complete and ensure it had no exceptions + future.get(); + + // verifty the data was bulk imported + verifyData(c, tableName, 0, 333, false); + verifyMetadata(c, tableName, Map.of("null", Set.of(h1))); + } + } + private void addSplits(AccumuloClient client, String tableName, String splitString) throws Exception { SortedSet splits = new TreeSet<>(); @@ -605,4 +661,95 @@ private String writeData(String file, AccumuloConfiguration aconf, int s, int e) return hash(filename); } + + /** + * This constraint is used to simulate an error in the metadata write for a bulk import. + */ + public static class NoBulkConstratint implements Constraint { + + public static final String CANARY_VALUE = "a!p@a#c$h%e^&*()"; + public static final short CANARY_CODE = 31234; + + @Override + public String getViolationDescription(short violationCode) { + if (violationCode == 1) { + return "Bulk import files are not allowed in this test"; + } else if (violationCode == CANARY_CODE) { + return "Check used to see if constraint is active"; + } + + return null; + } + + @Override + public List check(Environment env, Mutation mutation) { + for (var colUpdate : mutation.getUpdates()) { + var fam = new Text(colUpdate.getColumnFamily()); + if (fam.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) { + var stf = new StoredTabletFile(new String(colUpdate.getColumnQualifier(), UTF_8)); + if (stf.getFileName().startsWith("I")) { + return List.of((short) 1); + } + } + + if (new String(colUpdate.getValue(), UTF_8).equals(CANARY_VALUE)) { + return List.of(CANARY_CODE); + } + + } + + return null; + } + } + + static void setupBulkConstraint(String principal, AccumuloClient c) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + // add a constraint to the metadata table that disallows bulk import files to be added + c.securityOperations().grantTablePermission(principal, MetadataTable.NAME, + TablePermission.WRITE); + c.securityOperations().grantTablePermission(principal, MetadataTable.NAME, + TablePermission.ALTER_TABLE); + + c.tableOperations().addConstraint(MetadataTable.NAME, NoBulkConstratint.class.getName()); + + var metaConstraints = new MetadataConstraints(); + SystemEnvironment env = EasyMock.createMock(SystemEnvironment.class); + ServerContext context = EasyMock.createMock(ServerContext.class); + EasyMock.expect(env.getServerContext()).andReturn(context); + EasyMock.replay(env); + + // wait for the constraint to be active on the metadata table + Wait.waitFor(() -> { + try (var bw = c.createBatchWriter(MetadataTable.NAME)) { + Mutation m = new Mutation("~garbage"); + m.put("", "", NoBulkConstratint.CANARY_VALUE); + // This test assume the metadata constraint check will not flag this mutation, the following + // validates this assumption. + assertNull(metaConstraints.check(env, m)); + bw.addMutation(m); + return false; + } catch (MutationsRejectedException e) { + return e.getConstraintViolationSummaries().stream() + .anyMatch(cvs -> cvs.violationCode == NoBulkConstratint.CANARY_CODE); + } + }); + + // delete the junk added to the metadata table + try (var bw = c.createBatchWriter(MetadataTable.NAME)) { + Mutation m = new Mutation("~garbage"); + m.putDelete("", ""); + bw.addMutation(m); + } + } + + static void removeBulkConstraint(String principal, AccumuloClient c) + throws AccumuloException, TableNotFoundException, AccumuloSecurityException { + int constraintNum = c.tableOperations().listConstraints(MetadataTable.NAME) + .get(NoBulkConstratint.class.getName()); + c.tableOperations().removeConstraint(MetadataTable.NAME, constraintNum); + c.securityOperations().revokeTablePermission(principal, MetadataTable.NAME, + TablePermission.WRITE); + c.securityOperations().revokeTablePermission(principal, MetadataTable.NAME, + TablePermission.ALTER_TABLE); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java index 093277a5dd5..d8df93be07b 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.functional; +import static org.junit.jupiter.api.Assertions.assertEquals; + import java.io.IOException; import java.time.Duration; import java.util.Iterator; @@ -30,6 +32,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; @@ -60,6 +63,9 @@ protected Duration defaultTimeout() { @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) { cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE); + // lowering this because the test testExceptionInMetadataUpdate() will cause retries and the + // default takes forever + cfg.setProperty(Property.TSERV_BULK_RETRY, "2"); } // suppress importDirectory deprecated since this is the only test for legacy technique @@ -107,6 +113,51 @@ public void testBulkFile() throws Exception { } + // test case where the metadata data update throws an exception + @SuppressWarnings("deprecation") + @Test + public void testExceptionInMetadataUpdate() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + // after setting this up, bulk imports should fail + BulkNewIT.setupBulkConstraint(getAdminPrincipal(), c); + + String tableName = getUniqueNames(1)[0]; + + c.tableOperations().create(tableName); + Configuration conf = new Configuration(); + AccumuloConfiguration aconf = getCluster().getServerContext().getConfiguration(); + FileSystem fs = getCluster().getFileSystem(); + String rootPath = cluster.getTemporaryPath().toString(); + + String dir = rootPath + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0]; + + fs.delete(new Path(dir), true); + + writeData(conf, aconf, fs, dir, "f1", 0, 333); + + String failDir = dir + "_failures"; + Path failPath = new Path(failDir); + fs.delete(failPath, true); + fs.mkdirs(failPath); + fs.deleteOnExit(failPath); + + // this should fail and it should copy the file to the fail dir + c.tableOperations().importDirectory(tableName, dir, failDir, false); + + if (fs.listStatus(failPath).length < 1) { + throw new Exception("Expected files in failure directory"); + } + + try (var scanner = c.createScanner(tableName)) { + // verify the table is empty + assertEquals(0, scanner.stream().count()); + } + + BulkNewIT.removeBulkConstraint(getAdminPrincipal(), c); + } + } + private void writeData(Configuration conf, AccumuloConfiguration aconf, FileSystem fs, String dir, String file, int start, int end) throws IOException, Exception { FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()