Skip to content

Commit

Permalink
Refactor createtable and add ability to manysplits
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed Dec 15, 2023
1 parent 506d275 commit 941e113
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 52 deletions.
8 changes: 4 additions & 4 deletions conf/accumulo-testing.properties
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ test.ci.bulk.reducers.max=1024
# Splits Scaling
# -----------
# The number of tables to create
test.ci.split.table.count=3
test.ci.split.table.count=10
# Minimum random row to generate
test.ci.split.ingest.row.min=0
# Maximum random row to generate
Expand All @@ -152,16 +152,16 @@ test.ci.split.ingest.row.max=9223372036854775807
test.ci.split.ingest.max.cf=32767
# Maximum number of random column qualifiers to generate
test.ci.split.ingest.max.cq=32767
# The number of splits to add to each table on creation
test.ci.split.initial.splits=0
# The number of tablets to create for each table at table creation
test.ci.split.initial.tablets=1
# The amount of data to write to each table
test.ci.split.write.size=10000000
# The split threshold to set for each table on creation
test.ci.split.threshold=1G
# The factor to reduce the split threshold by for each iteration of the test
test.ci.split.threshold.reduction.factor=10
# Number of rounds to run the test
test.ci.split.test.rounds=4
test.ci.split.test.rounds=3

###############################
# Garbage Collection Simulation
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/apache/accumulo/testing/TestProps.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public class TestProps {
public static final String CI_SPLIT_INGEST_ROW_MAX = CI_SPLIT + "ingest.row.max";
public static final String CI_SPLIT_INGEST_MAX_CF = CI_SPLIT + "ingest.max.cf";
public static final String CI_SPLIT_INGEST_MAX_CQ = CI_SPLIT + "ingest.max.cq";
public static final String CI_SPLIT_INITIAL_SPLITS = CI_SPLIT + "initial.splits";
public static final String CI_SPLIT_INITIAL_TABLETS = CI_SPLIT + "initial.tablets";
public static final String CI_SPLIT_WRITE_SIZE = CI_SPLIT + "write.size";
public static final String CI_SPLIT_THRESHOLD = CI_SPLIT + "threshold";
public static final String CI_SPLIT_THRESHOLD_REDUCTION_FACTOR =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,54 +41,67 @@ public static void main(String[] args) throws Exception {

try (ContinuousEnv env = new ContinuousEnv(args)) {
AccumuloClient client = env.getAccumuloClient();

String tableName = env.getAccumuloTableName();
if (client.tableOperations().exists(tableName)) {
log.error("Accumulo table {} already exists", tableName);
System.exit(-1);
}

int numTablets = Integer.parseInt(env.getTestProperty(CI_COMMON_ACCUMULO_NUM_TABLETS));
long rowMin = env.getRowMin();
long rowMax = env.getRowMax();
Map<String,String> serverProps = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
Map<String,String> tableProps = getProps(env, TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS);

if (numTablets < 1) {
log.error("numTablets < 1");
System.exit(-1);
}
if (env.getRowMin() >= env.getRowMax()) {
log.error("min >= max");
System.exit(-1);
}
createTable(client, tableName, numTablets, rowMin, rowMax, serverProps, tableProps);
}
}

public static void createTable(AccumuloClient client, String tableName, int numTablets,
long rowMin, long rowMax, Map<String,String> serverProps, Map<String,String> tableProps)
throws Exception {
if (client.tableOperations().exists(tableName)) {
log.error("Accumulo table {} already exists", tableName);
System.exit(-1);
}

// retrieve and set tserver props
Map<String,String> props = getProps(env, TestProps.CI_COMMON_ACCUMULO_SERVER_PROPS);
if (numTablets < 1) {
log.error("numTablets < 1");
System.exit(-1);
}
if (rowMin >= rowMax) {
log.error("min >= max");
System.exit(-1);
}

// set tserver props
if (!serverProps.isEmpty()) {
try {
client.instanceOperations().modifyProperties(properties -> properties.putAll(props));
client.instanceOperations().modifyProperties(properties -> properties.putAll(serverProps));
} catch (AccumuloException | AccumuloSecurityException e) {
log.error("Failed to set tserver props");
throw new Exception(e);
}
}

NewTableConfiguration ntc = new NewTableConfiguration();

if (numTablets > 1) {
SortedSet<Text> splits = new TreeSet<>();
final int numSplits = numTablets - 1;
final long distance = ((env.getRowMax() - env.getRowMin()) / numTablets) + 1;
final long distance = ((rowMax - rowMin) / numTablets) + 1;
long split = distance;
for (int i = 0; i < numSplits; i++) {
String s = String.format("%016x", split + env.getRowMin());
String s = String.format("%016x", split + rowMin);
while (s.charAt(s.length() - 1) == '0') {
s = s.substring(0, s.length() - 1);
}
splits.add(new Text(s));
split += distance;
}

NewTableConfiguration ntc = new NewTableConfiguration();
ntc.withSplits(splits);
ntc.setProperties(getProps(env, TestProps.CI_COMMON_ACCUMULO_TABLE_PROPS));
}

client.tableOperations().create(tableName, ntc);
ntc.setProperties(tableProps);

log.info("Created Accumulo table {} with {} tablets", tableName, numTablets);
}
client.tableOperations().create(tableName, ntc);

log.info("Created Accumulo table {} with {} tablets", tableName, numTablets);
}

private static Map<String,String> getProps(ContinuousEnv env, String propType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CloneConfiguration;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
Expand Down Expand Up @@ -69,8 +67,8 @@ public static void main(String[] args) throws Exception {
final long rowMax = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_INGEST_ROW_MAX));
final int maxColF = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CF));
final int maxColQ = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INGEST_MAX_CQ));
final int initialSplits =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INITIAL_SPLITS));
final int initialTabletCount =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_INITIAL_TABLETS));
final int initialData =
Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_WRITE_SIZE));
String initialSplitThresholdStr = testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD);
Expand All @@ -94,24 +92,20 @@ public static void main(String[] args) throws Exception {
try {
client.namespaceOperations().create(NAMESPACE);
} catch (NamespaceExistsException e) {
log.info("The namespace '{}' already exists. Continuing with existing namespace.",
log.warn("The namespace '{}' already exists. Continuing with existing namespace.",
NAMESPACE);
}

final NewTableConfiguration ntc = new NewTableConfiguration();
ntc.setProperties(Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), initialSplitThresholdStr));
final String firstTable = tableNames.get(0);

log.info("Properties being used to create tables for this test: {}",
ntc.getProperties().toString());
Map<String,String> tableProps =
Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), initialSplitThresholdStr);

final String firstTable = tableNames.get(0);
log.info("Properties being used to create tables for this test: {}", tableProps);

log.info("Creating initial table: {}", firstTable);
try {
client.tableOperations().create(firstTable, ntc);
} catch (TableExistsException e) {
log.info("Test probably wont work if the table already exists with data present", e);
}
CreateTable.createTable(client, firstTable, initialTabletCount, rowMin, rowMax, tableProps,
Map.of());

log.info("Ingesting {} entries into first table, {}.", initialData, firstTable);
ContinuousIngest.doIngest(client, rowMin, rowMax, firstTable, testProps, maxColF, maxColQ,
Expand All @@ -123,17 +117,18 @@ public static void main(String[] args) throws Exception {
log.info("Creating {} more tables by cloning the first", tableCount - 1);
tableNames.stream().parallel().skip(1).forEach(tableName -> {
try {
client.tableOperations().clone(firstTable, tableName,
CloneConfiguration.builder().build());
client.tableOperations().clone(firstTable, tableName, true, null, null);
} catch (TableExistsException e) {
log.info(
"table {} already exists. Continuing with existing table. This might mess with the expected values",
log.warn(
"table {} already exists. Continuing with existing table. Previous data will affect splits",
tableName);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

SECONDS.sleep(5);

// main loop
// reduce the split threshold then wait for the expected file size per tablet to be reached
long previousSplitThreshold = initialSplitThreshold;
Expand Down Expand Up @@ -184,10 +179,11 @@ public static void main(String[] args) throws Exception {
if (elapsedMillis % SECONDS.toMillis(3) == 0) {
double averageFileSize =
offendingTabletSizes.stream().mapToLong(l -> l).average().orElse(0);
long diff = (long) (averageFileSize - splitThreshold);
log.info(
"{} has {} tablets whose file sizes are not yet <= {}. Avg. offending file size: {}",
tableName, offendingTabletSizes.size(), splitThreshold,
String.format("%.0f", averageFileSize));
"{} tablets have file sizes not yet <= {} on table {}. Diff of avg offending file(s): {}",
offendingTabletSizes.size(), splitThresholdStr, tableName,
bytesToMemoryString(diff));
}
MILLISECONDS.sleep(sleepMillis);
}
Expand All @@ -203,6 +199,8 @@ public static void main(String[] args) throws Exception {
splitThresholdStr, NANOSECONDS.toSeconds(timeTaken), NANOSECONDS.toMillis(timeTaken));
}

log.info("Test completed successfully.");

log.info("Deleting tables");
tableNames.stream().parallel().forEach(tableName -> {
try {
Expand Down

0 comments on commit 941e113

Please sign in to comment.