Skip to content

Commit

Permalink
Add notification details to the repair process (#338)
Browse files Browse the repository at this point in the history
* Add repair job version which follows notifications

* Add new getStatusChanges and message

* Fix formatting

* Add tests for the async endpoint in v1 also

* Change setStatusChange

* Fix test indentation

* Rename RpcParam

* Fix rebase

* Add back the payload in the IT test to the request

* Update openapi.json, make special case for return id 0 to the Rpc call - should fix the test also which can not be run otherwise on single node cluster

* Disable the test, throw exception if Cassandra refuses to repair

* Fix repair test to run repair for a keyspace with RF 2 (#351)

* Update openapi.json after rebase

---------

Co-authored-by: Erik Merkle <[email protected]>
  • Loading branch information
burmanm and emerkle826 committed Oct 12, 2023
1 parent 8be160c commit 2b55cda
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.management.NotificationFilter;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
import org.apache.cassandra.auth.AuthenticatedUser;
Expand All @@ -52,6 +53,7 @@
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.progress.ProgressEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -733,10 +735,11 @@ public void clearSnapshots(
}

@Rpc(name = "repair")
public void repair(
public String repair(
@RpcParam(name = "keyspaceName") String keyspace,
@RpcParam(name = "tables") List<String> tables,
@RpcParam(name = "full") Boolean full)
@RpcParam(name = "full") Boolean full,
@RpcParam(name = "notifications") boolean notifications)
throws IOException {
// At least one keyspace is required
if (keyspace != null) {
Expand All @@ -756,8 +759,76 @@ public void repair(
// incremental repairs will fail if parallelism is not set
repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName());
}
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);

// Since Cassandra provides us with a async, we don't need to use our executor interface for
// this.
final int repairJobId =
ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec);

if (!notifications) {
return Integer.valueOf(repairJobId).toString();
}

String jobId = String.format("repair-%d", repairJobId);
final Job job = service.createJob("repair", jobId);

if (repairJobId == 0) {
// Job is done and won't continue
job.setStatusChange(ProgressEventType.COMPLETE, "");
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
service.updateJob(job);
return job.getJobId();
}

ShimLoader.instance
.get()
.getStorageService()
.addNotificationListener(
(notification, handback) -> {
if (notification.getType().equals("progress")) {
Map<String, Integer> data = (Map<String, Integer>) notification.getUserData();
ProgressEventType progress = ProgressEventType.values()[data.get("type")];

switch (progress) {
case START:
job.setStatusChange(progress, notification.getMessage());
job.setStartTime(System.currentTimeMillis());
break;
case NOTIFICATION:
case PROGRESS:
break;
case ERROR:
case ABORT:
job.setError(new RuntimeException(notification.getMessage()));
job.setStatus(Job.JobStatus.ERROR);
job.setFinishedTime(System.currentTimeMillis());
break;
case SUCCESS:
job.setStatusChange(progress, notification.getMessage());
// SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that)
break;
case COMPLETE:
job.setStatusChange(progress, notification.getMessage());
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
break;
}
service.updateJob(job);
}
},
(NotificationFilter)
notification -> {
final int repairNo =
Integer.parseInt(((String) notification.getSource()).split(":")[1]);
return repairNo == repairJobId;
},
null);

return job.getJobId();
}

throw new RuntimeException("At least one keyspace must be defined");
}

@Rpc(name = "move")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package com.datastax.mgmtapi.util;

import com.google.common.annotations.VisibleForTesting;
import java.util.UUID;
import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.utils.progress.ProgressEventType;

public class Job {
public enum JobStatus {
Expand All @@ -19,14 +21,43 @@ public enum JobStatus {
private String jobType;
private JobStatus status;
private long submitTime;
private long startTime;
private long finishedTime;
private Throwable error;

public Job(String jobType) {
public class StatusChange {
ProgressEventType status;
long changeTime;

String message;

public StatusChange(ProgressEventType type, String message) {
changeTime = System.currentTimeMillis();
status = type;
this.message = message;
}

public ProgressEventType getStatus() {
return status;
}

public long getChangeTime() {
return changeTime;
}

public String getMessage() {
return message;
}
}

private List<StatusChange> statusChanges;

public Job(String jobType, String jobId) {
this.jobType = jobType;
jobId = UUID.randomUUID().toString();
this.jobId = jobId;
submitTime = System.currentTimeMillis();
status = JobStatus.WAITING;
statusChanges = new ArrayList<>();
}

@VisibleForTesting
Expand All @@ -51,6 +82,14 @@ public void setStatus(JobStatus status) {
this.status = status;
}

public void setStatusChange(ProgressEventType type, String message) {
statusChanges.add(new StatusChange(type, message));
}

public List<StatusChange> getStatusChanges() {
return statusChanges;
}

public long getSubmitTime() {
return submitTime;
}
Expand All @@ -70,4 +109,8 @@ public Throwable getError() {
public void setError(Throwable error) {
this.error = error;
}

public void setStartTime(long startTime) {
this.startTime = startTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -20,29 +21,40 @@ public class JobExecutor {
public Pair<String, CompletableFuture<Void>> submit(String jobType, Runnable runnable) {
// Where do I create the job details? Here? Add it to the Cache first?
// Update the status on the callbacks and do nothing else?
final Job job = new Job(jobType);
jobCache.put(job.getJobId(), job);

String jobId = UUID.randomUUID().toString();
final Job job = createJob(jobType, jobId);

CompletableFuture<Void> submittedJob =
CompletableFuture.runAsync(runnable, executorService)
.thenAccept(
empty -> {
job.setStatus(Job.JobStatus.COMPLETED);
job.setFinishedTime(System.currentTimeMillis());
jobCache.put(job.getJobId(), job);
updateJob(job);
})
.exceptionally(
t -> {
job.setStatus(Job.JobStatus.ERROR);
job.setError(t);
job.setFinishedTime(System.currentTimeMillis());
jobCache.put(job.getJobId(), job);
updateJob(job);
return null;
});

return Pair.create(job.getJobId(), submittedJob);
}

public Job createJob(String jobType, String jobId) {
final Job job = new Job(jobType, jobId);
jobCache.put(jobId, job);
return job;
}

public void updateJob(Job job) {
jobCache.put(job.getJobId(), job);
}

public Job getJobWithId(String jobId) {
return jobCache.getIfPresent(jobId);
}
Expand Down
60 changes: 60 additions & 0 deletions management-api-server/doc/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,45 @@
"summary" : "Rebuild data by streaming data from other nodes. This operation returns immediately with a job id."
}
},
"/api/v1/ops/node/repair" : {
"post" : {
"operationId" : "repair_1",
"requestBody" : {
"content" : {
"*/*" : {
"schema" : {
"$ref" : "#/components/schemas/RepairRequest"
}
}
}
},
"responses" : {
"202" : {
"content" : {
"text/plain" : {
"example" : "repair-1234567",
"schema" : {
"type" : "string"
}
}
},
"description" : "Job ID for successfully scheduled Cassandra repair request"
},
"400" : {
"content" : {
"text/plain" : {
"example" : "keyspaceName must be specified",
"schema" : {
"type" : "string"
}
}
},
"description" : "Repair request missing Keyspace name"
}
},
"summary" : "Execute a nodetool repair operation"
}
},
"/api/v1/ops/node/schema/versions" : {
"get" : {
"operationId" : "getSchemaVersions",
Expand Down Expand Up @@ -1805,6 +1844,12 @@
"type" : "string",
"enum" : [ "ERROR", "COMPLETED", "WAITING" ]
},
"status_changes" : {
"type" : "array",
"items" : {
"$ref" : "#/components/schemas/StatusChange"
}
},
"submit_time" : {
"type" : "integer",
"format" : "int64"
Expand Down Expand Up @@ -1951,6 +1996,21 @@
},
"required" : [ "entity" ]
},
"StatusChange" : {
"type" : "object",
"properties" : {
"change_time" : {
"type" : "integer",
"format" : "int64"
},
"message" : {
"type" : "string"
},
"status" : {
"type" : "string"
}
}
},
"StreamingInfo" : {
"type" : "object",
"properties" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,11 @@ public Response repair(RepairRequest repairRequest) {
}
app.cqlService.executePreparedStatement(
app.dbUnixSocketFile,
"CALL NodeOps.repair(?, ?, ?)",
"CALL NodeOps.repair(?, ?, ?, ?)",
repairRequest.keyspaceName,
repairRequest.tables,
repairRequest.full);
repairRequest.full,
false);

return Response.ok("OK").build();
});
Expand Down
Loading

0 comments on commit 2b55cda

Please sign in to comment.