From 5422fdc8b3b31e1672be700683fa5ff613d278d3 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Tue, 13 Aug 2024 11:49:35 +0000 Subject: [PATCH] Add StateStoreCommitterRunsBuilder --- .../StateStoreCommitterLogEntry.java | 131 ++++++++++++++++++ .../StateStoreCommitterRunsBuilder.java | 101 ++++++++++++++ .../StateStoreCommitterLogTest.java | 55 ++++++++ .../StateStoreCommitterRunsBuilderTest.java | 55 ++++++++ .../statestore/StateStoreCommitSummary.java | 23 +++ .../statestore/StateStoreCommitterRun.java | 28 +++- 6 files changed, 386 insertions(+), 7 deletions(-) create mode 100644 java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogEntry.java create mode 100644 java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilder.java create mode 100644 java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogTest.java create mode 100644 java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilderTest.java diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogEntry.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogEntry.java new file mode 100644 index 0000000000..305f320678 --- /dev/null +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogEntry.java @@ -0,0 +1,131 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.drivers.statestore; + +import java.time.Instant; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class StateStoreCommitterLogEntry { + + private static final Pattern MESSAGE_PATTERN = Pattern.compile("Lambda started at (.+)|Lambda finished at ([^ ]+) "); + + private final String message; + private final Object event; + + private StateStoreCommitterLogEntry(String message, Object event) { + this.message = message; + this.event = event; + } + + public static StateStoreCommitterLogEntry from(String message) { + Matcher matcher = MESSAGE_PATTERN.matcher(message); + return new StateStoreCommitterLogEntry(message, readEvent(matcher)); + } + + private static Object readEvent(Matcher matcher) { + if (!matcher.find()) { + return null; + } + String startTime = matcher.group(1); + if (startTime != null) { + return new LambdaStarted(Instant.parse(startTime)); + } + String finishTime = matcher.group(2); + if (finishTime != null) { + return new LambdaFinished(Instant.parse(finishTime)); + } + return null; + } + + public String getMessage() { + return message; + } + + public Object getEvent() { + return event; + } + + public static class LambdaStarted { + private final Instant startTime; + + public LambdaStarted(Instant startTime) { + this.startTime = startTime; + } + + public Instant getStartTime() { + return startTime; + } + + @Override + public int hashCode() { + return Objects.hash(startTime); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof LambdaStarted)) { + return false; + } + LambdaStarted other = (LambdaStarted) obj; + return Objects.equals(startTime, other.startTime); + } + + @Override + public String toString() { + return "LambdaStarted{startTime=" + startTime + "}"; + } + } + + public static class LambdaFinished { + private final Instant finishTime; + + public LambdaFinished(Instant finishTime) { + this.finishTime = finishTime; + } + + public Instant getFinishTime() { + return finishTime; + } + + @Override + public int hashCode() { + return Objects.hash(finishTime); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof LambdaFinished)) { + return false; + } + LambdaFinished other = (LambdaFinished) obj; + return Objects.equals(finishTime, other.finishTime); + } + + @Override + public String toString() { + return "LambdaFinished{finishTime=" + finishTime + "}"; + } + } + +} diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilder.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilder.java new file mode 100644 index 0000000000..1c4331bac7 --- /dev/null +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilder.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.drivers.statestore; + +import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField; + +import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaFinished; +import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaStarted; +import sleeper.systemtest.dsl.statestore.StateStoreCommitSummary; +import sleeper.systemtest.dsl.statestore.StateStoreCommitterRun; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toUnmodifiableList; + +public class StateStoreCommitterRunsBuilder { + + private final Map logStreamByName = new HashMap<>(); + + public void add(List entry) { + LogStream logStream = null; + String message = null; + for (ResultField field : entry) { + switch (field.field()) { + case "@logStream": + logStream = logStreamByName.computeIfAbsent(field.value(), name -> new LogStream()); + case "@message": + message = field.value(); + default: + } + } + Objects.requireNonNull(logStream, "Log stream not found"); + Objects.requireNonNull(message, "Log message not found"); + logStream.add(StateStoreCommitterLogEntry.from(message)); + } + + public List buildRuns() { + return logStreamByName.values().stream() + .flatMap(LogStream::runs) + .map(LambdaRun::build) + .collect(toUnmodifiableList()); + } + + private static class LogStream { + + private final List runs = new ArrayList<>(); + private LambdaRun lastRun; + + void add(StateStoreCommitterLogEntry entry) { + Object event = entry.getEvent(); + if (event instanceof LambdaStarted) { + lastRun = new LambdaRun((LambdaStarted) event); + runs.add(lastRun); + } else if (event instanceof LambdaFinished) { + lastRun.finished((LambdaFinished) event); + } + } + + Stream runs() { + return runs.stream(); + } + } + + private static class LambdaRun { + private final Instant startTime; + private Instant finishTime; + private List commits = new ArrayList<>(); + + LambdaRun(LambdaStarted event) { + this.startTime = event.getStartTime(); + } + + void finished(LambdaFinished event) { + finishTime = event.getFinishTime(); + } + + StateStoreCommitterRun build() { + return new StateStoreCommitterRun(startTime, finishTime, commits); + } + } + +} diff --git a/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogTest.java b/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogTest.java new file mode 100644 index 0000000000..4036b81350 --- /dev/null +++ b/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterLogTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.drivers.statestore; + +import org.junit.jupiter.api.Test; + +import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaFinished; +import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaStarted; + +import java.time.Instant; + +import static org.assertj.core.api.Assertions.assertThat; + +public class StateStoreCommitterLogTest { + + @Test + void shouldReadLambdaStarted() { + // Given + String message = "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z"; + + // When + StateStoreCommitterLogEntry log = StateStoreCommitterLogEntry.from(message); + + // Then + assertThat(log.getEvent()).isEqualTo( + new LambdaStarted(Instant.parse("2024-08-13T12:12:00Z"))); + } + + @Test + void shouldReadLambdaFinished() { + // Given + String message = "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)"; + + // When + StateStoreCommitterLogEntry log = StateStoreCommitterLogEntry.from(message); + + // Then + assertThat(log.getEvent()).isEqualTo( + new LambdaFinished(Instant.parse("2024-08-13T12:13:00Z"))); + } + +} diff --git a/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilderTest.java b/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilderTest.java new file mode 100644 index 0000000000..f5d6a6f6d1 --- /dev/null +++ b/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/StateStoreCommitterRunsBuilderTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.drivers.statestore; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField; + +import sleeper.systemtest.dsl.statestore.StateStoreCommitterRun; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class StateStoreCommitterRunsBuilderTest { + StateStoreCommitterRunsBuilder builder = new StateStoreCommitterRunsBuilder(); + + @Test + void shouldBuildSingleRunNoCommits() { + add( + "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z", + "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)"); + assertThat(builder.buildRuns()).containsExactly( + new StateStoreCommitterRun(Instant.parse("2024-08-13T12:12:00Z"), Instant.parse("2024-08-13T12:13:00Z"), List.of())); + } + + void add(String... messages) { + Instant startTime = Instant.now(); + for (int i = 0; i < messages.length; i++) { + add(startTime.plus(Duration.ofMillis(i)), "test-logstream", messages[i]); + } + } + + void add(Instant timestamp, String logStream, String message) { + builder.add(List.of( + ResultField.builder().field("@timestamp").value("" + timestamp.toEpochMilli()).build(), + ResultField.builder().field("@logStream").value(logStream).build(), + ResultField.builder().field("@message").value(message).build())); + } + +} diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitSummary.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitSummary.java index 2ba8810192..844559da80 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitSummary.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitSummary.java @@ -16,6 +16,7 @@ package sleeper.systemtest.dsl.statestore; import java.time.Instant; +import java.util.Objects; public class StateStoreCommitSummary { private final String tableId; @@ -39,4 +40,26 @@ public String getType() { public Instant getFinishTime() { return finishTime; } + + @Override + public int hashCode() { + return Objects.hash(tableId, type, finishTime); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof StateStoreCommitSummary)) { + return false; + } + StateStoreCommitSummary other = (StateStoreCommitSummary) obj; + return Objects.equals(tableId, other.tableId) && Objects.equals(type, other.type) && Objects.equals(finishTime, other.finishTime); + } + + @Override + public String toString() { + return "StateStoreCommitSummary{tableId=" + tableId + ", type=" + type + ", finishTime=" + finishTime + "}"; + } } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitterRun.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitterRun.java index 27010627d1..b35a291b6f 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitterRun.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/statestore/StateStoreCommitterRun.java @@ -17,10 +17,7 @@ import java.time.Instant; import java.util.List; -import java.util.Map; - -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.summingInt; +import java.util.Objects; public class StateStoreCommitterRun { @@ -46,8 +43,25 @@ public List getCommits() { return commits; } - public Map countCommitsByTableId() { - return commits.stream() - .collect(groupingBy(StateStoreCommitSummary::getTableId, summingInt(commit -> 1))); + @Override + public int hashCode() { + return Objects.hash(startTime, finishTime, commits); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof StateStoreCommitterRun)) { + return false; + } + StateStoreCommitterRun other = (StateStoreCommitterRun) obj; + return Objects.equals(startTime, other.startTime) && Objects.equals(finishTime, other.finishTime) && Objects.equals(commits, other.commits); + } + + @Override + public String toString() { + return "StateStoreCommitterRun{startTime=" + startTime + ", finishTime=" + finishTime + ", commits=" + commits + "}"; } }