Skip to content

Commit

Permalink
Read state store commit applied event
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Aug 13, 2024
1 parent 5422fdc commit 72b487e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ public StateStoreCommitRequest applyFromJson(String json) throws StateStoreExcep
*/
public void apply(StateStoreCommitRequest request) throws StateStoreException {
request.apply(this);
LOGGER.info("Applied request to table ID {} with type {}",
request.getTableId(), request.getRequest().getClass().getSimpleName());
LOGGER.info("Applied request to table ID {} with type {} at time {}",
request.getTableId(), request.getRequest().getClass().getSimpleName(), Instant.now());
}

void commitCompaction(CompactionJobCommitRequest request) throws StateStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,22 @@
*/
package sleeper.systemtest.drivers.statestore;

import sleeper.systemtest.dsl.statestore.StateStoreCommitSummary;

import java.time.Instant;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StateStoreCommitterLogEntry {

private static final Pattern MESSAGE_PATTERN = Pattern.compile("Lambda started at (.+)|Lambda finished at ([^ ]+) ");

private final String message;
private final Object event;
private static final Pattern MESSAGE_PATTERN = Pattern.compile("Lambda started at (.+)|Lambda finished at ([^ ]+) |Applied request to table ID ([^ ]+) with type ([^ ]+) at time ([^ ]+)");

private StateStoreCommitterLogEntry(String message, Object event) {
this.message = message;
this.event = event;
private StateStoreCommitterLogEntry() {
}

public static StateStoreCommitterLogEntry from(String message) {
public static Object readEvent(String message) {
Matcher matcher = MESSAGE_PATTERN.matcher(message);
return new StateStoreCommitterLogEntry(message, readEvent(matcher));
}

private static Object readEvent(Matcher matcher) {
if (!matcher.find()) {
return null;
}
Expand All @@ -49,17 +42,15 @@ private static Object readEvent(Matcher matcher) {
if (finishTime != null) {
return new LambdaFinished(Instant.parse(finishTime));
}
String tableId = matcher.group(3);
if (tableId != null) {
String type = matcher.group(4);
String commitTime = matcher.group(5);
return new StateStoreCommitSummary(tableId, type, Instant.parse(commitTime));
}
return null;
}

public String getMessage() {
return message;
}

public Object getEvent() {
return event;
}

public static class LambdaStarted {
private final Instant startTime;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -34,7 +34,7 @@

public class StateStoreCommitterRunsBuilder {

private final Map<String, LogStream> logStreamByName = new HashMap<>();
private final Map<String, LogStream> logStreamByName = new LinkedHashMap<>();

public void add(List<ResultField> entry) {
LogStream logStream = null;
Expand All @@ -50,7 +50,7 @@ public void add(List<ResultField> entry) {
}
Objects.requireNonNull(logStream, "Log stream not found");
Objects.requireNonNull(message, "Log message not found");
logStream.add(StateStoreCommitterLogEntry.from(message));
logStream.add(StateStoreCommitterLogEntry.readEvent(message));
}

public List<StateStoreCommitterRun> buildRuns() {
Expand All @@ -65,13 +65,14 @@ private static class LogStream {
private final List<LambdaRun> runs = new ArrayList<>();
private LambdaRun lastRun;

void add(StateStoreCommitterLogEntry entry) {
Object event = entry.getEvent();
void add(Object event) {
if (event instanceof LambdaStarted) {
lastRun = new LambdaRun((LambdaStarted) event);
runs.add(lastRun);
} else if (event instanceof LambdaFinished) {
lastRun.finished((LambdaFinished) event);
} else if (event instanceof StateStoreCommitSummary) {
lastRun.committed((StateStoreCommitSummary) event);
}
}

Expand All @@ -93,6 +94,10 @@ void finished(LambdaFinished event) {
finishTime = event.getFinishTime();
}

void committed(StateStoreCommitSummary commit) {
commits.add(commit);
}

StateStoreCommitterRun build() {
return new StateStoreCommitterRun(startTime, finishTime, commits);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,21 @@

import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaFinished;
import sleeper.systemtest.drivers.statestore.StateStoreCommitterLogEntry.LambdaStarted;
import sleeper.systemtest.dsl.statestore.StateStoreCommitSummary;

import java.time.Instant;

import static org.assertj.core.api.Assertions.assertThat;

public class StateStoreCommitterLogTest {
public class StateStoreCommitterLogEntryTest {

@Test
void shouldReadLambdaStarted() {
// Given
String message = "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z";

// When
StateStoreCommitterLogEntry log = StateStoreCommitterLogEntry.from(message);

// Then
assertThat(log.getEvent()).isEqualTo(
// When / Then
assertThat(StateStoreCommitterLogEntry.readEvent(message)).isEqualTo(
new LambdaStarted(Instant.parse("2024-08-13T12:12:00Z")));
}

Expand All @@ -44,12 +42,18 @@ void shouldReadLambdaFinished() {
// Given
String message = "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)";

// When
StateStoreCommitterLogEntry log = StateStoreCommitterLogEntry.from(message);

// Then
assertThat(log.getEvent()).isEqualTo(
// When / Then
assertThat(StateStoreCommitterLogEntry.readEvent(message)).isEqualTo(
new LambdaFinished(Instant.parse("2024-08-13T12:13:00Z")));
}

@Test
void shouldReadCommitApplied() {
// Given
String message = "[main] sleeper.commit.StateStoreCommitter INFO - Applied request to table ID test-table with type TestRequest at time 2024-08-13T12:12:30Z";

// When / Then
assertThat(StateStoreCommitterLogEntry.readEvent(message)).isEqualTo(
new StateStoreCommitSummary("test-table", "TestRequest", Instant.parse("2024-08-13T12:12:30Z")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.cloudwatchlogs.model.ResultField;

import sleeper.systemtest.dsl.statestore.StateStoreCommitSummary;
import sleeper.systemtest.dsl.statestore.StateStoreCommitterRun;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

Expand All @@ -31,23 +31,44 @@ public class StateStoreCommitterRunsBuilderTest {

@Test
void shouldBuildSingleRunNoCommits() {
add(
"[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z",
"[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)");
// Given
add("test-logstream", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z");
add("test-logstream", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)");

// When / Then
assertThat(builder.buildRuns()).containsExactly(
new StateStoreCommitterRun(Instant.parse("2024-08-13T12:12:00Z"), Instant.parse("2024-08-13T12:13:00Z"), List.of()));
}

void add(String... messages) {
Instant startTime = Instant.now();
for (int i = 0; i < messages.length; i++) {
add(startTime.plus(Duration.ofMillis(i)), "test-logstream", messages[i]);
}
@Test
void shouldBuildOverlappingRunsOnTwoLogStreams() {
// Given
add("stream-1", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z");
add("stream-2", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:30Z");
add("stream-1", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)");
add("stream-2", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:30Z (ran for 1 minute)");

// When / Then
assertThat(builder.buildRuns()).containsExactly(
new StateStoreCommitterRun(Instant.parse("2024-08-13T12:12:00Z"), Instant.parse("2024-08-13T12:13:00Z"), List.of()),
new StateStoreCommitterRun(Instant.parse("2024-08-13T12:12:30Z"), Instant.parse("2024-08-13T12:13:30Z"), List.of()));
}

@Test
void shouldBuildSingleRunWithOneCommit() {
// Given
add("test-logstream", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda started at 2024-08-13T12:12:00Z");
add("test-logstream", "[main] sleeper.commit.StateStoreCommitter INFO - Applied request to table ID test-table with type TestRequest at time 2024-08-13T12:12:30Z");
add("test-logstream", "[main] committer.lambda.StateStoreCommitterLambda INFO - Lambda finished at 2024-08-13T12:13:00Z (ran for 1 minute)");

// When / Then
assertThat(builder.buildRuns()).containsExactly(
new StateStoreCommitterRun(Instant.parse("2024-08-13T12:12:00Z"), Instant.parse("2024-08-13T12:13:00Z"),
List.of(new StateStoreCommitSummary("test-table", "TestRequest", Instant.parse("2024-08-13T12:12:30Z")))));
}

void add(Instant timestamp, String logStream, String message) {
void add(String logStream, String message) {
builder.add(List.of(
ResultField.builder().field("@timestamp").value("" + timestamp.toEpochMilli()).build(),
ResultField.builder().field("@logStream").value(logStream).build(),
ResultField.builder().field("@message").value(message).build()));
}
Expand Down

0 comments on commit 72b487e

Please sign in to comment.