fixes bug with bulk import RPC failure and retry (apache#4000)
The bulk import code had a bug where, if an exception was thrown in a
certain place in the tablet code, it would mark the bulk import as
complete. After the exception, other code would retry and see the bulk
import was complete even though it was not. This commit changes the
behavior to only note success when there is no exception. In addition,
tests were added that recreate this bug. These tests set a constraint on
the metadata table that causes bulk import writes to fail.
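
To make the failure mode concrete, here is a minimal sketch of the bug and the fix (hypothetical names and structure, assuming a simplified bookkeeping map; the real change is in the tablet code below). If recording a transaction as imported happens on a path that runs even when the metadata update throws, a retry of the same RPC consults that record and silently skips the files.

// Minimal sketch, not the actual Tablet code: names below are hypothetical.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BulkBookkeepingSketch {
  private final Map<Long,List<String>> bulkImported = new HashMap<>();

  // BEFORE (buggy): the record is updated even when updateMetadata() throws,
  // so a later retry of the RPC sees the files as imported and skips them.
  void importFilesBroken(long tid, List<String> files) {
    try {
      updateMetadata(files); // may throw, e.g. on an RPC failure
    } finally {
      bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(files);
    }
  }

  // AFTER (fixed): success is recorded only when updateMetadata() returns
  // normally, so a retry after an exception re-attempts the update.
  void importFilesFixed(long tid, List<String> files) {
    updateMetadata(files); // if this throws, nothing is recorded
    synchronized (this) {
      bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(files);
    }
  }

  // the retry path consults the record to avoid re-importing files
  List<String> filesStillNeeded(long tid, List<String> files) {
    var needed = new ArrayList<>(files);
    needed.removeAll(bulkImported.getOrDefault(tid, List.of()));
    return needed;
  }

  private void updateMetadata(List<String> files) {}
}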
keith-turner authored Nov 30, 2023
1 parent e34f7fe commit 751a110
Showing 3 changed files with 203 additions and 5 deletions.
@@ -1790,6 +1790,11 @@ getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIME
var storedTabletFile = getDatafileManager().importMapFiles(tid, entries, setTime);
lastMapFileImportTime = System.currentTimeMillis();

synchronized (this) {
// only mark the bulk import a success if no exception was thrown
bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet());
}

if (isSplitPossible()) {
getTabletServer().executeSplit(this);
} else {
@@ -1804,11 +1809,6 @@ getTabletServer().getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIME
"Likely bug in code, always expect to remove something. Please open an Accumulo issue.");
}

try {
bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet());
} catch (Exception ex) {
log.info(ex.toString(), ex);
}
tabletServer.removeBulkImportState(files);
}
}
test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java (147 additions, 0 deletions)
@@ -18,10 +18,13 @@
*/
package org.apache.accumulo.test.functional;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -37,46 +40,62 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.LoadPlan.RangeType;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.constraints.Constraint;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.constraints.MetadataConstraints;
import org.apache.accumulo.server.constraints.SystemEnvironment;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -506,6 +525,43 @@ public void testEndOfFirstTablet() throws Exception {
}
}

@Test
public void testExceptionInMetadataUpdate() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {

// after setting this up, bulk imports should never succeed on a tablet server
setupBulkConstraint(getPrincipal(), c);

String dir = getDir("/testExceptionInMetadataUpdate-");

String h1 = writeData(dir + "/f1.", aconf, 0, 333);

var executor = Executors.newSingleThreadExecutor();
// With the constraint configured, tservers throw an exception on bulk import, so the
// bulk import should never succeed. Therefore run the bulk import in another thread.
var future = executor.submit(() -> {
c.tableOperations().importDirectory(dir).to(tableName).load();
return null;
});

Thread.sleep(10000);

// the bulk import should not be done
assertFalse(future.isDone());

// remove the constraint which should allow the bulk import running in the background thread
// to complete
removeBulkConstraint(getPrincipal(), c);

// wait for the future to complete and ensure it had no exceptions
future.get();

// verify the data was bulk imported
verifyData(c, tableName, 0, 333, false);
verifyMetadata(c, tableName, Map.of("null", Set.of(h1)));
}
}

private void addSplits(AccumuloClient client, String tableName, String splitString)
throws Exception {
SortedSet<Text> splits = new TreeSet<>();
@@ -605,4 +661,95 @@ private String writeData(String file, AccumuloConfiguration aconf, int s, int e)

return hash(filename);
}

/**
* This constraint is used to simulate an error in the metadata write for a bulk import.
*/
public static class NoBulkConstratint implements Constraint {

public static final String CANARY_VALUE = "a!p@a#c$h%e^&*()";
public static final short CANARY_CODE = 31234;

@Override
public String getViolationDescription(short violationCode) {
if (violationCode == 1) {
return "Bulk import files are not allowed in this test";
} else if (violationCode == CANARY_CODE) {
return "Check used to see if constraint is active";
}

return null;
}

@Override
public List<Short> check(Environment env, Mutation mutation) {
for (var colUpdate : mutation.getUpdates()) {
var fam = new Text(colUpdate.getColumnFamily());
if (fam.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
var stf = new StoredTabletFile(new String(colUpdate.getColumnQualifier(), UTF_8));
if (stf.getFileName().startsWith("I")) {
return List.of((short) 1);
}
}

if (new String(colUpdate.getValue(), UTF_8).equals(CANARY_VALUE)) {
return List.of(CANARY_CODE);
}

}

return null;
}
}

static void setupBulkConstraint(String principal, AccumuloClient c)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
// add a constraint to the metadata table that disallows bulk import files to be added
c.securityOperations().grantTablePermission(principal, MetadataTable.NAME,
TablePermission.WRITE);
c.securityOperations().grantTablePermission(principal, MetadataTable.NAME,
TablePermission.ALTER_TABLE);

c.tableOperations().addConstraint(MetadataTable.NAME, NoBulkConstratint.class.getName());

var metaConstraints = new MetadataConstraints();
SystemEnvironment env = EasyMock.createMock(SystemEnvironment.class);
ServerContext context = EasyMock.createMock(ServerContext.class);
EasyMock.expect(env.getServerContext()).andReturn(context);
EasyMock.replay(env);

// wait for the constraint to be active on the metadata table
Wait.waitFor(() -> {
try (var bw = c.createBatchWriter(MetadataTable.NAME)) {
Mutation m = new Mutation("~garbage");
m.put("", "", NoBulkConstratint.CANARY_VALUE);
// This test assumes the metadata constraint check will not flag this mutation; the
// following validates this assumption.
assertNull(metaConstraints.check(env, m));
bw.addMutation(m);
return false;
} catch (MutationsRejectedException e) {
return e.getConstraintViolationSummaries().stream()
.anyMatch(cvs -> cvs.violationCode == NoBulkConstratint.CANARY_CODE);
}
});

// delete the junk added to the metadata table
try (var bw = c.createBatchWriter(MetadataTable.NAME)) {
Mutation m = new Mutation("~garbage");
m.putDelete("", "");
bw.addMutation(m);
}
}

static void removeBulkConstraint(String principal, AccumuloClient c)
throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
int constraintNum = c.tableOperations().listConstraints(MetadataTable.NAME)
.get(NoBulkConstratint.class.getName());
c.tableOperations().removeConstraint(MetadataTable.NAME, constraintNum);
c.securityOperations().revokeTablePermission(principal, MetadataTable.NAME,
TablePermission.WRITE);
c.securityOperations().revokeTablePermission(principal, MetadataTable.NAME,
TablePermission.ALTER_TABLE);
}
}
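
A note on the canary pattern in setupBulkConstraint above: addConstraint only updates table configuration, and tablet servers pick that change up asynchronously, so the test keeps writing a sentinel mutation until the rejection comes back with the canary violation code. Reduced to a reusable sketch (hypothetical helper and class names; the poll interval is illustrative):

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;

class ConstraintWaitSketch {
  // Write a sentinel the constraint is known to reject; once the rejection
  // carries the expected violation code, the constraint is live on the servers.
  static void waitForConstraintActive(AccumuloClient client, String table,
      String sentinelRow, String sentinelValue, short expectedCode) throws Exception {
    while (true) {
      try (var bw = client.createBatchWriter(table)) {
        Mutation m = new Mutation(sentinelRow);
        m.put("", "", sentinelValue);
        bw.addMutation(m);
        // close() flushes; if no exception is thrown, the constraint is not
        // active yet
      } catch (MutationsRejectedException e) {
        if (e.getConstraintViolationSummaries().stream()
            .anyMatch(cvs -> cvs.violationCode == expectedCode)) {
          return;
        }
      }
      Thread.sleep(250); // illustrative poll interval
    }
  }
}

The test polls with a write that is expected to fail, presumably because there is no direct way to ask a tablet server which constraints it has loaded; once the wait returns, the sentinel row is deleted so no garbage lingers in the metadata table.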
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.test.functional;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
@@ -30,6 +32,7 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
@@ -60,6 +63,9 @@ protected Duration defaultTimeout() {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE);
// lowering this because the test testExceptionInMetadataUpdate() will cause retries and the
// default takes forever
cfg.setProperty(Property.TSERV_BULK_RETRY, "2");
}

// suppress importDirectory deprecated since this is the only test for legacy technique
Expand Down Expand Up @@ -107,6 +113,51 @@ public void testBulkFile() throws Exception {

}

// test case where the metadata update throws an exception
@SuppressWarnings("deprecation")
@Test
public void testExceptionInMetadataUpdate() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {

// after setting this up, bulk imports should fail
BulkNewIT.setupBulkConstraint(getAdminPrincipal(), c);

String tableName = getUniqueNames(1)[0];

c.tableOperations().create(tableName);
Configuration conf = new Configuration();
AccumuloConfiguration aconf = getCluster().getServerContext().getConfiguration();
FileSystem fs = getCluster().getFileSystem();
String rootPath = cluster.getTemporaryPath().toString();

String dir = rootPath + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0];

fs.delete(new Path(dir), true);

writeData(conf, aconf, fs, dir, "f1", 0, 333);

String failDir = dir + "_failures";
Path failPath = new Path(failDir);
fs.delete(failPath, true);
fs.mkdirs(failPath);
fs.deleteOnExit(failPath);

// this should fail and it should copy the file to the fail dir
c.tableOperations().importDirectory(tableName, dir, failDir, false);

if (fs.listStatus(failPath).length < 1) {
throw new Exception("Expected files in failure directory");
}

try (var scanner = c.createScanner(tableName)) {
// verify the table is empty
assertEquals(0, scanner.stream().count());
}

BulkNewIT.removeBulkConstraint(getAdminPrincipal(), c);
}
}

private void writeData(Configuration conf, AccumuloConfiguration aconf, FileSystem fs, String dir,
String file, int start, int end) throws IOException, Exception {
FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()
