Skip to content

Commit

Permalink
Preliminary commit for async repair endpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Miles-Garnsey committed Aug 23, 2023
1 parent 4d2a772 commit 61d14c2
Showing 1 changed file with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -740,24 +742,32 @@ public String repair(
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "notifications") boolean notifications)
@RpcParam(name = "beginToken") BigInteger beginToken,
@RpcParam(name = "endToken") BigInteger endToken,
@RpcParam(name = "repairParallelism") RepairParallelism repairParallelism,
@RpcParam(name = "datacenters") Collection<String> datacenters,
@RpcParam(name = "associatedTokens") List<RingRange> associatedTokens,
@RpcParam(name = "repairThreadCount") int repairThreadCount)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
// create the repair spec
Map<String, String> repairSpec = new HashMap<>();

// add any specified tables to the repair spec
if (tables != null && !tables.isEmpty()) {
// set the tables/column families
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
}

// handle incremental vs full
boolean isIncremental = Boolean.FALSE.equals(full);
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental));
if (isIncremental) {
// incremental repairs will fail if parallelism is not set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
Map<String, String> options = new HashMap<>();
options.put(RepairOption.PARALLELISM_KEY, repairParallelism.getName());
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
options.put(
RepairOption.JOB_THREADS_KEY,
Integer.toString(repairThreadCount == 0 ? 1 : repairThreadCount));
options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ","));
if (full) {
options.put(
RepairOption.RANGES_KEY,
StringUtils.join(
associatedTokens
.stream()
.map(token -> token.getStart() + ":" + token.getEnd())
.collect(Collectors.toList()),
","));
}

// Since Cassandra provides us with a async, we don't need to use our executor interface for
Expand Down Expand Up @@ -849,4 +859,4 @@ public String move(

return submitJob("move", moveOperation, async);
}
}
}

0 comments on commit 61d14c2

Please sign in to comment.