Skip to content

Commit

Permalink
[proxima-direct-transaction-manager] #324 wait for watermark before c…
Browse files Browse the repository at this point in the history
…ommit
  • Loading branch information
je-ik committed Feb 7, 2024
1 parent 7f51d93 commit 4b55809
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ private void exitThreads() {

@Override
public boolean onNext(StreamElement element, OnNextContext context) {
if (log.isDebugEnabled()) {
log.debug("Processing input element {} at watermark {}", element, context.getWatermark());
}
return !ExceptionUtils.ignoringInterrupted(
() ->
workQueues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.collect.Lists;
import cz.o2.proxima.internal.com.google.common.collect.Sets;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -65,6 +63,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -183,7 +182,8 @@ CompletableFuture<Response> open(List<KeyAttribute> inputAttrs) {
transactionId,
attributeToFamily.get(responseDesc),
(reqId, response) ->
Objects.requireNonNull(requestFutures.remove(reqId)).complete(response));
Optional.ofNullable(requestFutures.remove(reqId))
.ifPresent(c -> c.complete(response)));
return sendRequest(
Request.builder().flags(Flags.OPEN).inputAttributes(inputAttrs).build(), request)
.thenCompose(ign -> res);
Expand Down Expand Up @@ -333,6 +333,7 @@ public void withHandle(ObserveHandle handle) {
@Getter private final Wildcard<Response> responseDesc;
@Getter private final Regular<State> stateDesc;
@Getter private final Regular<Commit> commitDesc;
private final Random random = new Random();
private final Map<String, CachedTransaction> openTransactionMap = new ConcurrentHashMap<>();
private final Map<AttributeFamilyDescriptor, CachedWriters> cachedAccessors =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -450,6 +451,7 @@ public void runObservations(
getDeclaredParallelism(requestObserver)
.orElse(Runtime.getRuntime().availableProcessors()));
}
log.debug("Running transaction observation with observer {}", effectiveObserver);

List<Set<String>> families =
direct
Expand Down Expand Up @@ -609,27 +611,17 @@ private void addTransactionResponseConsumer(
k -> {
log.debug("Starting to observe family {} with URI {}", k, k.getDesc().getStorageUri());
HandleWithAssignment assignment = new HandleWithAssignment();
CommitLogReader reader = Optionals.get(k.getCommitLogReader());
List<Partition> partitions = reader.getPartitions();
Partition partition = partitions.get(random.nextInt(partitions.size()));
assignment.withHandle(
Optionals.get(k.getCommitLogReader())
.observe(
newResponseObserverNameFor(k), newTransactionResponseObserver(assignment)));
reader.observePartitions(
Collections.singleton(partition), newTransactionResponseObserver(assignment)));
ExceptionUtils.ignoringInterrupted(() -> assignment.getObserveHandle().waitUntilReady());
return assignment;
});
}

protected String newResponseObserverNameFor(DirectAttributeFamilyDescriptor k) {
String localhost = "localhost";
try {
localhost = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
log.warn("Error getting name of localhost, using {} instead.", localhost, e);
}
return "transaction-response-observer-"
+ k.getDesc().getName()
+ (localhost.hashCode() & Integer.MAX_VALUE);
}

private CommitLogObserver newTransactionResponseObserver(HandleWithAssignment assignment) {
Preconditions.checkArgument(assignment != null);
return new CommitLogObserver() {
Expand Down Expand Up @@ -814,7 +806,9 @@ private CachedTransaction getOrCreateCachedTransaction(String transactionId, Sta

TransactionMode mode = attributes.get(0).getTransactionMode();
Preconditions.checkArgument(
attributes.stream().allMatch(a -> a.getTransactionMode() == mode),
attributes.stream()
.filter(a -> a.getTransactionMode() != TransactionMode.NONE)
.allMatch(a -> a.getTransactionMode() == mode),
"All passed attributes must have the same transaction mode. Got attributes %s.",
attributes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,17 @@ public static class TransactionRejectedException extends Exception {
private final String transactionId;
private final Response.Flags responseFlags;

protected TransactionRejectedException(String transactionId, Response.Flags flags) {
public TransactionRejectedException(String transactionId, Response.Flags flags) {
this(transactionId, flags, null);
}

public TransactionRejectedException(
String transactionId, Response.Flags flags, Throwable cause) {
super(
String.format(
"Transaction %s rejected with flags %s. Please restart the transaction.",
transactionId, flags));
transactionId, flags),
cause);
this.transactionId = transactionId;
this.responseFlags = flags;
}
Expand Down Expand Up @@ -205,6 +211,8 @@ public void update(List<KeyAttribute> addedInputs) {
switch (state) {
case UNKNOWN:
future = manager.begin(transactionId, addedInputs);
// update to open, though technically will be open after receiving response
state = State.Flags.OPEN;
break;
case OPEN:
future = manager.updateTransaction(transactionId, addedInputs);
Expand All @@ -222,24 +230,29 @@ public void commitWrite(List<StreamElement> outputs, CommitCallback callback) {
// FIXME: this should be removed once we write through coordinator
final CommitContext commitContext = new CommitContext(outputs);
this.commitAttempted = true;
waitForInflight()
waitForInFlight()
.thenCompose(ign -> runTransforms(commitContext))
// need to wait for any requests added during transforms
.thenCompose(ign -> waitForInflight())
.thenCompose(ign -> waitForInFlight())
.thenCompose(ign -> sendCommitRequest(commitContext))
.thenCompose(response -> handleCommitResponse(response, commitContext))
.completeOnTimeout(
false, manager.getCfg().getTransactionTimeoutMs(), TimeUnit.MILLISECONDS)
.whenComplete(
(ign, err) -> {
manager.release(transactionId);
if (err instanceof CompletionException) {
(finished, err) -> {
if (err instanceof CompletionException && err.getCause() != null) {
callback.commit(false, err.getCause());
} else {
} else if (Boolean.TRUE.equals(finished)) {
callback.commit(err == null, err);
} else {
// timeout
callback.commit(
false, new TransactionRejectedException(transactionId, Flags.ABORTED));
}
});
}

private CompletableFuture<?> waitForInflight() {
private CompletableFuture<?> waitForInFlight() {
final CompletableFuture<?> unfinished;
synchronized (runningUpdates) {
unfinished = CompletableFuture.allOf(runningUpdates.toArray(new CompletableFuture<?>[] {}));
Expand Down Expand Up @@ -274,15 +287,12 @@ private CompletableFuture<Response> sendCommitRequest(CommitContext context) {
Collection<StreamElement> transformed = context.getTransformedElements();
List<KeyAttribute> keyAttributes =
transformed.stream().map(KeyAttributes::ofStreamElement).collect(Collectors.toList());
return manager
.commit(transactionId, keyAttributes)
.completeOnTimeout(
Response.empty().aborted(),
manager.getCfg().getTransactionTimeoutMs(),
TimeUnit.MILLISECONDS);
return manager.commit(transactionId, keyAttributes);
}

private CompletableFuture<Void> handleCommitResponse(Response response, CommitContext context) {
private CompletableFuture<Boolean> handleCommitResponse(
Response response, CommitContext context) {

if (response.getFlags() != Flags.COMMITTED) {
if (response.getFlags() == Flags.ABORTED) {
state = State.Flags.ABORTED;
Expand All @@ -292,7 +302,7 @@ private CompletableFuture<Void> handleCommitResponse(Response response, CommitCo
}
Collection<StreamElement> transformed = context.getTransformedElements();
StreamElement toWrite = context.getToWrite();
CompletableFuture<Void> result = new CompletableFuture<>();
CompletableFuture<Boolean> result = new CompletableFuture<>();
CommitCallback compositeCallback =
(succ, exc) -> {
if (!succ) {
Expand All @@ -309,7 +319,8 @@ private CompletableFuture<Void> handleCommitResponse(Response response, CommitCo
if (exc != null) {
result.completeExceptionally(exc);
} else {
result.complete(null);
// true = completed without timeout
result.complete(true);
}
};
state = State.Flags.COMMITTED;
Expand Down Expand Up @@ -466,23 +477,24 @@ public void close() {
}

public CompletableFuture<Void> rollback() {
return manager.rollback(transactionId).thenAccept(ign -> manager.release(transactionId));
return manager.rollback(transactionId).thenApply(r -> null);
}

public void sync() throws TransactionRejectedException {
try {
waitForInflight().get();
waitForInFlight().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED);
} catch (ExecutionException e) {
if (e.getCause() instanceof TransactionRejectedException) {
throw (TransactionRejectedException) e.getCause();
}
TransactionRejectedException exc =
new TransactionRejectedException(getTransactionId(), Flags.ABORTED);
exc.addSuppressed(e);
throw exc;
if (e.getCause() != null) {
throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED, e.getCause());
} else {
throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public class InMemStorage implements DataAccessorFactory {
private static final Partition SINGLE_PARTITION = Partition.of(0);
private static final OnNextContext CONTEXT = BatchLogObservers.defaultContext(SINGLE_PARTITION);
private static final long IDLE_FLUSH_TIME = 500L;
private static final long BOUNDED_OUT_OF_ORDERNESS = 5000L;
private static final long BOUNDED_OUT_OF_ORDERNESS = 2000L;

public static class ConsumedOffset implements Offset {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public void testTransactionCreateUpdateCommitMultipleOutputs()
toReturn.add(Response.forRequest(anyRequest()).updated());
toReturn.add(Response.forRequest(anyRequest()).committed());
KeyAttribute ka = KeyAttributes.ofAttributeDescriptor(gateway, "key", status, 1L);
CountDownLatch latch = new CountDownLatch(1);
try (TransactionalOnlineAttributeWriter.Transaction t = writer.transactional().begin()) {
t.update(Collections.singletonList(ka));
t.update(Collections.singletonList(ka));
Expand All @@ -215,8 +216,10 @@ public void testTransactionCreateUpdateCommitMultipleOutputs()
(succ, exc) -> {
assertTrue(succ);
assertNull(exc);
latch.countDown();
});
}
latch.await();
while (!view.get("key", status).isPresent()) {
// need to wait for the transformation
TimeUnit.MILLISECONDS.sleep(100);
Expand Down
1 change: 1 addition & 0 deletions direct/ingest-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
testImplementation project(path: ':proxima-scheme-proto-testing')
testImplementation project(path: ':proxima-direct-transaction-manager')
testImplementation libraries.slf4j_log4j
testImplementation libraries.log4j_core
testImplementation libraries.junit4
testImplementation libraries.mockito_core
compileAnnotationProcessor libraries.lombok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/** Test {@link IngestService} and {@link RetrieveService} with regards to transactions. */
/** Test {@link IngestService} and {@link RetrieveService} regarding transactions. */
@Slf4j
public class TransactionsTest {

private final Repository repo =
Expand Down Expand Up @@ -161,7 +163,7 @@ public void testTransactionReadWrite() {
assertEquals(3, getResponse.getValue().size());
}

@Test(timeout = 10000)
@Test(timeout = 20000)
public void testTransactionCommitRejected() {
// we need two simultaneous transactions
String firstTransaction = begin().getTransactionId();
Expand Down
2 changes: 1 addition & 1 deletion direct/ingest-server/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %level %c{1}:%L - %msg%n

# Root logger option
rootLogger.level = ${sys:LOG_LEVEL:-INFO}
rootLogger.level = ${env:LOG_LEVEL:-INFO}
rootLogger.appenderRef.stdout.ref = STDOUT
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,8 @@ public void waitUntilReady() throws InterruptedException {
}

private OffsetCommitter<TopicPartition> createOffsetCommitter() {
return new OffsetCommitter<>(accessor.getLogStaleCommitIntervalMs(), accessor.getAutoCommitIntervalMs());
return new OffsetCommitter<>(
accessor.getLogStaleCommitIntervalMs(), accessor.getAutoCommitIntervalMs());
}

// create rebalance listener from consumer
Expand Down
Loading

0 comments on commit 4b55809

Please sign in to comment.