diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java index fe0d9ab5..6ea01acd 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/NodeOpsProvider.java @@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import javax.management.NotificationFilter; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.TabularData; import org.apache.cassandra.auth.AuthenticatedUser; @@ -52,6 +53,7 @@ import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.progress.ProgressEventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -733,10 +735,11 @@ public void clearSnapshots( } @Rpc(name = "repair") - public void repair( + public String repair( @RpcParam(name = "keyspaceName") String keyspace, @RpcParam(name = "tables") List tables, - @RpcParam(name = "full") Boolean full) + @RpcParam(name = "full") Boolean full, + @RpcParam(name = "notifications") boolean notifications) throws IOException { // At least one keyspace is required if (keyspace != null) { @@ -756,8 +759,76 @@ public void repair( // incremental repairs will fail if parallelism is not set repairSpec.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.getName()); } - ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec); + + // Since Cassandra provides us with a async, we don't need to use our executor interface for + // this. + final int repairJobId = + ShimLoader.instance.get().getStorageService().repairAsync(keyspace, repairSpec); + + if (!notifications) { + return Integer.valueOf(repairJobId).toString(); + } + + String jobId = String.format("repair-%d", repairJobId); + final Job job = service.createJob("repair", jobId); + + if (repairJobId == 0) { + // Job is done and won't continue + job.setStatusChange(ProgressEventType.COMPLETE, ""); + job.setStatus(Job.JobStatus.COMPLETED); + job.setFinishedTime(System.currentTimeMillis()); + service.updateJob(job); + return job.getJobId(); + } + + ShimLoader.instance + .get() + .getStorageService() + .addNotificationListener( + (notification, handback) -> { + if (notification.getType().equals("progress")) { + Map data = (Map) notification.getUserData(); + ProgressEventType progress = ProgressEventType.values()[data.get("type")]; + + switch (progress) { + case START: + job.setStatusChange(progress, notification.getMessage()); + job.setStartTime(System.currentTimeMillis()); + break; + case NOTIFICATION: + case PROGRESS: + break; + case ERROR: + case ABORT: + job.setError(new RuntimeException(notification.getMessage())); + job.setStatus(Job.JobStatus.ERROR); + job.setFinishedTime(System.currentTimeMillis()); + break; + case SUCCESS: + job.setStatusChange(progress, notification.getMessage()); + // SUCCESS / ERROR does not mean the job has completed yet (COMPLETE is that) + break; + case COMPLETE: + job.setStatusChange(progress, notification.getMessage()); + job.setStatus(Job.JobStatus.COMPLETED); + job.setFinishedTime(System.currentTimeMillis()); + break; + } + service.updateJob(job); + } + }, + (NotificationFilter) + notification -> { + final int repairNo = + Integer.parseInt(((String) notification.getSource()).split(":")[1]); + return repairNo == repairJobId; + }, + null); + + return job.getJobId(); } + + throw new RuntimeException("At least one keyspace must be defined"); } @Rpc(name = "move") diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/Job.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/Job.java index 439304bd..20e4e9dd 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/Job.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/Job.java @@ -6,7 +6,9 @@ package com.datastax.mgmtapi.util; import com.google.common.annotations.VisibleForTesting; -import java.util.UUID; +import java.util.ArrayList; +import java.util.List; +import org.apache.cassandra.utils.progress.ProgressEventType; public class Job { public enum JobStatus { @@ -19,14 +21,43 @@ public enum JobStatus { private String jobType; private JobStatus status; private long submitTime; + private long startTime; private long finishedTime; private Throwable error; - public Job(String jobType) { + public class StatusChange { + ProgressEventType status; + long changeTime; + + String message; + + public StatusChange(ProgressEventType type, String message) { + changeTime = System.currentTimeMillis(); + status = type; + this.message = message; + } + + public ProgressEventType getStatus() { + return status; + } + + public long getChangeTime() { + return changeTime; + } + + public String getMessage() { + return message; + } + } + + private List statusChanges; + + public Job(String jobType, String jobId) { this.jobType = jobType; - jobId = UUID.randomUUID().toString(); + this.jobId = jobId; submitTime = System.currentTimeMillis(); status = JobStatus.WAITING; + statusChanges = new ArrayList<>(); } @VisibleForTesting @@ -51,6 +82,14 @@ public void setStatus(JobStatus status) { this.status = status; } + public void setStatusChange(ProgressEventType type, String message) { + statusChanges.add(new StatusChange(type, message)); + } + + public List getStatusChanges() { + return statusChanges; + } + public long getSubmitTime() { return submitTime; } @@ -70,4 +109,8 @@ public Throwable getError() { public void setError(Throwable error) { this.error = error; } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } } diff --git a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java index 2e086a15..900fe87f 100644 --- a/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java +++ b/management-api-agent-common/src/main/java/com/datastax/mgmtapi/util/JobExecutor.java @@ -7,6 +7,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -20,8 +21,9 @@ public class JobExecutor { public Pair> submit(String jobType, Runnable runnable) { // Where do I create the job details? Here? Add it to the Cache first? // Update the status on the callbacks and do nothing else? - final Job job = new Job(jobType); - jobCache.put(job.getJobId(), job); + + String jobId = UUID.randomUUID().toString(); + final Job job = createJob(jobType, jobId); CompletableFuture submittedJob = CompletableFuture.runAsync(runnable, executorService) @@ -29,20 +31,30 @@ public Pair> submit(String jobType, Runnable run empty -> { job.setStatus(Job.JobStatus.COMPLETED); job.setFinishedTime(System.currentTimeMillis()); - jobCache.put(job.getJobId(), job); + updateJob(job); }) .exceptionally( t -> { job.setStatus(Job.JobStatus.ERROR); job.setError(t); job.setFinishedTime(System.currentTimeMillis()); - jobCache.put(job.getJobId(), job); + updateJob(job); return null; }); return Pair.create(job.getJobId(), submittedJob); } + public Job createJob(String jobType, String jobId) { + final Job job = new Job(jobType, jobId); + jobCache.put(jobId, job); + return job; + } + + public void updateJob(Job job) { + jobCache.put(job.getJobId(), job); + } + public Job getJobWithId(String jobId) { return jobCache.getIfPresent(jobId); } diff --git a/management-api-server/doc/openapi.json b/management-api-server/doc/openapi.json index eb258a07..165d23e2 100644 --- a/management-api-server/doc/openapi.json +++ b/management-api-server/doc/openapi.json @@ -1520,6 +1520,45 @@ "summary" : "Rebuild data by streaming data from other nodes. This operation returns immediately with a job id." } }, + "/api/v1/ops/node/repair" : { + "post" : { + "operationId" : "repair_1", + "requestBody" : { + "content" : { + "*/*" : { + "schema" : { + "$ref" : "#/components/schemas/RepairRequest" + } + } + } + }, + "responses" : { + "202" : { + "content" : { + "text/plain" : { + "example" : "repair-1234567", + "schema" : { + "type" : "string" + } + } + }, + "description" : "Job ID for successfully scheduled Cassandra repair request" + }, + "400" : { + "content" : { + "text/plain" : { + "example" : "keyspaceName must be specified", + "schema" : { + "type" : "string" + } + } + }, + "description" : "Repair request missing Keyspace name" + } + }, + "summary" : "Execute a nodetool repair operation" + } + }, "/api/v1/ops/node/schema/versions" : { "get" : { "operationId" : "getSchemaVersions", @@ -1805,6 +1844,12 @@ "type" : "string", "enum" : [ "ERROR", "COMPLETED", "WAITING" ] }, + "status_changes" : { + "type" : "array", + "items" : { + "$ref" : "#/components/schemas/StatusChange" + } + }, "submit_time" : { "type" : "integer", "format" : "int64" @@ -1951,6 +1996,21 @@ }, "required" : [ "entity" ] }, + "StatusChange" : { + "type" : "object", + "properties" : { + "change_time" : { + "type" : "integer", + "format" : "int64" + }, + "message" : { + "type" : "string" + }, + "status" : { + "type" : "string" + } + } + }, "StreamingInfo" : { "type" : "object", "properties" : { diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java index 53319d8f..bef1a627 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/NodeOpsResources.java @@ -503,10 +503,11 @@ public Response repair(RepairRequest repairRequest) { } app.cqlService.executePreparedStatement( app.dbUnixSocketFile, - "CALL NodeOps.repair(?, ?, ?)", + "CALL NodeOps.repair(?, ?, ?, ?)", repairRequest.keyspaceName, repairRequest.tables, - repairRequest.full); + repairRequest.full, + false); return Response.ok("OK").build(); }); diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/models/Job.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/models/Job.java index 9dbc82f4..d2ead968 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/models/Job.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/models/Job.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.io.Serializable; +import java.util.List; public class Job implements Serializable { public enum JobStatus { @@ -34,6 +35,38 @@ public enum JobStatus { @JsonProperty(value = "error") private String error; + public class StatusChange { + @JsonProperty(value = "status") + String status; + + @JsonProperty(value = "change_time") + long changeTime; + + @JsonProperty(value = "message") + String message; + + public StatusChange(String type, String message) { + changeTime = System.currentTimeMillis(); + status = type; + this.message = message; + } + + public String getStatus() { + return status; + } + + public long getChangeTime() { + return changeTime; + } + + public String getMessage() { + return message; + } + } + + @JsonProperty(value = "status_changes") + private List statusChanges; + @JsonCreator public Job( @JsonProperty(value = "id") String jobId, @@ -41,13 +74,15 @@ public Job( @JsonProperty(value = "status") String status, @JsonProperty(value = "submit_time") long submitTime, @JsonProperty(value = "end_time") long finishedTime, - @JsonProperty(value = "error") String error) { + @JsonProperty(value = "error") String error, + @JsonProperty(value = "status_changes") List changes) { this.jobId = jobId; this.jobType = jobType; this.status = JobStatus.valueOf(status); this.submitTime = submitTime; this.finishedTime = finishedTime; this.error = error; + this.statusChanges = changes; } public String getJobId() { @@ -70,6 +105,10 @@ public long getFinishedTime() { return finishedTime; } + public List getStatusChanges() { + return statusChanges; + } + public String getError() { return error; } diff --git a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java index ed8f5d6c..cd7a016c 100644 --- a/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java +++ b/management-api-server/src/main/java/com/datastax/mgmtapi/resources/v1/NodeOpsResources.java @@ -8,6 +8,7 @@ import com.datastax.mgmtapi.ManagementApplication; import com.datastax.mgmtapi.resources.common.BaseResources; import com.datastax.mgmtapi.resources.helpers.ResponseTools; +import com.datastax.mgmtapi.resources.models.RepairRequest; import com.datastax.oss.driver.api.core.cql.Row; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.Content; @@ -120,4 +121,45 @@ public Response schemaVersions() { return Response.ok(schemaVersions).build(); }); } + + @POST + @Path("/repair") + @Produces(MediaType.TEXT_PLAIN) + @ApiResponse( + responseCode = "202", + description = "Job ID for successfully scheduled Cassandra repair request", + content = + @Content( + mediaType = MediaType.TEXT_PLAIN, + schema = @Schema(implementation = String.class), + examples = @ExampleObject(value = "repair-1234567"))) + @ApiResponse( + responseCode = "400", + description = "Repair request missing Keyspace name", + content = + @Content( + mediaType = MediaType.TEXT_PLAIN, + schema = @Schema(implementation = String.class), + examples = @ExampleObject(value = "keyspaceName must be specified"))) + @Operation(summary = "Execute a nodetool repair operation", operationId = "repair") + public Response repair(RepairRequest repairRequest) { + return handle( + () -> { + if (repairRequest.keyspaceName == null) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("keyspaceName must be specified") + .build(); + } + return Response.accepted( + ResponseTools.getSingleRowStringResponse( + app.dbUnixSocketFile, + app.cqlService, + "CALL NodeOps.repair(?, ?, ?, ?)", + repairRequest.keyspaceName, + repairRequest.tables, + repairRequest.full, + true)) + .build(); + }); + } } diff --git a/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java b/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java index cc201e69..32469115 100644 --- a/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java +++ b/management-api-server/src/test/java/com/datastax/mgmtapi/K8OperatorResourcesTest.java @@ -1657,7 +1657,51 @@ public void testRepair() throws Exception { assertThat(response.getStatus()).isEqualTo(HttpStatus.SC_OK); verify(context.cqlService) .executePreparedStatement( - any(), eq("CALL NodeOps.repair(?, ?, ?)"), eq("test_ks"), eq(null), eq(true)); + any(), + eq("CALL NodeOps.repair(?, ?, ?, ?)"), + eq("test_ks"), + eq(null), + eq(true), + eq(false)); + } + + @Test + public void testRepairAsync() throws Exception { + Context context = setup(); + when(context.cqlService.executePreparedStatement(any(), anyString())).thenReturn(null); + + RepairRequest repairRequest = new RepairRequest("test_ks", null, Boolean.TRUE); + String repairRequestAsJSON = WriterUtility.asString(repairRequest, MediaType.APPLICATION_JSON); + + ResultSet mockResultSet = mock(ResultSet.class); + Row mockRow = mock(Row.class); + + when(context.cqlService.executePreparedStatement(any(), any(), any())) + .thenReturn(mockResultSet); + + when(mockResultSet.one()).thenReturn(mockRow); + + when(mockRow.getString(0)).thenReturn("0fe65b47-98c2-47d8-9c3c-5810c9988e10"); + + MockHttpRequest request = + MockHttpRequest.post("/api/v1/ops/node/repair") + .content(repairRequestAsJSON.getBytes()) + .accept(MediaType.TEXT_PLAIN) + .contentType(MediaType.APPLICATION_JSON_TYPE); + + MockHttpResponse response = context.invoke(request); + + Assert.assertEquals(HttpStatus.SC_ACCEPTED, response.getStatus()); + Assert.assertEquals("0fe65b47-98c2-47d8-9c3c-5810c9988e10", response.getContentAsString()); + + verify(context.cqlService) + .executePreparedStatement( + any(), + eq("CALL NodeOps.repair(?, ?, ?, ?)"), + eq("test_ks"), + eq(null), + eq(true), + eq(true)); } @Test diff --git a/management-api-server/src/test/java/com/datastax/mgmtapi/NonDestructiveOpsIT.java b/management-api-server/src/test/java/com/datastax/mgmtapi/NonDestructiveOpsIT.java index 08aa855a..4e14d611 100644 --- a/management-api-server/src/test/java/com/datastax/mgmtapi/NonDestructiveOpsIT.java +++ b/management-api-server/src/test/java/com/datastax/mgmtapi/NonDestructiveOpsIT.java @@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.util.IllegalReferenceCountException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -551,7 +552,7 @@ public void testCreateKeyspace() throws IOException, URISyntaxException { .thenApply(this::responseAsString) .join(); - createKeyspace(client, localDc, "someTestKeyspace"); + createKeyspace(client, localDc, "someTestKeyspace", 1); } @Test @@ -567,7 +568,7 @@ public void testAlterKeyspace() throws IOException, URISyntaxException { .join(); String ks = "alteringKeyspaceTest"; - createKeyspace(client, localDc, ks); + createKeyspace(client, localDc, ks, 1); CreateOrAlterKeyspaceRequest request = new CreateOrAlterKeyspaceRequest(ks, Arrays.asList(new ReplicationSetting(localDc, 3))); @@ -594,7 +595,7 @@ public void testGetKeyspaces() throws IOException, URISyntaxException { .join(); String ks = "getkeyspacestest"; - createKeyspace(client, localDc, ks); + createKeyspace(client, localDc, ks, 1); URI uri = new URIBuilder(BASE_PATH + "/ops/keyspace").build(); String response = client.get(uri.toURL()).thenApply(this::responseAsString).join(); @@ -727,13 +728,22 @@ public void testRepair() throws IOException, URISyntaxException, InterruptedExce assumeTrue(IntegrationTestUtils.shouldRun()); ensureStarted(); + // create a keyspace with RF of at least 2 NettyHttpClient client = new NettyHttpClient(BASE_URL); + String localDc = + client + .get(new URIBuilder(BASE_PATH + "/metadata/localdc").build().toURL()) + .thenApply(this::responseAsString) + .join(); + + String ks = "someTestKeyspace"; + createKeyspace(client, localDc, ks, 2); URIBuilder uriBuilder = new URIBuilder(BASE_PATH + "/ops/node/repair"); URI repairUri = uriBuilder.build(); // execute repair - RepairRequest repairRequest = new RepairRequest("system_auth", null, Boolean.TRUE); + RepairRequest repairRequest = new RepairRequest(ks, null, Boolean.TRUE); String requestAsJSON = WriterUtility.asString(repairRequest, MediaType.APPLICATION_JSON); boolean repairSuccessful = @@ -744,6 +754,68 @@ public void testRepair() throws IOException, URISyntaxException, InterruptedExce assertTrue("Repair request was not successful", repairSuccessful); } + @Test + public void testAsyncRepair() throws IOException, URISyntaxException, InterruptedException { + assumeTrue(IntegrationTestUtils.shouldRun()); + ensureStarted(); + + // create a keyspace with RF of at least 2 + NettyHttpClient client = new NettyHttpClient(BASE_URL); + String localDc = + client + .get(new URIBuilder(BASE_PATH + "/metadata/localdc").build().toURL()) + .thenApply(this::responseAsString) + .join(); + + String ks = "someTestKeyspace"; + createKeyspace(client, localDc, ks, 2); + + URIBuilder uriBuilder = new URIBuilder("http://localhost:8080/api/v1/ops/node/repair"); + URI repairUri = uriBuilder.build(); + + // execute repair + RepairRequest repairRequest = new RepairRequest("someTestKeyspace", null, Boolean.TRUE); + String requestAsJSON = WriterUtility.asString(repairRequest, MediaType.APPLICATION_JSON); + + Pair repairResponse = + client.post(repairUri.toURL(), requestAsJSON).thenApply(this::responseAsCodeAndBody).join(); + assertThat(repairResponse.getLeft()).isEqualTo(HttpStatus.SC_ACCEPTED); + String jobId = repairResponse.getRight(); + assertThat(jobId).isNotEmpty(); + + URI getJobDetailsUri = + new URIBuilder(BASE_PATH + "/ops/executor/job").addParameter("job_id", jobId).build(); + + await() + .atMost(Duration.ofMinutes(5)) + .untilAsserted( + () -> { + Pair getJobDetailsResponse; + try { + getJobDetailsResponse = + client + .get(getJobDetailsUri.toURL()) + .thenApply(this::responseAsCodeAndBody) + .join(); + } catch (IllegalReferenceCountException e) { + // Just retry + assertFalse(true); + return; + } + assertThat(getJobDetailsResponse.getLeft()).isEqualTo(HttpStatus.SC_OK); + Map jobDetails = + new JsonMapper() + .readValue( + getJobDetailsResponse.getRight(), + new TypeReference>() {}); + assertThat(jobDetails) + .hasEntrySatisfying("id", value -> assertThat(value).isEqualTo(jobId)) + .hasEntrySatisfying("type", value -> assertThat(value).isEqualTo("repair")) + .hasEntrySatisfying( + "status", value -> assertThat(value).isIn("COMPLETED", "ERROR")); + }); + } + @Test public void testGetReplication() throws IOException, URISyntaxException { assumeTrue(IntegrationTestUtils.shouldRun()); @@ -757,7 +829,7 @@ public void testGetReplication() throws IOException, URISyntaxException { .join(); String ks = "getreplicationtest"; - createKeyspace(client, localDc, ks); + createKeyspace(client, localDc, ks, 1); // missing keyspace URI uri = new URIBuilder(BASE_PATH + "/ops/keyspace/replication").build(); @@ -840,7 +912,7 @@ public void testCreateTable() throws IOException, URISyntaxException { // this test also tests case sensitivity in CQL identifiers. String ks = "CreateTableTest"; - createKeyspace(client, localDc, ks); + createKeyspace(client, localDc, ks, 1); CreateTableRequest request = new CreateTableRequest( @@ -923,11 +995,11 @@ public void testMoveNode() throws IOException, URISyntaxException { }); } - private void createKeyspace(NettyHttpClient client, String localDc, String keyspaceName) + private void createKeyspace(NettyHttpClient client, String localDc, String keyspaceName, int rf) throws IOException, URISyntaxException { CreateOrAlterKeyspaceRequest request = new CreateOrAlterKeyspaceRequest( - keyspaceName, Arrays.asList(new ReplicationSetting(localDc, 1))); + keyspaceName, Arrays.asList(new ReplicationSetting(localDc, rf))); String requestAsJSON = WriterUtility.asString(request, MediaType.APPLICATION_JSON); URI uri = new URIBuilder(BASE_PATH + "/ops/keyspace/create").build(); @@ -951,6 +1023,11 @@ private String responseAsString(FullHttpResponse r) { } private Pair responseAsCodeAndBody(FullHttpResponse r) { - return Pair.of(r.status().code(), r.content().toString(UTF_8)); + FullHttpResponse copy = r.copy(); + if (copy.content().readableBytes() > 0) { + return Pair.of(copy.status().code(), copy.content().toString(UTF_8)); + } + + return Pair.of(copy.status().code(), null); } }