Skip to content

Commit

Permalink
Documents bug in bulk batch writer and renames class to indicate its …
Browse files Browse the repository at this point in the history
…buggy (#288)
  • Loading branch information
keith-turner authored Dec 15, 2024
1 parent 9f597c6 commit 807b62c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env,
var workDir = new Path(bulkWorkDir);
var filesystem = workDir.getFileSystem(conf);
var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit,
splitSupplier);
return tableName -> new FlakyBulkBatchWriter(client, tableName, filesystem, workDir,
memLimit, splitSupplier);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

public class BulkBatchWriter implements BatchWriter {
/**
* BatchWriter that bulk imports in its implementation. The implementation contains a bug that was
* found to be useful for testing Accumulo. The bug was left and this class was renamed to add Flaky
* to indicate its danger for other uses.
*/
public class FlakyBulkBatchWriter implements BatchWriter {

private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);
private static final Logger log = LoggerFactory.getLogger(FlakyBulkBatchWriter.class);

private final Deque<Mutation> mutations = new ArrayDeque<>();
private final AccumuloClient client;
Expand All @@ -62,7 +67,7 @@ public class BulkBatchWriter implements BatchWriter {
private long memUsed;
private boolean closed = false;

public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
public FlakyBulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
this.client = client;
this.tableName = tableName;
Expand Down Expand Up @@ -119,23 +124,38 @@ public synchronized void flush() throws MutationsRejectedException {
}
}

Comparator<KeyValue> kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey());
keysValues.sort(kvComparator);
keysValues.sort(Map.Entry.comparingByKey());

RFileWriter writer = null;
byte[] currEndRow = null;
int nextFileNameCounter = 0;

var loadPlanBuilder = LoadPlan.builder();

// This code is broken because Arrays.compare will compare bytes as signed integers. Accumulo
// treats bytes as unsigned 8 bit integers for sorting purposes. This incorrect comparator
// causes this code to sometimes prematurely close rfiles, which can lead to lots of files
// being bulk imported into a single tablet. The files still go to the correct tablet, so this
// does not cause data loss. This bug was found to be useful in testing as it introduces
// stress on bulk import+compactions and it was decided to keep this bug. If copying this code
// elsewhere then this bug should probably be fixed.
Comparator<byte[]> comparator = Arrays::compare;
// To fix the code above it should be replaced with the following
// Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();

for (var keyValue : keysValues) {
var key = keyValue.getKey();
if (writer == null
|| (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer == null || (currEndRow != null
&& comparator.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer != null) {
writer.close();
}

// When the above code prematurely closes a rfile because of the incorrect comparator, the
// following code will find a new Tablet. Since the following code uses the Text
// comparator its comparisons are correct and it will just find the same tablet for the
// file that was just closed. This is what cause multiple files to added to the same
// tablet.
var row = key.getRow();
var headSet = splits.headSet(row);
var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
Expand Down

0 comments on commit 807b62c

Please sign in to comment.