diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java index 7e110ec0..6fc701ba 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java @@ -8,6 +8,7 @@ import com.datastax.mgmtapi.rpc.Rpc; import com.datastax.mgmtapi.rpc.RpcParam; import com.datastax.mgmtapi.rpc.RpcRegistry; +import com.datastax.mgmtapi.rpc.models.RingRange; import com.datastax.mgmtapi.util.Job; import com.datastax.mgmtapi.util.JobExecutor; import com.datastax.oss.driver.api.core.CqlIdentifier; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -741,39 +743,49 @@ public String repair( @RpcParam(name = "keyspaceName") String keyspace, @RpcParam(name = "tables") List tables, @RpcParam(name = "full") Boolean full, - @RpcParam(name = "notifications") boolean notifications) - @RpcParam(name = "beginToken") BigInteger beginToken, - @RpcParam(name = "endToken") BigInteger endToken, + @RpcParam(name = "notifications") boolean notifications, @RpcParam(name = "repairParallelism") RepairParallelism repairParallelism, @RpcParam(name = "datacenters") Collection datacenters, @RpcParam(name = "associatedTokens") List associatedTokens, @RpcParam(name = "repairThreadCount") int repairThreadCount) throws IOException { // At least one keyspace is required - if (keyspace != null) { - Map options = new HashMap<>(); - options.put(RepairOption.PARALLELISM_KEY, repairParallelism.getName()); - options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full)); - options.put( - RepairOption.JOB_THREADS_KEY, - Integer.toString(repairThreadCount == 0 ? 1 : repairThreadCount)); - options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE)); - options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ",")); - if (full) { + assert(keyspace != null); + Map options = new HashMap<>(); + repairParallelism.map(rPar -> + options.put( + RepairOption.PARALLELISM_KEY, rPar.getName() + ) + ); + options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!full)); + repairThreadCount.map(tCount -> options.put( + RepairOption.JOB_THREADS_KEY, + Integer.toString(tCount == 0 ? 1 : tCount)) + ); + options.put(RepairOption.TRACE_KEY, Boolean.toString(Boolean.FALSE)); + options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(tables, ",")); + if (full) { + associatedTokens.map(aTokens -> + options.put( RepairOption.RANGES_KEY, StringUtils.join( - associatedTokens + aTokens .stream() .map(token -> token.getStart() + ":" + token.getEnd()) .collect(Collectors.toList()), - ",")); - } + ",") + ) + )); + } + datacenters.map( dcs -> + options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(dcs, ",")) + ); // Since Cassandra provides us with a async, we don't need to use our executor interface for // this. final int repairJobId = - ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec); + ShimLoader.instance.get().getStorageService().repairAsync(keyspace, options); if (!notifications) { return Integer.valueOf(repairJobId).toString(); @@ -839,6 +851,7 @@ public String repair( } throw new RuntimeException("At least one keyspace must be defined"); + } @Rpc(name = "move") diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java index bef1a627..2337d548 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java @@ -24,6 +24,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -503,11 +504,15 @@ public Response repair(RepairRequest repairRequest) { } app.cqlService.executePreparedStatement( app.dbUnixSocketFile, - "CALL NodeOps.repair(?, ?, ?, ?)", + "CALL NodeOps.repair(?, ?, ?, ?, ?, ?, ?)", repairRequest.keyspaceName, repairRequest.tables, repairRequest.full, - false); + // The default repair does not allow for specifying things like parallelism, threadCounts, source DCs or ranges etc. + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty()); return Response.ok("OK").build(); }); @@ -637,4 +642,4 @@ public Response move(@QueryParam(value = "newToken") String newToken) { + " \"language\": null,\n" + " \"encoding\": null\n" + "}"; -} +} \ No newline at end of file