Skip to content

Commit

Permalink
Add StateStoreCommitterRunsBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Aug 13, 2024
1 parent 0c9f094 commit 5422fdc
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.drivers.statestore;

import java.time.Instant;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StateStoreCommitterLogEntry {

private static final Pattern MESSAGE_PATTERN = Pattern.compile("Lambda started at (.+)|Lambda finished at ([^ ]+) ");

private final String message;
private final Object event;

private StateStoreCommitterLogEntry(String message, Object event) {
this.message = message;
this.event = event;
}

public static StateStoreCommitterLogEntry from(String message) {
Matcher matcher = MESSAGE_PATTERN.matcher(message);
return new StateStoreCommitterLogEntry(message, readEvent(matcher));
}

private static Object readEvent(Matcher matcher) {
if (!matcher.find()) {
return null;
}
String startTime = matcher.group(1);
if (startTime != null) {
return new LambdaStarted(Instant.parse(startTime));
}
String finishTime = matcher.group(2);
if (finishTime != null) {
return new LambdaFinished(Instant.parse(finishTime));
}
return null;
}

public String getMessage() {
return message;
}

public Object getEvent() {
return event;
}

public static class LambdaStarted {
private final Instant startTime;

public LambdaStarted(Instant startTime) {
this.startTime = startTime;
}

public Instant getStartTime() {
return startTime;
}

@Override
public int hashCode() {
return Objects.hash(startTime);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof LambdaStarted)) {
return false;
}
LambdaStarted other = (LambdaStarted) obj;
return Objects.equals(startTime, other.startTime);
}

@Override
public String toString() {
return "LambdaStarted{startTime=" + startTime + "}";
}
}

public static class LambdaFinished {
private final Instant finishTime;

public LambdaFinished(Instant finishTime) {
this.finishTime = finishTime;
}

public Instant getFinishTime() {
return finishTime;
}

@Override
public int hashCode() {
return Objects.hash(finishTime);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof LambdaFinished)) {
return false;
}
LambdaFinished other = (LambdaFinished) obj;
return Objects.equals(finishTime, other.finishTime);
}

@Override
public String toString() {
return "LambdaFinished{finishTime=" + finishTime + "}";
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.drivers.statestore;

import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField;

import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaFinished;
import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaStarted;
import sleeper.systemtest.dsl.statestore.StateStoreCommitSummary;
import sleeper.systemtest.dsl.statestore.StateStoreCommitterRun;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;

public class StateStoreCommitterRunsBuilder {

private final Map<String, LogStream> logStreamByName = new HashMap<>();

public void add(List<ResultField> entry) {
LogStream logStream = null;
String message = null;
for (ResultField field : entry) {
switch (field.field()) {
case "@logStream":
logStream = logStreamByName.computeIfAbsent(field.value(), name -> new LogStream());
case "@message":
message = field.value();
default:
}
}
Objects.requireNonNull(logStream, "Log stream not found");
Objects.requireNonNull(message, "Log message not found");
logStream.add(StateStoreCommitterLogEntry.from(message));
}

public List<StateStoreCommitterRun> buildRuns() {
return logStreamByName.values().stream()
.flatMap(LogStream::runs)
.map(LambdaRun::build)
.collect(toUnmodifiableList());
}

private static class LogStream {

private final List<LambdaRun> runs = new ArrayList<>();
private LambdaRun lastRun;

void add(StateStoreCommitterLogEntry entry) {
Object event = entry.getEvent();
if (event instanceof LambdaStarted) {
lastRun = new LambdaRun((LambdaStarted) event);
runs.add(lastRun);
} else if (event instanceof LambdaFinished) {
lastRun.finished((LambdaFinished) event);
}
}

Stream<LambdaRun> runs() {
return runs.stream();
}
}

private static class LambdaRun {
private final Instant startTime;
private Instant finishTime;
private List<StateStoreCommitSummary> commits = new ArrayList<>();

LambdaRun(LambdaStarted event) {
this.startTime = event.getStartTime();
}

void finished(LambdaFinished event) {
finishTime = event.getFinishTime();
}

StateStoreCommitterRun build() {
return new StateStoreCommitterRun(startTime, finishTime, commits);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.drivers.statestore;

import org.junit.jupiter.api.Test;

import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaFinished;
import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaStarted;

import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;

public class StateStoreCommitterLogTest {

@Test
void shouldReadLambdaStarted() {
// Given
String message = "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z";

// When
StateStoreCommitterLogEntry log = StateStoreCommitterLogEntry.from(message);

// Then
assertThat(log.getEvent()).isEqualTo(
new LambdaStarted(Instant.parse("2024-08-13T12:12:00Z")));
}

@Test
void shouldReadLambdaFinished() {
// Given
String message = "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)";

// When
StateStoreCommitterLogEntry log = StateStoreCommitterLogEntry.from(message);

// Then
assertThat(log.getEvent()).isEqualTo(
new LambdaFinished(Instant.parse("2024-08-13T12:13:00Z")));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.drivers.statestore;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField;

import sleeper.systemtest.dsl.statestore.StateStoreCommitterRun;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class StateStoreCommitterRunsBuilderTest {
StateStoreCommitterRunsBuilder builder = new StateStoreCommitterRunsBuilder();

@Test
void shouldBuildSingleRunNoCommits() {
add(
"[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z",
"[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)");
assertThat(builder.buildRuns()).containsExactly(
new StateStoreCommitterRun(Instant.parse("2024-08-13T12:12:00Z"), Instant.parse("2024-08-13T12:13:00Z"), List.of()));
}

void add(String... messages) {
Instant startTime = Instant.now();
for (int i = 0; i < messages.length; i++) {
add(startTime.plus(Duration.ofMillis(i)), "test-logstream", messages[i]);
}
}

void add(Instant timestamp, String logStream, String message) {
builder.add(List.of(
ResultField.builder().field("@timestamp").value("" + timestamp.toEpochMilli()).build(),
ResultField.builder().field("@logStream").value(logStream).build(),
ResultField.builder().field("@message").value(message).build()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package sleeper.systemtest.dsl.statestore;

import java.time.Instant;
import java.util.Objects;

public class StateStoreCommitSummary {
private final String tableId;
Expand All @@ -39,4 +40,26 @@ public String getType() {
public Instant getFinishTime() {
return finishTime;
}

@Override
public int hashCode() {
return Objects.hash(tableId, type, finishTime);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof StateStoreCommitSummary)) {
return false;
}
StateStoreCommitSummary other = (StateStoreCommitSummary) obj;
return Objects.equals(tableId, other.tableId) && Objects.equals(type, other.type) && Objects.equals(finishTime, other.finishTime);
}

@Override
public String toString() {
return "StateStoreCommitSummary{tableId=" + tableId + ", type=" + type + ", finishTime=" + finishTime + "}";
}
}
Loading

0 comments on commit 5422fdc

Please sign in to comment.