Skip to content

Commit

Permalink
Preliminary commit for async repair endpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Miles-Garnsey committed Aug 22, 2023
1 parent c8ade80 commit e84942f
Showing 1 changed file with 27 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -736,27 +738,34 @@ public void clearSnapshots(
public void repair(
@RpcParam(name = "keyspaceName") String keyspace,
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "full") Boolean full)
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "beginToken") BigInteger beginToken,
@RpcParam(name = "endToken") BigInteger endToken,
@RpcParam(name = "repairParallelism") RepairParallelism repairParallelism,
@RpcParam(name = "datacenters") Collection<String> datacenters,
@RpcParam(name = "associatedTokens") List<RingRange> associatedTokens,
@RpcParam(name = "repairThreadCount") int repairThreadCount)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
// create the repair spec
Map<String, String> repairSpec = new HashMap<>();

// add any specified tables to the repair spec
if (tables != null && !tables.isEmpty()) {
// set the tables/column families
repairSpec.put(RepairOption.COLUMNFAMILIES_KEY, String.join(",", tables));
}

// handle incremental vs full
boolean isIncremental = Boolean.FALSE.equals(full);
repairSpec.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(isIncremental));
if (isIncremental) {
// incremental repairs will fail if parallelism is not set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
Map<String, String> options = new HashMap<>();
options.put(RepairOption.PARALLELISM_KEY, repairParallelism.getName());
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
options.put(
RepairOption.JOB_THREADS_KEY,
Integer.toString(repairThreadCount == 0 ? 1 : repairThreadCount));
options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ","));
if (full) {
options.put(
RepairOption.RANGES_KEY,
StringUtils.join(
associatedTokens
.stream()
.map(token -> token.getStart() + ":" + token.getEnd())
.collect(Collectors.toList()),
","));
}
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);
}
}

Expand All @@ -778,4 +787,4 @@ public String move(

return submitJob("move", moveOperation, async);
}
}
}

0 comments on commit e84942f

Please sign in to comment.