Skip to content

Commit

Permalink
Remove count of commits in SystemTestStateStoreFakeCommits
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Aug 12, 2024
1 parent e3d92d7 commit 80c3276
Showing 1 changed file with 2 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,29 @@
import sleeper.systemtest.dsl.SystemTestContext;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

public class SystemTestStateStoreFakeCommits {

private final SystemTestInstanceContext instance;
private final StateStoreCommitterDriver driver;
private final AtomicInteger commitsSent = new AtomicInteger();

public SystemTestStateStoreFakeCommits(SystemTestContext context) {
instance = context.instance();
driver = context.instance().adminDrivers().stateStoreCommitter(context);
}

public SystemTestStateStoreFakeCommits sendBatched(Function<StateStoreCommitMessageFactory, Stream<StateStoreCommitMessage>> buildCommits) {
send(buildCommits.apply(messageFactory()));
driver.sendCommitMessages(buildCommits.apply(messageFactory()));
return this;
}

public SystemTestStateStoreFakeCommits send(Function<StateStoreCommitMessageFactory, StateStoreCommitMessage> buildCommit) {
send(Stream.of(buildCommit.apply(messageFactory())));
driver.sendCommitMessages(Stream.of(buildCommit.apply(messageFactory())));
return this;
}

private void send(Stream<StateStoreCommitMessage> messages) {
driver.sendCommitMessages(messages
.peek(message -> commitsSent.incrementAndGet()));
}

private StateStoreCommitMessageFactory messageFactory() {
return new StateStoreCommitMessageFactory(instance.getTableStatus().getTableUniqueId());
}
Expand Down

0 comments on commit 80c3276

Please sign in to comment.