diff --git a/application/pom.xml b/application/pom.xml index a461cf35..1d87ec09 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -170,6 +170,11 @@ 1.64 test + + org.testcontainers + cassandra + test + \ No newline at end of file diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/repair/RepairHistoryService.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/repair/RepairHistoryService.java new file mode 100644 index 00000000..c594b26d --- /dev/null +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/config/repair/RepairHistoryService.java @@ -0,0 +1,357 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.repair; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.google.common.base.Preconditions; +import java.time.Instant; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; + +public final class RepairHistoryService +{ + + private static final Logger LOG = LoggerFactory.getLogger(RepairHistoryService.class); + + private static final String KEYSPACE_NAME = "ecchronos"; + private static final String TABLE_NAME = "repair_history"; + private static final String COLUMN_NODE_ID = "node_id"; + private static final String COLUMN_TABLE_ID = "table_id"; + private static final String COLUMN_REPAIR_ID = "repair_id"; + private static final String COLUMN_JOB_ID = "job_id"; + private static final String COLUMN_COORDINATOR_ID = "coordinator_id"; + private static final String COLUMN_RANGE_BEGIN = "range_begin"; + private static final String COLUMN_RANGE_END = "range_end"; + private static final String COLUMN_PARTICIPANTS = "participants"; + private static final String COLUMN_STATUS = "status"; + private static final String COLUMN_STARTED_AT = "started_at"; + private static final String COLUMN_FINISHED_AT = "finished_at"; + + private final PreparedStatement myCreateStatement; + private final PreparedStatement myUpdateStatement; + private final PreparedStatement mySelectStatement; + private final CqlSession myCqlSession; + + public RepairHistoryService(final CqlSession cqlSession) + { + myCqlSession = Preconditions.checkNotNull(cqlSession, "CqlSession cannot be null"); + myCreateStatement = myCqlSession + .prepare(QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME) + .value(COLUMN_TABLE_ID, bindMarker()) + .value(COLUMN_NODE_ID, bindMarker()) + .value(COLUMN_REPAIR_ID, bindMarker()) + .value(COLUMN_JOB_ID, bindMarker()) + .value(COLUMN_COORDINATOR_ID, bindMarker()) + .value(COLUMN_RANGE_BEGIN, bindMarker()) + .value(COLUMN_RANGE_END, bindMarker()) + .value(COLUMN_PARTICIPANTS, bindMarker()) + .value(COLUMN_STATUS, bindMarker()) + .value(COLUMN_STARTED_AT, bindMarker()) + .value(COLUMN_FINISHED_AT, bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); + myUpdateStatement = myCqlSession + .prepare(QueryBuilder.update(KEYSPACE_NAME, TABLE_NAME) + .setColumn(COLUMN_JOB_ID, bindMarker()) + .setColumn(COLUMN_COORDINATOR_ID, bindMarker()) + .setColumn(COLUMN_RANGE_BEGIN, bindMarker()) + .setColumn(COLUMN_RANGE_END, bindMarker()) + .setColumn(COLUMN_PARTICIPANTS, bindMarker()) + .setColumn(COLUMN_STATUS, bindMarker()) + .setColumn(COLUMN_STARTED_AT, bindMarker()) + .setColumn(COLUMN_FINISHED_AT, bindMarker()) + .whereColumn(COLUMN_TABLE_ID) + .isEqualTo(bindMarker()) + .whereColumn(COLUMN_NODE_ID) + .isEqualTo(bindMarker()) + .whereColumn(COLUMN_REPAIR_ID) + .isEqualTo(bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); + mySelectStatement = myCqlSession + .prepare(selectFrom(KEYSPACE_NAME, TABLE_NAME) + .columns(COLUMN_NODE_ID, COLUMN_TABLE_ID, COLUMN_REPAIR_ID, COLUMN_JOB_ID, COLUMN_COORDINATOR_ID, + COLUMN_RANGE_BEGIN, COLUMN_RANGE_END, COLUMN_PARTICIPANTS, COLUMN_STATUS, COLUMN_STARTED_AT, + COLUMN_FINISHED_AT) + .whereColumn(COLUMN_TABLE_ID) + .isEqualTo(bindMarker()) + .whereColumn(COLUMN_NODE_ID) + .isEqualTo(bindMarker()) + .build() + .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)); + } + + public ResultSet getRepairHistoryInfo(final RepairHistoryContext repairHistoryContext) + { + BoundStatement boundStatement = mySelectStatement.bind(repairHistoryContext.getMyTableId(), + repairHistoryContext.getMyNodeId()); + return myCqlSession.execute(boundStatement); + } + + public ResultSet insertRepairHistoryInfo(final RepairHistoryContext repairHistoryContext) + { + LOG.info("Preparing to insert repair history {} with tableId {} and nodeId {}", + repairHistoryContext.getMyTableId(), repairHistoryContext.getMyNodeId()); + BoundStatement repairHistoryInfo = myCreateStatement.bind(repairHistoryContext.getMyTableId(), + repairHistoryContext.getMyNodeId(), + repairHistoryContext.getMyRepairId(), + repairHistoryContext.getMyJobId(), + repairHistoryContext.getMyCoordinatorId(), + repairHistoryContext.getMyRangeBegin(), + repairHistoryContext.getMyRangeEnd(), + repairHistoryContext.getMyParticipants(), + repairHistoryContext.getMyStatus(), + repairHistoryContext.getMyStartedAt(), + repairHistoryContext.getMyFinishedAt()); + ResultSet tmpResultSet = myCqlSession.execute(repairHistoryInfo); + if (tmpResultSet.wasApplied()) + { + LOG.info("RepairHistory inserted successfully with tableId {} and nodeId {}", + repairHistoryContext.getMyTableId(), repairHistoryContext.getMyNodeId()); + } + else + { + LOG.error("Unable to insert repairHistory with tableId {} and nodeId {}", + repairHistoryContext.getMyTableId(), + repairHistoryContext.getMyNodeId()); + } + return tmpResultSet; + } + + public ResultSet updateRepairHistoryInfo(final RepairHistoryContext repairHistoryContext) + { + BoundStatement updateRepairHistoryInfo = myUpdateStatement.bind(repairHistoryContext.getMyJobId(), + repairHistoryContext.getMyCoordinatorId(), + repairHistoryContext.getMyRangeBegin(), + repairHistoryContext.getMyRangeEnd(), + repairHistoryContext.getMyParticipants(), + repairHistoryContext.getMyStatus(), + repairHistoryContext.getMyStartedAt(), + repairHistoryContext.getMyFinishedAt(), + repairHistoryContext.getMyTableId(), + repairHistoryContext.getMyNodeId(), + repairHistoryContext.getMyRepairId()); + ResultSet tmpResultSet = myCqlSession.execute(updateRepairHistoryInfo); + if (tmpResultSet.wasApplied()) + { + LOG.info("RepairHistory successfully updated for tableId {} and nodeId {}", + repairHistoryContext.getMyTableId(), repairHistoryContext.getMyNodeId()); + } + else + { + LOG.error("RepairHistory updated failed for tableId {} and nodeId {}", + repairHistoryContext.getMyTableId(), repairHistoryContext.getMyNodeId()); + } + return tmpResultSet; + } + + public static final class RepairHistoryContext + { + private final UUID myTableId; + private final UUID myNodeId; + private final UUID myRepairId; + private final UUID myJobId; + private final UUID myCoordinatorId; + private final String myRangeBegin; + private final String myRangeEnd; + private final Set myParticipants; + private final String myStatus; + private final Instant myStartedAt; + private final Instant myFinishedAt; + + private RepairHistoryContext(final Builder builder) + { + this.myTableId = builder.myTableId; + this.myNodeId = builder.myNodeId; + this.myRepairId = builder.myRepairId; + this.myJobId = builder.myJobId; + this.myCoordinatorId = builder.myCoordinatorId; + this.myRangeBegin = builder.myRangeBegin; + this.myRangeEnd = builder.myRangeEnd; + this.myParticipants = builder.myParticipants == null ? Set.of() : Set.copyOf(builder.myParticipants); + this.myStatus = builder.myStatus; + this.myStartedAt = builder.myStartedAt; + this.myFinishedAt = builder.myFinishedAt; + } + + public UUID getMyTableId() + { + return myTableId; + } + + public UUID getMyNodeId() + { + return myNodeId; + } + + public UUID getMyRepairId() + { + return myRepairId; + } + + public UUID getMyJobId() + { + return myJobId; + } + + public UUID getMyCoordinatorId() + { + return myCoordinatorId; + } + + public String getMyRangeBegin() + { + return myRangeBegin; + } + + public String getMyRangeEnd() + { + return myRangeEnd; + } + + public Set getMyParticipants() + { + return myParticipants; + } + + public String getMyStatus() + { + return myStatus; + } + + public Instant getMyStartedAt() + { + return myStartedAt; + } + + public Instant getMyFinishedAt() + { + return myFinishedAt; + } + + public static Builder copyOf(final RepairHistoryContext repairHistoryContext) + { + return new RepairHistoryContext.Builder() + .withTableId(repairHistoryContext.getMyTableId()) + .withNodeId(repairHistoryContext.myNodeId) + .withRepairId(repairHistoryContext.myRepairId) + .withJobId(repairHistoryContext.myJobId) + .withCoordinatorId(repairHistoryContext.myCoordinatorId) + .withRangeBegin(repairHistoryContext.myRangeBegin) + .withRangeEnd(repairHistoryContext.myRangeEnd) + .withParticipants(repairHistoryContext.myParticipants) + .withStatus(repairHistoryContext.myStatus) + .withStartedAt(repairHistoryContext.myStartedAt) + .withFinishedAt(repairHistoryContext.myFinishedAt); + } + + public static final class Builder + { + private UUID myTableId; + private UUID myNodeId; + private UUID myRepairId; + private UUID myJobId; + private UUID myCoordinatorId; + private String myRangeBegin; + private String myRangeEnd; + private Set myParticipants; + private String myStatus; + private Instant myStartedAt; + private Instant myFinishedAt; + + public Builder withTableId(final UUID tableId) + { + this.myTableId = tableId; + return this; + } + + public Builder withNodeId(final UUID nodeId) + { + this.myNodeId = nodeId; + return this; + } + + public Builder withRepairId(final UUID repairId) + { + this.myRepairId = repairId; + return this; + } + + public Builder withJobId(final UUID jobId) + { + this.myJobId = jobId; + return this; + } + + public Builder withCoordinatorId(final UUID coordinatorId) + { + this.myCoordinatorId = coordinatorId; + return this; + } + + public Builder withRangeBegin(final String rangeBegin) + { + this.myRangeBegin = rangeBegin; + return this; + } + + public Builder withRangeEnd(final String rangeEnd) + { + this.myRangeEnd = rangeEnd; + return this; + } + + public Builder withParticipants(final Set participants) + { + this.myParticipants = (participants == null) ? Set.of() : new HashSet<>(participants); + return this; + } + + public Builder withStatus(final String status) + { + this.myStatus = status; + return this; + } + + public Builder withStartedAt(final Instant startedAt) + { + this.myStartedAt = startedAt; + return this; + } + + public Builder withFinishedAt(final Instant finishedAt) + { + this.myFinishedAt = finishedAt; + return this; + } + + public RepairHistoryContext build() + { + return new RepairHistoryContext(this); + } + } + } +} diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/repair/TestRepairHistoryService.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/repair/TestRepairHistoryService.java new file mode 100644 index 00000000..1b848dd9 --- /dev/null +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/config/repair/TestRepairHistoryService.java @@ -0,0 +1,123 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.config.repair; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.ericsson.bss.cassandra.ecchronos.application.utils.AbstractCassandraTest; +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairHistoryService.RepairHistoryContext.Builder; +import static com.ericsson.bss.cassandra.ecchronos.application.config.repair.RepairHistoryService.RepairHistoryContext; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestRepairHistoryService extends AbstractCassandraTest +{ + + private static final String ECCHRONOS_KEYSPACE = "ecchronos"; + private RepairHistoryService myRepairHistoryService; + + @Before + public void setup() throws IOException + { + mySession.execute(String.format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1}", + ECCHRONOS_KEYSPACE)); + String query = String.format( + "CREATE TABLE IF NOT EXISTS %s.repair_history(" + + "table_id UUID, " + + "node_id UUID, " + + "repair_id timeuuid, " + + "job_id UUID, " + + "coordinator_id UUID, " + + "range_begin text, " + + "range_end text, " + + "participants set, " + + "status text, " + + "started_at timestamp, " + + "finished_at timestamp, " + + "PRIMARY KEY((table_id,node_id), repair_id)) " + + "WITH CLUSTERING ORDER BY (repair_id DESC);", + ECCHRONOS_KEYSPACE); + mySession.execute(query); + myRepairHistoryService = new RepairHistoryService(mySession); + } + + @After + public void testCleanup() + { + mySession.execute(SimpleStatement.newInstance( + String.format("TRUNCATE %s.%s", ECCHRONOS_KEYSPACE, "repair_history"))); + } + + @Test + public void testWriteReadUpdateRepairHistoryInfo() + { + UUID tableId = UUID.randomUUID(); + UUID nodeId = UUID.randomUUID(); + UUID repairId = Uuids.timeBased(); + UUID coordinatorId = UUID.randomUUID(); + Builder builder = new Builder(); + + RepairHistoryContext repairHistoryContext = builder.withTableId(tableId) + .withNodeId(nodeId) + .withRepairId(repairId) + .build(); + + myRepairHistoryService.insertRepairHistoryInfo(repairHistoryContext); + + ResultSet result = myRepairHistoryService.getRepairHistoryInfo(repairHistoryContext); + assertNotNull(result); + Row row = result.one(); + UUID expectedTableId = row.get("table_id", UUID.class); + UUID expectedNodeId = row.get("node_id", UUID.class); + UUID expectedRepairId = row.get("repair_id", UUID.class); + UUID expectedCoordinatorId = row.get("coordinator_id", UUID.class); + + assertEquals(expectedTableId, tableId); + assertEquals(expectedNodeId, nodeId); + assertEquals(expectedRepairId, repairId); + assertEquals(expectedCoordinatorId, null); + + RepairHistoryContext updatedRepairHistoryContext = RepairHistoryContext.copyOf(repairHistoryContext) + .withJobId(UUID.randomUUID()) + .withCoordinatorId(coordinatorId) + .withRangeBegin("123") + .withRangeEnd("1000") + .withParticipants(Collections.EMPTY_SET) + .withStatus("complete") + .withStartedAt(Instant.now()) + .withFinishedAt(Instant.now().now()) + .build(); + + myRepairHistoryService.updateRepairHistoryInfo(updatedRepairHistoryContext); + + ResultSet updatedResult = myRepairHistoryService.getRepairHistoryInfo(repairHistoryContext); + Row updatedRow = updatedResult.one(); + expectedCoordinatorId = updatedRow.get("coordinator_id", UUID.class); + String expectedStatus = updatedRow.get("status", String.class); + assertEquals(expectedCoordinatorId, coordinatorId); + assertEquals(expectedStatus, "complete"); + } +} diff --git a/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/utils/AbstractCassandraTest.java b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/utils/AbstractCassandraTest.java new file mode 100644 index 00000000..a21a0399 --- /dev/null +++ b/application/src/test/java/com/ericsson/bss/cassandra/ecchronos/application/utils/AbstractCassandraTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.application.utils; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.utility.DockerImageName; + +import java.util.ArrayList; +import java.util.List; + +public class AbstractCassandraTest +{ + private static final List> nodes = new ArrayList<>(); + protected static CqlSession mySession; + + private static DistributedNativeConnectionProvider myNativeConnectionProvider; + + @BeforeClass + public static void setUpCluster() + { + CassandraContainer node = new CassandraContainer<>(DockerImageName.parse("cassandra:4.1.5")) + .withExposedPorts(9042, 7000) + .withEnv("CASSANDRA_DC", "DC1") + .withEnv("CASSANDRA_ENDPOINT_SNITCH", "GossipingPropertyFileSnitch") + .withEnv("CASSANDRA_CLUSTER_NAME", "TestCluster"); + nodes.add(node); + node.start(); + mySession = CqlSession.builder() + .addContactPoint(node.getContactPoint()) + .withLocalDatacenter(node.getLocalDatacenter()) + .build(); + List nodesList = new ArrayList<>(mySession.getMetadata().getNodes().values()); + myNativeConnectionProvider = new DistributedNativeConnectionProvider() + { + @Override + public CqlSession getCqlSession() + { + return mySession; + } + + @Override + public List getNodes() + { + return nodesList; + } + }; + } + + @AfterClass + public static void tearDownCluster() + { + // Stop all nodes + for (CassandraContainer node : nodes) + { + node.stop(); + } + } + + public static DistributedNativeConnectionProvider getNativeConnectionProvider() + { + return myNativeConnectionProvider; + } +} diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 343e19d1..bdc61601 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -104,6 +104,28 @@ CREATE TABLE ecchronos.nodes_sync node_id DESC ); ``` +### Repair History + +A RepairHistory table to store relevant repair operation data. Data can be retrieved for auditing or debugging purposes. + +```cql +CREATE TABLE ecchronos.repair_history( + table_id uuid, + node_id uuid, + repair_id timeuuid, + job_id uuid, + coordinator_id uuid, + range_begin text, + range_end text, + participants set, + status text, + started_at timestamp, + finished_at timestamp, + PRIMARY KEY((table_id,node_id), repair_id)) + WITH compaction = {'class': 'TimeWindowCompactionStrategy'} + AND default_time_to_live = 2592000 + AND CLUSTERING ORDER BY (repair_id DESC); +``` ### Leases