Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zipfian distribution option to vary value size for continuous ingest #276

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/accumulo-testing.properties
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ test.ci.ingest.pause.duration.max=120
# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
# To disable deletes, set probability to 0.0
test.ci.ingest.delete.probability=0.1
# Enables a Zipfian distribution for value sizes. If set to true, random bytes are inserted into each value, and the number of bytes is drawn from a Zipfian distribution.
test.ci.ingest.zipfian.enabled=true
# Minimum size, in bytes, of the random data inserted into the value when the Zipfian distribution is enabled
test.ci.ingest.zipfian.min.size=0
# Maximum size, in bytes, of the random data inserted into the value when the Zipfian distribution is enabled
test.ci.ingest.zipfian.max.size=10000
# Exponent of the Zipfian distribution
test.ci.ingest.zipfian.exponent=1.5

# Batch walker
# ------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.testing.TestProps;
import org.apache.accumulo.testing.util.FastFormat;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,6 +57,13 @@ public class ContinuousIngest {
private static int pauseMin;
private static int pauseMax;

private static boolean zipfianEnabled;
private static int minSize;
private static int maxSize;
private static double exponent;

private static RandomDataGenerator rnd;

/**
 * Picks one entry from the configured {@code visibilities} list.
 *
 * @param rand source of the pseudo-random index into {@code visibilities}
 * @return the visibility stored at the chosen index
 */
private static ColumnVisibility getVisibility(Random rand) {
  // The bound is the list size, so the index is always within the list.
  final int pick = rand.nextInt(visibilities.size());
  return visibilities.get(pick);
}
Expand Down Expand Up @@ -173,6 +181,18 @@ protected static void doIngest(AccumuloClient client, long rowMin, long rowMax,
log.info("DELETES will occur with a probability of {}",
String.format("%.02f", deleteProbability));

zipfianEnabled = Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));

if (zipfianEnabled) {
minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
ctubbsii marked this conversation as resolved.
Show resolved Hide resolved
exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
rnd = new RandomDataGenerator();

log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}",
minSize, maxSize, exponent);
}

try (BatchWriter bw = client.createBatchWriter(tableName)) {
out: while (true) {
ColumnVisibility cv = getVisibility(random);
Expand Down Expand Up @@ -317,25 +337,54 @@ public static byte[] genRow(long rowLong) {

public static byte[] createValue(byte[] ingestInstanceId, long entriesWritten, byte[] prevRow,
Checksum cksum) {
int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
final int numOfSeparators = 4;
int dataLen =
ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + numOfSeparators;
if (cksum != null)
dataLen += 8;

int zipfLength = 0;
if (zipfianEnabled) {
// add the length of the zipfian data to the value
int range = maxSize - minSize;
zipfLength = rnd.nextZipf(range, exponent) + minSize;
dataLen += zipfLength;
}

byte[] val = new byte[dataLen];

// add the ingest instance id to the value
System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
int index = ingestInstanceId.length;

val[index++] = ':';

// add the count of entries written to the value
int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 16, EMPTY_BYTES);
if (added != 16)
throw new RuntimeException(" " + added);
index += 16;

val[index++] = ':';

// add the previous row to the value
if (prevRow != null) {
System.arraycopy(prevRow, 0, val, index, prevRow.length);
index += prevRow.length;
}

val[index++] = ':';

if (zipfianEnabled) {
// add random data to the value of length zipfLength
for (int i = 0; i < zipfLength; i++) {
val[index++] = (byte) rnd.nextInt(0, 256);
}

val[index++] = ':';
}

// add the checksum to the value
if (cksum != null) {
cksum.update(val, 0, index);
cksum.getValue();
Expand Down
Loading