Skip to content

Commit

Permalink
Allow repair() RPC method to take more arguments (as required by Re…
Browse files Browse the repository at this point in the history
…aper). Get the old NodeOpsResources class calling the new RPC method correctly.
  • Loading branch information
Miles-Garnsey committed Aug 23, 2023
1 parent 61d14c2 commit ea1c395
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastax.mgmtapi.rpc.Rpc;
import com.datastax.mgmtapi.rpc.RpcParam;
import com.datastax.mgmtapi.rpc.RpcRegistry;
import com.datastax.mgmtapi.rpc.models.RingRange;
import com.datastax.mgmtapi.util.Job;
import com.datastax.mgmtapi.util.JobExecutor;
import com.datastax.oss.driver.api.core.CqlIdentifier;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -741,39 +743,49 @@ public String repair(
@RpcParam(name = "keyspaceName") String keyspace,
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "notifications") boolean notifications)
@RpcParam(name = "beginToken") BigInteger beginToken,
@RpcParam(name = "endToken") BigInteger endToken,
@RpcParam(name = "notifications") boolean notifications,
@RpcParam(name = "repairParallelism") RepairParallelism repairParallelism,
@RpcParam(name = "datacenters") Collection<String> datacenters,
@RpcParam(name = "associatedTokens") List<RingRange> associatedTokens,
@RpcParam(name = "repairThreadCount") int repairThreadCount)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
Map<String, String> options = new HashMap<>();
options.put(RepairOption.PARALLELISM_KEY, repairParallelism.getName());
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
options.put(
RepairOption.JOB_THREADS_KEY,
Integer.toString(repairThreadCount == 0 ? 1 : repairThreadCount));
options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ","));
if (full) {
assert(keyspace != null);
Map<String, String> options = new HashMap<>();
repairParallelism.map(rPar ->
options.put(
RepairOption.PARALLELISM_KEY, rPar.getName()
)
);
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full));
repairThreadCount.map(tCount ->
options.put(
RepairOption.JOB_THREADS_KEY,
Integer.toString(tCount == 0 ? 1 : tCount))
);
options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ","));
if (full) {
associatedTokens.map(aTokens ->
options.put(
RepairOption.RANGES_KEY,
StringUtils.join(
associatedTokens
aTokens
.stream()
.map(token -> token.getStart() + ":" + token.getEnd())
.collect(Collectors.toList()),
","));
}
",")
)
));
}
datacenters.map( dcs ->
options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ","))
);

// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, options);

if (!notifications) {
return Integer.valueOf(repairJobId).toString();
Expand Down Expand Up @@ -839,6 +851,7 @@ public String repair(
}

throw new RuntimeException("At least one keyspace must be defined");

}

@Rpc(name = "move")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -503,11 +504,15 @@ public Response repair(RepairRequest repairRequest) {
}
app.cqlService.executePreparedStatement(
app.dbUnixSocketFile,
"CALL NodeOps.repair(?, ?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full,
false);
// The default repair does not allow for specifying things like parallelism, threadCounts, source DCs or ranges etc.
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());

return Response.ok("OK").build();
});
Expand Down Expand Up @@ -637,4 +642,4 @@ public Response move(@QueryParam(value = "newToken") String newToken) {
+ " \"language\": null,\n"
+ " \"encoding\": null\n"
+ "}";
}
}

0 comments on commit ea1c395

Please sign in to comment.