Skip to content

Commit

Permalink
Add java-driver-3.x async benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
Bouncheck committed Aug 2, 2022
1 parent eee94a9 commit 9aed68a
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 0 deletions.
15 changes: 15 additions & 0 deletions benchmarks/basic/java-driver-3.x-async/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM ubuntu:18.04
RUN apt update

# Install java 8
RUN apt install -y openjdk-8-jdk
RUN apt install -y maven
#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java
RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version

# Copy benchmark code into the container
COPY source /source
WORKDIR /source

# Compile the code
RUN mvn clean package
2 changes: 2 additions & 0 deletions benchmarks/basic/java-driver-3.x-async/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
docker build . -t rust-driver-benchmarks-basic-java-driver-3.x-async
3 changes: 3 additions & 0 deletions benchmarks/basic/java-driver-3.x-async/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-3.x-async \
java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@"
50 changes: 50 additions & 0 deletions benchmarks/basic/java-driver-3.x-async/source/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>source</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>com.scylladb</groupId>
<artifactId>scylla-driver-core</artifactId>
<version>3.11.2.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>



</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import java.util.Arrays;

import org.apache.commons.cli.*;

class Config {

enum Workload {
Inserts, Selects, Mixed,
}

String[] node_addresses;
Workload workload;
long tasks;
long concurrency;
boolean dont_prepare;

Config(String[] args) {

this.node_addresses = new String[]{"127.0.0.1"};
this.workload = Workload.Inserts;
this.tasks = 1000 * 1000;
this.concurrency = 1024;
this.dont_prepare = false;

Options options = new Options();

options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark");
options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma");
options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)");
options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects");
options.addOption("c", "concurrency", true, "Maximum number of requests performed at once");

try {
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);

if (cmd.hasOption("dont-prepare")) {
this.dont_prepare = true;
}

if (cmd.hasOption("nodes")) {
String value = cmd.getOptionValue("nodes");
node_addresses = value.split(",");
}

if (cmd.hasOption("workload")) {
String workloadValue = cmd.getOptionValue("workload");
this.workload = Workload.valueOf(workloadValue);
}

if (cmd.hasOption("tasks")) {
this.tasks = Integer.parseInt(cmd.getOptionValue("tasks"));
}

if (cmd.hasOption("concurrency")) {
this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency"));
}

} catch (ParseException e) {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("./run.sh [OPTION]...", options);
System.out.println();
System.out.println("Unexpected exception: " + e.getMessage());
}
}

@Override
public String toString() {
return "Config{" +
"node_addresses=" + Arrays.toString(node_addresses) +
", workload=" + workload +
", tasks=" + tasks +
", concurrency=" + concurrency +
", dont_prepare=" + dont_prepare +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import com.datastax.driver.core.*;
import com.google.common.util.concurrent.*;

import java.util.ArrayList;
import java.util.concurrent.*;

public class MainClass {

private static Cluster cluster;
private static Config config;
private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)";
private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?";

private static PreparedStatement INSERT_PS;
private static PreparedStatement SELECT_PS;

public static void main(String[] args) throws InterruptedException, ExecutionException {

config = new Config(args);
System.out.println("Parsed config: ");
System.out.println(config.toString());

cluster = Cluster.builder().addContactPoints(config.node_addresses).withProtocolVersion(ProtocolVersion.V4).build();
cluster.getConfiguration().getPoolingOptions().setMaxQueueSize((int) Math.max(2048, 2 * config.concurrency));
Session session = cluster.connect();

prepareKeyspaceAndTable(session);

if (!config.dont_prepare) {
prepareKeyspaceAndTable(session);

if (config.workload.equals(Config.Workload.Selects)) {
prepareSelectsBenchmark(session);
}
}


ArrayList<CompletableFuture<ResultSet>> arr = new ArrayList<>();

System.out.println("Starting the benchmark");

long benchmarkStart = System.nanoTime();

INSERT_PS = session.prepare(INSERT_STRING);
SELECT_PS = session.prepare(SELECT_STRING);

for (int i = 0; i < config.concurrency; i++) {
if (i + 1 == config.concurrency) {
arr.add(execute(session, i * (config.tasks / config.concurrency), config.tasks));
} else {
arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
}
}

for (Future<?> f : arr) {
f.get(); // make sure nothing has thrown and everything finished
}

long benchmarkEnd = System.nanoTime();
System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000));

session.close();
if (cluster != null) cluster.close();
}

static void prepareKeyspaceAndTable(Session session) {
session.execute("DROP KEYSPACE IF EXISTS benchks");
session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}");
session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)");
if (!cluster.getMetadata().checkSchemaAgreement()) {
throw new RuntimeException("Schema not in agreement after preparing keyspace and table.");
}
}

private static void prepareSelectsBenchmark(Session session) throws InterruptedException, ExecutionException {
System.out.println("Preparing a selects benchmark (inserting values)...");

ArrayList<CompletableFuture<ResultSet>> arr = new ArrayList<>();
INSERT_PS = session.prepare(INSERT_STRING);

Config.Workload originalWorkload = config.workload;
config.workload = Config.Workload.Inserts; // Switch for setup purposes

for (int i = 0; i < config.concurrency; i++) {
arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
}
for (Future<?> f : arr) {
f.get(); // make sure nothing has thrown and everything finished
}

config.workload = originalWorkload;
}

public static CompletableFuture<ResultSet> execute(Session s, long currentIter, long maxIter) {
if (currentIter >= maxIter) {
// No more iterations
return CompletableFuture.completedFuture(null);
}

ListenableFuture<ResultSet> fut = null;
if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) {
fut = s.executeAsync(INSERT_PS.bind(currentIter, 2L * currentIter, 3L * currentIter));
}

if (config.workload.equals(Config.Workload.Selects)) {
fut = s.executeAsync(SELECT_PS.bind(currentIter));

} else if (config.workload.equals(Config.Workload.Mixed)) {
fut = Futures.transform(fut, new AsyncFunction<ResultSet, ResultSet>() {
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
return (s.executeAsync(SELECT_PS.bind(currentIter)));
}
});
}

if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) {
fut = Futures.transform(fut, new AsyncFunction<ResultSet, ResultSet>() {
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
Row r = rs.one();
if ((r.getLong("v1") != 2L * currentIter) || (r.getLong("v2") != 3L * currentIter)) {
throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", currentIter, 2L * currentIter, 3L * currentIter, r.getLong("pk"), r.getLong("v1"), r.getLong("v2")));
}
return Futures.immediateFuture(rs);
}
});
}

// Convert ResultSetFuture to CompletableFuture
CompletableFuture<ResultSet> futCompletable = new CompletableFuture<>();
Futures.addCallback(fut, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
futCompletable.complete(result);
}

@Override
public void onFailure(Throwable t) {
futCompletable.completeExceptionally(t);
}
});

// Execute next iteration after that
return futCompletable.thenCompose(rs -> execute(s, currentIter + 1, maxIter));
}
}


0 comments on commit 9aed68a

Please sign in to comment.