From 2e19341d9611c6787d5a706143556b167f4bf721 Mon Sep 17 00:00:00 2001
From: Keith Turner
Date: Sun, 17 Nov 2024 21:46:08 +0000
Subject: [PATCH 1/2] limits tablets and offers bulk import as option for ingest

This change introduces two new continuous ingest features. First, options
were added to limit the number of tablets written to. Second, an option was
added to use bulk import instead of a batch writer. These features support
running a test like the following.

* create a continuous ingest table with 1000 tablets
* start 100 continuous ingest clients
* have each client continually bulk import data to 10 random tablets

This test scenario creates a lot of bulk import and subsequent compaction
activity for Accumulo to handle.

These changes add bulk import to the `cingest ingest` command. There is an
existing `cingest bulk` command that runs a map reduce job to create bulk
files. These changes do not remove the need for the existing map reduce job;
they serve a different purpose. The map reduce job can generate a very large
amount of data for a single bulk import. These changes instead allow
generating lots of bulk imports, each with a small amount of data; they could
never generate the amount of data for a single bulk import that the map
reduce job can. The following is an example of a test scenario that could use
both.

* create a continuous ingest table with 1000 tablets
* use the map reduce bulk job to create an initial 10 billion entries in the table
* start 100 continuous ingest clients
* have each client continually bulk import data to 10 random tablets
* stop the clients after 12 hours and verify the data
---
 conf/accumulo-testing.properties              |   9 +
 .../apache/accumulo/testing/TestProps.java    |   8 +
 .../testing/continuous/BulkBatchWriter.java   | 183 ++++++++++++++++++
 .../testing/continuous/ContinuousIngest.java  | 166 ++++++++++++++--
 .../testing/continuous/ManySplits.java        |   7 +-
 5 files changed, 360 insertions(+), 13 deletions(-)
 create mode 100644 src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java

diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index c938163d..1872491e 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -61,6 +61,10 @@ test.ci.ingest.row.max=9223372036854775807
 test.ci.ingest.max.cf=32767
 # Maximum number of random column qualifiers to generate
 test.ci.ingest.max.cq=32767
+# Maximum number of tablets that will be written to for a single flush. For each iteration of flush the tablets to
+# write to are randomly chosen. When this is set to Integer.MAX_VALUE no limiting is done. This must be set to
+# a number in the range [2,Integer.MAX_VALUE].
+test.ci.ingest.max.tablets=2147483647
 # Optional visibilities (in CSV format) that if specified will be randomly selected by ingesters for
 # each linked list
 test.ci.ingest.visibilities=
@@ -80,6 +84,11 @@ test.ci.ingest.pause.duration.max=120
 # The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
 # To disable deletes, set probability to 0.0
 test.ci.ingest.delete.probability=0.1
+# If set to a path in hdfs will use bulk import instead of batch writer to ingest data
+test.ci.ingest.bulk.workdir=
+# When using bulk import to ingest data this determines how much memory can be used to buffer mutations before creating
+# rfiles and importing them.
+test.ci.ingest.bulk.memory.limit=512000000 # Batch walker # ------------ diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java index aa8e9e63..1e851816 100644 --- a/src/main/java/org/apache/accumulo/testing/TestProps.java +++ b/src/main/java/org/apache/accumulo/testing/TestProps.java @@ -106,6 +106,14 @@ public class TestProps { // The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous // ingest public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + "delete.probability"; + // The max number of tablets that will be written to between flushes of the batch writer. Randomly + // selects the tablets when starting a new flush iteration. + public static final String CI_INGEST_MAX_TABLETS = CI_INGEST + "max.tablets"; + // If set to a path in hdfs will use bulk import instead of batch writer to ingest data + public static final String CI_INGEST_BULK_WORK_DIR = CI_INGEST + "bulk.workdir"; + // When using bulk import to ingest data this determines how much memory can be used to buffer + // mutations before creating rfiles and importing them. + public static final String CI_INGEST_BULK_MEM_LIMIT = CI_INGEST + "bulk.memory.limit"; /** Batch Walker **/ // Sleep time between batch scans (in ms) diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java new file mode 100644 index 00000000..fe50575a --- /dev/null +++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.testing.continuous; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.Mutation; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +public class BulkBatchWriter implements BatchWriter { + + private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class); + + private final List mutations = new ArrayList<>(); + private final AccumuloClient client; + private final String tableName; + private final FileSystem fileSystem; + private final Path workPath; + private final long memLimit; + private final Supplier> splitSupplier; + + private long memUsed; + private boolean closed = false; + + public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem, + Path workPath, long memLimit) { + this.client = client; + this.tableName = tableName; + this.fileSystem = fileSystem; + this.workPath = workPath; + this.memLimit = memLimit; + this.splitSupplier = Suppliers.memoizeWithExpiration(() -> { + try { + var splits = client.tableOperations().listSplits(tableName); + return new TreeSet<>(splits); + } catch (Exception e) { + throw new IllegalStateException(e); + } + + }, 10, TimeUnit.MINUTES); + } + + @Override + public synchronized void addMutation(Mutation mutation) throws MutationsRejectedException { + Preconditions.checkState(!closed); + mutation = new Mutation(mutation); + mutations.add(mutation); + memUsed += mutation.estimatedMemoryUsed(); + if (memUsed > memLimit) { + flush(); + } + } + + @Override + public synchronized void addMutations(Iterable iterable) + throws MutationsRejectedException { + for (var mutation : iterable) { + addMutation(mutation); + } + } + + @Override + public synchronized void flush() throws MutationsRejectedException { + Preconditions.checkState(!closed); + + try { + var splits = splitSupplier.get(); + + Path tmpDir = new Path(workPath, UUID.randomUUID().toString()); + fileSystem.mkdirs(tmpDir); + mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow())); + + RFileWriter writer = null; + byte[] currEndRow = null; + int nextFileNameCounter = 0; + + var loadPlanBuilder = LoadPlan.builder(); + + for (var mutation : mutations) { + if (writer == null + || (currEndRow != null && Arrays.compare(mutation.getRow(), currEndRow) > 0)) { + if (writer != null) { + writer.close(); + } + + var row = new Text(mutation.getRow()); + var headSet = splits.headSet(row); + var tabletPrevRow = headSet.isEmpty() ? null : headSet.last(); + var tailSet = splits.tailSet(row); + var tabletEndRow = tailSet.isEmpty() ? null : tailSet.first(); + currEndRow = tabletEndRow == null ? 
null : tabletEndRow.copyBytes(); + + String filename = String.format("bbw-%05d.rf", nextFileNameCounter++); + writer = RFile.newWriter().to(tmpDir + "/" + filename).withFileSystem(fileSystem).build(); + loadPlanBuilder = loadPlanBuilder.loadFileTo(filename, LoadPlan.RangeType.TABLE, + tabletPrevRow, tabletEndRow); + + log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow); + } + + for (var colUpdate : mutation.getUpdates()) { + var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(), + colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility()); + if (colUpdate.hasTimestamp()) { + key.setTimestamp(colUpdate.getTimestamp()); + } + if (colUpdate.isDeleted()) { + key.setDeleted(true); + } + writer.append(key, colUpdate.getValue()); + } + } + + if (writer != null) { + writer.close(); + } + + // TODO make table time configurable? + var loadPlan = loadPlanBuilder.build(); + + long t1 = System.nanoTime(); + client.tableOperations().importDirectory(tmpDir.toString()).to(tableName).plan(loadPlan) + .tableTime(true).load(); + long t2 = System.nanoTime(); + + log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir, + loadPlan.getDestinations().size(), mutations.size(), memUsed, + TimeUnit.NANOSECONDS.toMillis(t2 - t1)); + + fileSystem.delete(tmpDir, true); + + mutations.clear(); + memUsed = 0; + } catch (Exception e) { + closed = true; + throw new MutationsRejectedException(client, List.of(), Map.of(), List.of(), 1, e); + } + } + + @Override + public synchronized void close() throws MutationsRejectedException { + flush(); + closed = true; + } +} diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java index 1bb32a5c..ec2ef566 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java @@ -20,13 +20,18 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -38,10 +43,15 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.testing.TestProps; import org.apache.accumulo.testing.util.FastFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; public class ContinuousIngest { @@ -56,6 +66,137 @@ public class ContinuousIngest { private static int pauseMin; private static int pauseMax; + public interface RandomGeneratorFactory extends Supplier { + static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, Random random) { + final long rowMin = env.getRowMin(); + final long rowMax = env.getRowMax(); + Properties testProps = env.getTestProperties(); + final int maxTablets = + Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_TABLETS)); + + if (maxTablets == Integer.MAX_VALUE) { + 
return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random); + } else { + var tableName = env.getAccumuloTableName(); + Supplier> splitSupplier = Suppliers.memoizeWithExpiration(() -> { + try { + var splits = client.tableOperations().listSplits(tableName); + return new TreeSet<>(splits); + } catch (Exception e) { + throw new IllegalStateException(e); + } + + }, 10, TimeUnit.MINUTES); + return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, maxTablets, splitSupplier, + random); + } + } + } + + public static class MinMaxRandomGeneratorFactory implements RandomGeneratorFactory { + private final LongSupplier generator; + + public MinMaxRandomGeneratorFactory(long rowMin, long rowMax, Random random) { + Preconditions.checkState(0 <= rowMin && rowMin <= rowMax, + "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax"); + generator = () -> ContinuousIngest.genLong(rowMin, rowMax, random); + } + + @Override + public LongSupplier get() { + return generator; + } + } + + /** + * Chooses X random tablets and only generates random rows that fall within those tablets. + */ + public static class MaxTabletsRandomGeneratorFactory implements RandomGeneratorFactory { + private final int maxTablets; + private final Supplier> splitSupplier; + private final Random random; + private final long minRow; + private final long maxRow; + + public MaxTabletsRandomGeneratorFactory(long minRow, long maxRow, int maxTablets, + Supplier> splitSupplier, Random random) { + // writing to a single tablet does not make much sense because this test it predicated on + // having rows in tablets point to rows in other tablet to detect errors + Preconditions.checkState(maxTablets > 1, "max tablets config must be > 1"); + this.maxTablets = maxTablets; + this.splitSupplier = splitSupplier; + this.random = random; + this.minRow = minRow; + this.maxRow = maxRow; + } + + @Override + public LongSupplier get() { + var splits = splitSupplier.get(); + if (splits.size() < maxTablets) { + // There are less tablets so generate within the tablet range + return new MinMaxRandomGeneratorFactory(minRow, maxRow, random).get(); + } else { + long prev = minRow; + List allGenerators = new ArrayList<>(splits.size() + 1); + for (var split : splits) { + // splits are derived from inspecting rfile indexes and rfile indexes can shorten rows + // introducing non-hex chars so need to handle non-hex chars in the splits + // TODO this handling may not be correct, but it will not introduce errors but may cause + // writing a small amount of data to an extra tablet. 
+ byte[] bytes = split.copyBytes(); + int len = bytes.length; + int last = bytes.length - 1; + if (bytes[last] < '0') { + len = last; + } else if (bytes[last] > '9' && bytes[last] < 'a') { + bytes[last] = '9'; + } else if (bytes[last] > 'f') { + bytes[last] = 'f'; + } + + var splitStr = new String(bytes, 0, len, UTF_8); + var splitNum = Long.parseLong(splitStr, 16) << (64 - splitStr.length() * 4); + allGenerators.add(new MinMaxRandomGeneratorFactory(prev, splitNum, random).get()); + prev = splitNum; + } + allGenerators.add(new MinMaxRandomGeneratorFactory(prev, maxRow, random).get()); + + Collections.shuffle(allGenerators, random); + var generators = List.copyOf(allGenerators.subList(0, maxTablets)); + + return () -> { + // pick a generator for random tablet + var generator = generators.get(random.nextInt(generators.size())); + // pick a random long that falls within that tablet + return generator.getAsLong(); + }; + } + } + } + + public interface BatchWriterFactory { + BatchWriter create(String tableName) throws TableNotFoundException; + + static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env) { + Properties testProps = env.getTestProperties(); + final String bulkWorkDir = testProps.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR); + if (bulkWorkDir == null || bulkWorkDir.isBlank()) { + return client::createBatchWriter; + } else { + try { + var conf = new Configuration(); + var workDir = new Path(bulkWorkDir); + var filesystem = workDir.getFileSystem(conf); + var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT)); + return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + } + private static ColumnVisibility getVisibility(Random rand) { return visibilities.get(rand.nextInt(visibilities.size())); } @@ -104,8 +245,6 @@ public static void main(String[] args) throws Exception { AccumuloClient client = env.getAccumuloClient(); - final long rowMin = env.getRowMin(); - final long rowMax = env.getRowMax(); String tableName = env.getAccumuloTableName(); Properties testProps = env.getTestProperties(); final int maxColF = env.getMaxColF(); @@ -116,17 +255,17 @@ public static void main(String[] args) throws Exception { final boolean checksum = Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM)); - doIngest(client, rowMin, rowMax, tableName, testProps, maxColF, maxColQ, numEntries, checksum, - random); + var randomFactory = RandomGeneratorFactory.create(env, client, random); + var batchWriterFactory = BatchWriterFactory.create(client, env); + doIngest(client, randomFactory, batchWriterFactory, tableName, testProps, maxColF, maxColQ, + numEntries, checksum, random); } } - protected static void doIngest(AccumuloClient client, long rowMin, long rowMax, String tableName, - Properties testProps, int maxColF, int maxColQ, long numEntries, boolean checksum, - Random random) + protected static void doIngest(AccumuloClient client, RandomGeneratorFactory randomFactory, + BatchWriterFactory batchWriterFactory, String tableName, Properties testProps, int maxColF, + int maxColQ, long numEntries, boolean checksum, Random random) throws TableNotFoundException, MutationsRejectedException, InterruptedException { - Preconditions.checkState(0 <= rowMin && rowMin <= rowMax, - "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax"); if (!client.tableOperations().exists(tableName)) { throw new TableNotFoundException(null, tableName, @@ 
-173,14 +312,16 @@ protected static void doIngest(AccumuloClient client, long rowMin, long rowMax, log.info("DELETES will occur with a probability of {}", String.format("%.02f", deleteProbability)); - try (BatchWriter bw = client.createBatchWriter(tableName)) { + try (BatchWriter bw = batchWriterFactory.create(tableName)) { out: while (true) { ColumnVisibility cv = getVisibility(random); // generate sets nodes that link to previous set of nodes for (int depth = 0; depth < maxDepth; depth++) { + // use the same random generator for each flush interval + LongSupplier randomRowGenerator = randomFactory.get(); for (int index = 0; index < flushInterval; index++) { - long rowLong = genLong(rowMin, rowMax, random); + long rowLong = randomRowGenerator.getAsLong(); byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row); @@ -303,6 +444,9 @@ public static byte[] genCol(int cfInt) { return FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES); } + /** + * Generates a random long within the range [min,max) + */ public static long genLong(long min, long max, Random r) { return ((r.nextLong() & 0x7fffffffffffffffL) % (max - min)) + min; } diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java index 6a965742..b590e1a8 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.testing.TestProps; +import org.apache.accumulo.testing.continuous.ContinuousIngest.RandomGeneratorFactory; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,8 +109,10 @@ public static void main(String[] args) throws Exception { Map.of()); log.info("Ingesting {} entries into first table, {}.", initialData, firstTable); - ContinuousIngest.doIngest(client, rowMin, rowMax, firstTable, testProps, maxColF, maxColQ, - initialData, false, random); + var randomFactory = RandomGeneratorFactory.create(env, client, random); + var batchWriterFactory = ContinuousIngest.BatchWriterFactory.create(client, env); + ContinuousIngest.doIngest(client, randomFactory, batchWriterFactory, firstTable, testProps, + maxColF, maxColQ, initialData, false, random); client.tableOperations().flush(firstTable); From 50ab83b215602521255c7deed16ae42d8cde8878 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sun, 17 Nov 2024 22:41:55 +0000 Subject: [PATCH 2/2] consolidate getting table splits in the code --- .../testing/continuous/BulkBatchWriter.java | 14 +------ .../testing/continuous/ContinuousIngest.java | 39 ++++++++++++------- .../testing/continuous/ManySplits.java | 6 ++- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java index fe50575a..6e763b31 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -43,7 +42,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import 
com.google.common.base.Suppliers; public class BulkBatchWriter implements BatchWriter { @@ -61,21 +59,13 @@ public class BulkBatchWriter implements BatchWriter { private boolean closed = false; public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem, - Path workPath, long memLimit) { + Path workPath, long memLimit, Supplier> splitSupplier) { this.client = client; this.tableName = tableName; this.fileSystem = fileSystem; this.workPath = workPath; this.memLimit = memLimit; - this.splitSupplier = Suppliers.memoizeWithExpiration(() -> { - try { - var splits = client.tableOperations().listSplits(tableName); - return new TreeSet<>(splits); - } catch (Exception e) { - throw new IllegalStateException(e); - } - - }, 10, TimeUnit.MINUTES); + this.splitSupplier = splitSupplier; } @Override diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java index ec2ef566..cf34eeb4 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java @@ -67,7 +67,8 @@ public class ContinuousIngest { private static int pauseMax; public interface RandomGeneratorFactory extends Supplier { - static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, Random random) { + static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, + Supplier> splitSupplier, Random random) { final long rowMin = env.getRowMin(); final long rowMax = env.getRowMax(); Properties testProps = env.getTestProperties(); @@ -78,15 +79,6 @@ static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, R return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random); } else { var tableName = env.getAccumuloTableName(); - Supplier> splitSupplier = Suppliers.memoizeWithExpiration(() -> { - try { - var splits = client.tableOperations().listSplits(tableName); - return new TreeSet<>(splits); - } catch (Exception e) { - throw new IllegalStateException(e); - } - - }, 10, TimeUnit.MINUTES); return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, maxTablets, splitSupplier, random); } @@ -134,7 +126,7 @@ public MaxTabletsRandomGeneratorFactory(long minRow, long maxRow, int maxTablets public LongSupplier get() { var splits = splitSupplier.get(); if (splits.size() < maxTablets) { - // There are less tablets so generate within the tablet range + // There are less tablets so generate within the entire range return new MinMaxRandomGeneratorFactory(minRow, maxRow, random).get(); } else { long prev = minRow; @@ -178,7 +170,8 @@ public LongSupplier get() { public interface BatchWriterFactory { BatchWriter create(String tableName) throws TableNotFoundException; - static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env) { + static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env, + Supplier> splitSupplier) { Properties testProps = env.getTestProperties(); final String bulkWorkDir = testProps.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR); if (bulkWorkDir == null || bulkWorkDir.isBlank()) { @@ -189,7 +182,8 @@ static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env) { var workDir = new Path(bulkWorkDir); var filesystem = workDir.getFileSystem(conf); var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT)); - return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, 
memLimit); + return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit, + splitSupplier); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -239,6 +233,20 @@ private static void pauseCheck(Random rand) throws InterruptedException { } } + static Supplier> createSplitSupplier(AccumuloClient client, String tableName) { + + Supplier> splitSupplier = Suppliers.memoizeWithExpiration(() -> { + try { + var splits = client.tableOperations().listSplits(tableName); + return new TreeSet<>(splits); + } catch (Exception e) { + throw new IllegalStateException(e); + } + + }, 10, TimeUnit.MINUTES); + return splitSupplier; + } + public static void main(String[] args) throws Exception { try (ContinuousEnv env = new ContinuousEnv(args)) { @@ -255,8 +263,9 @@ public static void main(String[] args) throws Exception { final boolean checksum = Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM)); - var randomFactory = RandomGeneratorFactory.create(env, client, random); - var batchWriterFactory = BatchWriterFactory.create(client, env); + var splitSupplier = createSplitSupplier(client, tableName); + var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random); + var batchWriterFactory = BatchWriterFactory.create(client, env, splitSupplier); doIngest(client, randomFactory, batchWriterFactory, tableName, testProps, maxColF, maxColQ, numEntries, checksum, random); } diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java index b590e1a8..d28f557b 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java @@ -109,8 +109,10 @@ public static void main(String[] args) throws Exception { Map.of()); log.info("Ingesting {} entries into first table, {}.", initialData, firstTable); - var randomFactory = RandomGeneratorFactory.create(env, client, random); - var batchWriterFactory = ContinuousIngest.BatchWriterFactory.create(client, env); + var splitSupplier = ContinuousIngest.createSplitSupplier(client, firstTable); + var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random); + var batchWriterFactory = + ContinuousIngest.BatchWriterFactory.create(client, env, splitSupplier); ContinuousIngest.doIngest(client, randomFactory, batchWriterFactory, firstTable, testProps, maxColF, maxColQ, initialData, false, random);
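
As a rough sketch of how the new properties might be set for the scenario in
the first commit message (each client writing to 10 random tablets via bulk
import); the HDFS path is only a placeholder and the values other than the
512000000 byte default are illustrative assumptions:

    # limit each flush iteration to 10 randomly chosen tablets
    test.ci.ingest.max.tablets=10
    # a non-blank HDFS path switches ingest from a batch writer to bulk import
    # (the path below is only a placeholder)
    test.ci.ingest.bulk.workdir=hdfs:///tmp/ci-bulk
    # buffer roughly 512MB of mutations before writing rfiles and importing them
    test.ci.ingest.bulk.memory.limit=512000000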
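
A minimal sketch of how the new BulkBatchWriter can stand in for a normal
BatchWriter, assuming code in the org.apache.accumulo.testing.continuous
package, an existing table named "ci", a writable HDFS directory, and a client
properties file; the class name, table name, path, and row/column values are
placeholders, not part of the patch:

    package org.apache.accumulo.testing.continuous;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class BulkBatchWriterExample {
      public static void main(String[] args) throws Exception {
        try (var client = Accumulo.newClient().from("accumulo-client.properties").build()) {
          var workDir = new Path("hdfs:///tmp/ci-bulk"); // placeholder work dir
          var fs = workDir.getFileSystem(new Configuration());
          // cache the table's split points, used to map buffered mutations to tablet ranges
          var splitSupplier = ContinuousIngest.createSplitSupplier(client, "ci");
          try (BatchWriter bw =
              new BulkBatchWriter(client, "ci", fs, workDir, 512_000_000L, splitSupplier)) {
            Mutation m = new Mutation("0000000000000000");
            m.put("0001", "0001", "value");
            // buffered in memory; rfiles are written and bulk imported on flush/close
            bw.addMutation(m);
          }
        }
      }
    }

On flush the writer sorts its buffered mutations, writes one rfile per tablet
covered, records each file's destination in a LoadPlan, and imports the
directory, as shown in the BulkBatchWriter.flush code above.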
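
The row ranges used by MaxTabletsRandomGeneratorFactory come from turning each
(possibly shortened) hex split point into a 64-bit row boundary. A worked
example, using a made-up split value of 18f2:

    String splitStr = "18f2"; // a 4 character split point taken from a tablet's end row
    long boundary = Long.parseLong(splitStr, 16) << (64 - splitStr.length() * 4);
    System.out.printf("%016x%n", boundary); // prints 18f2000000000000
    // rows for the tablet ending at this split are then generated in [previousBoundary, boundary)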