limits tablets and offers bulk import as an option for ingest #287

Merged 2 commits on Nov 18, 2024
9 changes: 9 additions & 0 deletions conf/accumulo-testing.properties
@@ -61,6 +61,10 @@ test.ci.ingest.row.max=9223372036854775807
test.ci.ingest.max.cf=32767
# Maximum number of random column qualifiers to generate
test.ci.ingest.max.cq=32767
# Maximum number of tablets that will be written to for a single flush. For each flush iteration, the
# tablets to write to are chosen at random. When this is set to Integer.MAX_VALUE, no limiting is done.
# The value must be in the range [2,Integer.MAX_VALUE].
test.ci.ingest.max.tablets=2147483647
# Optional visibilities (in CSV format) that, if specified, will be randomly selected by ingesters for
# each linked list
test.ci.ingest.visibilities=
@@ -80,6 +84,11 @@ test.ci.ingest.pause.duration.max=120
# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
# To disable deletes, set probability to 0.0
test.ci.ingest.delete.probability=0.1
# If set to a path in HDFS, bulk import will be used instead of a batch writer to ingest data
test.ci.ingest.bulk.workdir=
# When using bulk import to ingest data, this determines how much memory can be used to buffer
# mutations before creating rfiles and importing them.
test.ci.ingest.bulk.memory.limit=512000000

# Batch walker
# ------------
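For reference, a hypothetical configuration enabling the new bulk import path might look like the sketch below; the work directory and the limits are illustrative values, not defaults from this change.

```properties
# write to at most 100 randomly chosen tablets per flush
test.ci.ingest.max.tablets=100
# stage rfiles under this HDFS directory and bulk import them instead of using a batch writer
test.ci.ingest.bulk.workdir=/tmp/ci-bulk-work
# buffer roughly 1 GB of mutations before creating rfiles and importing them
test.ci.ingest.bulk.memory.limit=1000000000
```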
8 changes: 8 additions & 0 deletions src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -106,6 +106,14 @@ public class TestProps {
// The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous
// ingest
public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + "delete.probability";
// The max number of tablets that will be written to between flushes of the batch writer. Tablets
// are randomly selected when starting a new flush iteration.
public static final String CI_INGEST_MAX_TABLETS = CI_INGEST + "max.tablets";
// If set to a path in HDFS, bulk import will be used instead of a batch writer to ingest data
public static final String CI_INGEST_BULK_WORK_DIR = CI_INGEST + "bulk.workdir";
// When using bulk import to ingest data, this determines how much memory can be used to buffer
// mutations before creating rfiles and importing them.
public static final String CI_INGEST_BULK_MEM_LIMIT = CI_INGEST + "bulk.memory.limit";

/** Batch Walker **/
// Sleep time between batch scans (in ms)
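As a minimal sketch of how an ingester might consume these properties, the factory below picks a BulkBatchWriter when a bulk work directory is configured and a regular BatchWriter otherwise. The factory class and the default values are illustrative; this is not the wiring added elsewhere in this PR.

```java
import java.util.Properties;
import java.util.TreeSet;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.testing.TestProps;
import org.apache.accumulo.testing.continuous.BulkBatchWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class IngestWriterFactory {
  // Returns a BulkBatchWriter when bulk.workdir is set, otherwise a normal BatchWriter.
  static BatchWriter create(AccumuloClient client, String tableName, Properties props)
      throws Exception {
    String workDir = props.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR, "");
    if (workDir.isBlank()) {
      return client.createBatchWriter(tableName);
    }
    long memLimit =
        Long.parseLong(props.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT, "512000000"));
    FileSystem fs = FileSystem.get(new Configuration());
    return new BulkBatchWriter(client, tableName, fs, new Path(workDir), memLimit, () -> {
      try {
        // re-read splits on each flush so newly added tablets are picked up
        return new TreeSet<>(client.tableOperations().listSplits(tableName));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
  }
}
```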
173 changes: 173 additions & 0 deletions src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.testing.continuous;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

public class BulkBatchWriter implements BatchWriter {

private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);

private final List<Mutation> mutations = new ArrayList<>();
private final AccumuloClient client;
private final String tableName;
private final FileSystem fileSystem;
private final Path workPath;
private final long memLimit;
private final Supplier<SortedSet<Text>> splitSupplier;

private long memUsed;
private boolean closed = false;

public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
this.client = client;
this.tableName = tableName;
this.fileSystem = fileSystem;
this.workPath = workPath;
this.memLimit = memLimit;
this.splitSupplier = splitSupplier;
}

@Override
public synchronized void addMutation(Mutation mutation) throws MutationsRejectedException {
Preconditions.checkState(!closed);
    // copy the mutation so later changes by the caller do not affect the buffered data
    mutation = new Mutation(mutation);
mutations.add(mutation);
memUsed += mutation.estimatedMemoryUsed();
if (memUsed > memLimit) {
flush();
}
}

@Override
public synchronized void addMutations(Iterable<Mutation> iterable)
throws MutationsRejectedException {
for (var mutation : iterable) {
addMutation(mutation);
}
}

@Override
public synchronized void flush() throws MutationsRejectedException {
Preconditions.checkState(!closed);

try {
var splits = splitSupplier.get();

Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
fileSystem.mkdirs(tmpDir);
      // sort by row so each tablet's data is contiguous and a new rfile is started only when
      // crossing a tablet boundary
      mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow()));

RFileWriter writer = null;
byte[] currEndRow = null;
int nextFileNameCounter = 0;

var loadPlanBuilder = LoadPlan.builder();

for (var mutation : mutations) {
if (writer == null
|| (currEndRow != null && Arrays.compare(mutation.getRow(), currEndRow) > 0)) {
if (writer != null) {
writer.close();
}

        // find the extent of the tablet containing this row: the greatest split before the row
        // is the tablet's prev end row and the least split at or after it is its end row
        var row = new Text(mutation.getRow());
var headSet = splits.headSet(row);
var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
var tailSet = splits.tailSet(row);
var tabletEndRow = tailSet.isEmpty() ? null : tailSet.first();
currEndRow = tabletEndRow == null ? null : tabletEndRow.copyBytes();

String filename = String.format("bbw-%05d.rf", nextFileNameCounter++);
writer = RFile.newWriter().to(tmpDir + "/" + filename).withFileSystem(fileSystem).build();
loadPlanBuilder = loadPlanBuilder.loadFileTo(filename, LoadPlan.RangeType.TABLE,
tabletPrevRow, tabletEndRow);

log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow);
}

for (var colUpdate : mutation.getUpdates()) {
var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(),
colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility());
if (colUpdate.hasTimestamp()) {
key.setTimestamp(colUpdate.getTimestamp());
}
if (colUpdate.isDeleted()) {
key.setDeleted(true);
}
writer.append(key, colUpdate.getValue());
}
}

if (writer != null) {
writer.close();
}

// TODO make table time configurable?
var loadPlan = loadPlanBuilder.build();

long t1 = System.nanoTime();
      // import everything in one bulk operation; the load plan tells Accumulo which tablet
      // range each file covers, so the files do not have to be inspected to map them
      client.tableOperations().importDirectory(tmpDir.toString()).to(tableName).plan(loadPlan)
          .tableTime(true).load();
long t2 = System.nanoTime();

log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir,
loadPlan.getDestinations().size(), mutations.size(), memUsed,
TimeUnit.NANOSECONDS.toMillis(t2 - t1));

fileSystem.delete(tmpDir, true);

mutations.clear();
memUsed = 0;
} catch (Exception e) {
closed = true;
throw new MutationsRejectedException(client, List.of(), Map.of(), List.of(), 1, e);
}
}

@Override
public synchronized void close() throws MutationsRejectedException {
flush();
closed = true;
}
}
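A minimal usage sketch follows, assuming an accumulo-client.properties file, a table named ci with splits, and an HDFS work directory; all of these names are illustrative.

```java
import java.util.TreeSet;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.testing.continuous.BulkBatchWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BulkBatchWriterExample {
  public static void main(String[] args) throws Exception {
    try (AccumuloClient client =
        Accumulo.newClient().from("accumulo-client.properties").build()) {
      FileSystem fs = FileSystem.get(new Configuration());
      var bbw = new BulkBatchWriter(client, "ci", fs, new Path("/tmp/ci-bulk-work"),
          512_000_000L, () -> {
            try {
              // flush() partitions rfiles by the table's current splits
              return new TreeSet<>(client.tableOperations().listSplits("ci"));
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          });
      try {
        Mutation m = new Mutation("row1");
        m.put("cf", "cq", "value1");
        bbw.addMutation(m); // buffered; flushes automatically past the memory limit
      } finally {
        bbw.close(); // final flush: sort, write rfiles, bulk import
      }
    }
  }
}
```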