From 61d14c2b801e7f66ee7e456dce7593cd0eaab1d5 Mon Sep 17 00:00:00 2001 From: Miles-Garnsey Date: Tue, 22 Aug 2023 16:53:32 +1000 Subject: [PATCH] Preliminary commit for async repair endpoint. --- .../com/datastax/mgmtapi/NodeOpsProvider.java | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java index 6ea01acd..7e110ec0 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java @@ -24,6 +24,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import java.io.IOException; +import java.math.BigInteger; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -54,6 +55,7 @@ import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -740,24 +742,32 @@ public String repair( @RpcParam(name = "tables") List tables, @RpcParam(name = "full") Boolean full, @RpcParam(name = "notifications") boolean notifications) + @RpcParam(name = "beginToken") BigInteger beginToken, + @RpcParam(name = "endToken") BigInteger endToken, + @RpcParam(name = "repairParallelism") RepairParallelism repairParallelism, + @RpcParam(name = "datacenters") Collection datacenters, + @RpcParam(name = "associatedTokens") List associatedTokens, + @RpcParam(name = "repairThreadCount") int repairThreadCount) throws IOException { // At least one keyspace is required if (keyspace != null) { - // create the repair spec - Map repairSpec = new HashMap<>(); - - // add any specified tables to the repair spec - if (tables != null && !tables.isEmpty()) { - // set the tables/column families - repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables)); - } - - // handle incremental vs full - boolean isIncremental = Boolean.FALSE.equals(full); - repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental)); - if (isIncremental) { - // incremental repairs will fail if parallelism is not set - repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName()); + Map options = new HashMap<>(); + options.put(RepairOption.PARALLELISM_KEY, repairParallelism.getName()); + options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full)); + options.put( + RepairOption.JOB_THREADS_KEY, + Integer.toString(repairThreadCount == 0 ? 1 : repairThreadCount)); + options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE)); + options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ",")); + if (full) { + options.put( + RepairOption.RANGES_KEY, + StringUtils.join( + associatedTokens + .stream() + .map(token -> token.getStart() + ":" + token.getEnd()) + .collect(Collectors.toList()), + ",")); } // Since Cassandra provides us with a async, we don't need to use our executor interface for @@ -849,4 +859,4 @@ public String move( return submitJob("move", moveOperation, async); } -} +} \ No newline at end of file