From 4b5580927058bd0bdf3f7ec7a5a84bbc448b555f Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Tue, 6 Feb 2024 14:05:29 +0100 Subject: [PATCH] [proxima-direct-transaction-manager] #324 wait for watermark before commit --- .../transaction/ThreadPooledObserver.java | 3 + .../TransactionResourceManager.java | 32 ++-- .../TransactionalOnlineAttributeWriter.java | 60 ++++--- .../direct/core/storage/InMemStorage.java | 2 +- ...ransactionalOnlineAttributeWriterTest.java | 3 + direct/ingest-server/build.gradle | 1 + .../server/transaction/TransactionsTest.java | 6 +- .../src/test/resources/log4j2.properties | 2 +- .../direct/io/kafka/KafkaLogReader.java | 3 +- .../manager/TransactionLogObserver.java | 146 ++++++++++++------ 10 files changed, 164 insertions(+), 94 deletions(-) diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/ThreadPooledObserver.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/ThreadPooledObserver.java index 3ecd474b7..2cc9b59ec 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/ThreadPooledObserver.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/ThreadPooledObserver.java @@ -102,6 +102,9 @@ private void exitThreads() { @Override public boolean onNext(StreamElement element, OnNextContext context) { + if (log.isDebugEnabled()) { + log.debug("Processing input element {} at watermark {}", element, context.getWatermark()); + } return !ExceptionUtils.ignoringInterrupted( () -> workQueues diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java index ad99b418a..8d97b921e 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionResourceManager.java @@ -51,8 +51,6 @@ import cz.o2.proxima.internal.com.google.common.base.Preconditions; import cz.o2.proxima.internal.com.google.common.collect.Lists; import cz.o2.proxima.internal.com.google.common.collect.Sets; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -65,6 +63,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -183,7 +182,8 @@ CompletableFuture open(List inputAttrs) { transactionId, attributeToFamily.get(responseDesc), (reqId, response) -> - Objects.requireNonNull(requestFutures.remove(reqId)).complete(response)); + Optional.ofNullable(requestFutures.remove(reqId)) + .ifPresent(c -> c.complete(response))); return sendRequest( Request.builder().flags(Flags.OPEN).inputAttributes(inputAttrs).build(), request) .thenCompose(ign -> res); @@ -333,6 +333,7 @@ public void withHandle(ObserveHandle handle) { @Getter private final Wildcard responseDesc; @Getter private final Regular stateDesc; @Getter private final Regular commitDesc; + private final Random random = new Random(); private final Map openTransactionMap = new ConcurrentHashMap<>(); private final Map cachedAccessors = new ConcurrentHashMap<>(); @@ -450,6 +451,7 @@ public void runObservations( getDeclaredParallelism(requestObserver) .orElse(Runtime.getRuntime().availableProcessors())); } + log.debug("Running transaction observation with observer {}", effectiveObserver); List> families = direct @@ -609,27 +611,17 @@ private void addTransactionResponseConsumer( k -> { log.debug("Starting to observe family {} with URI {}", k, k.getDesc().getStorageUri()); HandleWithAssignment assignment = new HandleWithAssignment(); + CommitLogReader reader = Optionals.get(k.getCommitLogReader()); + List partitions = reader.getPartitions(); + Partition partition = partitions.get(random.nextInt(partitions.size())); assignment.withHandle( - Optionals.get(k.getCommitLogReader()) - .observe( - newResponseObserverNameFor(k), newTransactionResponseObserver(assignment))); + reader.observePartitions( + Collections.singleton(partition), newTransactionResponseObserver(assignment))); ExceptionUtils.ignoringInterrupted(() -> assignment.getObserveHandle().waitUntilReady()); return assignment; }); } - protected String newResponseObserverNameFor(DirectAttributeFamilyDescriptor k) { - String localhost = "localhost"; - try { - localhost = InetAddress.getLocalHost().getHostAddress(); - } catch (UnknownHostException e) { - log.warn("Error getting name of localhost, using {} instead.", localhost, e); - } - return "transaction-response-observer-" - + k.getDesc().getName() - + (localhost.hashCode() & Integer.MAX_VALUE); - } - private CommitLogObserver newTransactionResponseObserver(HandleWithAssignment assignment) { Preconditions.checkArgument(assignment != null); return new CommitLogObserver() { @@ -814,7 +806,9 @@ private CachedTransaction getOrCreateCachedTransaction(String transactionId, Sta TransactionMode mode = attributes.get(0).getTransactionMode(); Preconditions.checkArgument( - attributes.stream().allMatch(a -> a.getTransactionMode() == mode), + attributes.stream() + .filter(a -> a.getTransactionMode() != TransactionMode.NONE) + .allMatch(a -> a.getTransactionMode() == mode), "All passed attributes must have the same transaction mode. Got attributes %s.", attributes); diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java index bf5baf68f..71529bb0e 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriter.java @@ -156,11 +156,17 @@ public static class TransactionRejectedException extends Exception { private final String transactionId; private final Response.Flags responseFlags; - protected TransactionRejectedException(String transactionId, Response.Flags flags) { + public TransactionRejectedException(String transactionId, Response.Flags flags) { + this(transactionId, flags, null); + } + + public TransactionRejectedException( + String transactionId, Response.Flags flags, Throwable cause) { super( String.format( "Transaction %s rejected with flags %s. Please restart the transaction.", - transactionId, flags)); + transactionId, flags), + cause); this.transactionId = transactionId; this.responseFlags = flags; } @@ -205,6 +211,8 @@ public void update(List addedInputs) { switch (state) { case UNKNOWN: future = manager.begin(transactionId, addedInputs); + // update to open, though technically will be open after receiving response + state = State.Flags.OPEN; break; case OPEN: future = manager.updateTransaction(transactionId, addedInputs); @@ -222,24 +230,29 @@ public void commitWrite(List outputs, CommitCallback callback) { // FIXME: this should be removed once we write through coordinator final CommitContext commitContext = new CommitContext(outputs); this.commitAttempted = true; - waitForInflight() + waitForInFlight() .thenCompose(ign -> runTransforms(commitContext)) // need to wait for any requests added during transforms - .thenCompose(ign -> waitForInflight()) + .thenCompose(ign -> waitForInFlight()) .thenCompose(ign -> sendCommitRequest(commitContext)) .thenCompose(response -> handleCommitResponse(response, commitContext)) + .completeOnTimeout( + false, manager.getCfg().getTransactionTimeoutMs(), TimeUnit.MILLISECONDS) .whenComplete( - (ign, err) -> { - manager.release(transactionId); - if (err instanceof CompletionException) { + (finished, err) -> { + if (err instanceof CompletionException && err.getCause() != null) { callback.commit(false, err.getCause()); - } else { + } else if (Boolean.TRUE.equals(finished)) { callback.commit(err == null, err); + } else { + // timeout + callback.commit( + false, new TransactionRejectedException(transactionId, Flags.ABORTED)); } }); } - private CompletableFuture waitForInflight() { + private CompletableFuture waitForInFlight() { final CompletableFuture unfinished; synchronized (runningUpdates) { unfinished = CompletableFuture.allOf(runningUpdates.toArray(new CompletableFuture[] {})); @@ -274,15 +287,12 @@ private CompletableFuture sendCommitRequest(CommitContext context) { Collection transformed = context.getTransformedElements(); List keyAttributes = transformed.stream().map(KeyAttributes::ofStreamElement).collect(Collectors.toList()); - return manager - .commit(transactionId, keyAttributes) - .completeOnTimeout( - Response.empty().aborted(), - manager.getCfg().getTransactionTimeoutMs(), - TimeUnit.MILLISECONDS); + return manager.commit(transactionId, keyAttributes); } - private CompletableFuture handleCommitResponse(Response response, CommitContext context) { + private CompletableFuture handleCommitResponse( + Response response, CommitContext context) { + if (response.getFlags() != Flags.COMMITTED) { if (response.getFlags() == Flags.ABORTED) { state = State.Flags.ABORTED; @@ -292,7 +302,7 @@ private CompletableFuture handleCommitResponse(Response response, CommitCo } Collection transformed = context.getTransformedElements(); StreamElement toWrite = context.getToWrite(); - CompletableFuture result = new CompletableFuture<>(); + CompletableFuture result = new CompletableFuture<>(); CommitCallback compositeCallback = (succ, exc) -> { if (!succ) { @@ -309,7 +319,8 @@ private CompletableFuture handleCommitResponse(Response response, CommitCo if (exc != null) { result.completeExceptionally(exc); } else { - result.complete(null); + // true = completed without timeout + result.complete(true); } }; state = State.Flags.COMMITTED; @@ -466,12 +477,12 @@ public void close() { } public CompletableFuture rollback() { - return manager.rollback(transactionId).thenAccept(ign -> manager.release(transactionId)); + return manager.rollback(transactionId).thenApply(r -> null); } public void sync() throws TransactionRejectedException { try { - waitForInflight().get(); + waitForInFlight().get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED); @@ -479,10 +490,11 @@ public void sync() throws TransactionRejectedException { if (e.getCause() instanceof TransactionRejectedException) { throw (TransactionRejectedException) e.getCause(); } - TransactionRejectedException exc = - new TransactionRejectedException(getTransactionId(), Flags.ABORTED); - exc.addSuppressed(e); - throw exc; + if (e.getCause() != null) { + throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED, e.getCause()); + } else { + throw new TransactionRejectedException(getTransactionId(), Flags.ABORTED, e); + } } } diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java index d290625ce..dddc06c03 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/storage/InMemStorage.java @@ -126,7 +126,7 @@ public class InMemStorage implements DataAccessorFactory { private static final Partition SINGLE_PARTITION = Partition.of(0); private static final OnNextContext CONTEXT = BatchLogObservers.defaultContext(SINGLE_PARTITION); private static final long IDLE_FLUSH_TIME = 500L; - private static final long BOUNDED_OUT_OF_ORDERNESS = 5000L; + private static final long BOUNDED_OUT_OF_ORDERNESS = 2000L; public static class ConsumedOffset implements Offset { diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java index b83799bbf..b0ed2698f 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/transaction/TransactionalOnlineAttributeWriterTest.java @@ -191,6 +191,7 @@ public void testTransactionCreateUpdateCommitMultipleOutputs() toReturn.add(Response.forRequest(anyRequest()).updated()); toReturn.add(Response.forRequest(anyRequest()).committed()); KeyAttribute ka = KeyAttributes.ofAttributeDescriptor(gateway, "key", status, 1L); + CountDownLatch latch = new CountDownLatch(1); try (TransactionalOnlineAttributeWriter.Transaction t = writer.transactional().begin()) { t.update(Collections.singletonList(ka)); t.update(Collections.singletonList(ka)); @@ -215,8 +216,10 @@ public void testTransactionCreateUpdateCommitMultipleOutputs() (succ, exc) -> { assertTrue(succ); assertNull(exc); + latch.countDown(); }); } + latch.await(); while (!view.get("key", status).isPresent()) { // need to wait for the transformation TimeUnit.MILLISECONDS.sleep(100); diff --git a/direct/ingest-server/build.gradle b/direct/ingest-server/build.gradle index 73fedc6d0..c413427e1 100644 --- a/direct/ingest-server/build.gradle +++ b/direct/ingest-server/build.gradle @@ -35,6 +35,7 @@ dependencies { testImplementation project(path: ':proxima-scheme-proto-testing') testImplementation project(path: ':proxima-direct-transaction-manager') testImplementation libraries.slf4j_log4j + testImplementation libraries.log4j_core testImplementation libraries.junit4 testImplementation libraries.mockito_core compileAnnotationProcessor libraries.lombok diff --git a/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java b/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java index 23c67b2cb..d7080bede 100644 --- a/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java +++ b/direct/ingest-server/src/test/java/cz/o2/proxima/direct/server/transaction/TransactionsTest.java @@ -56,11 +56,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; import org.junit.Test; -/** Test {@link IngestService} and {@link RetrieveService} with regards to transactions. */ +/** Test {@link IngestService} and {@link RetrieveService} regarding transactions. */ +@Slf4j public class TransactionsTest { private final Repository repo = @@ -161,7 +163,7 @@ public void testTransactionReadWrite() { assertEquals(3, getResponse.getValue().size()); } - @Test(timeout = 10000) + @Test(timeout = 20000) public void testTransactionCommitRejected() { // we need two simultaneous transactions String firstTransaction = begin().getTransactionId(); diff --git a/direct/ingest-server/src/test/resources/log4j2.properties b/direct/ingest-server/src/test/resources/log4j2.properties index 349eb8dd3..ea402d79d 100644 --- a/direct/ingest-server/src/test/resources/log4j2.properties +++ b/direct/ingest-server/src/test/resources/log4j2.properties @@ -21,5 +21,5 @@ appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %level %c{1}:%L - %msg%n # Root logger option -rootLogger.level = ${sys:LOG_LEVEL:-INFO} +rootLogger.level = ${env:LOG_LEVEL:-INFO} rootLogger.appenderRef.stdout.ref = STDOUT diff --git a/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java b/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java index 4184c75ae..6cac79725 100644 --- a/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java +++ b/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java @@ -856,7 +856,8 @@ public void waitUntilReady() throws InterruptedException { } private OffsetCommitter createOffsetCommitter() { - return new OffsetCommitter<>(accessor.getLogStaleCommitIntervalMs(), accessor.getAutoCommitIntervalMs()); + return new OffsetCommitter<>( + accessor.getLogStaleCommitIntervalMs(), accessor.getAutoCommitIntervalMs()); } // create rebalance listener from consumer diff --git a/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java b/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java index 82b2f5b2f..8f8f30ce0 100644 --- a/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java +++ b/direct/transaction-manager/src/main/java/cz/o2/proxima/direct/transaction/manager/TransactionLogObserver.java @@ -43,8 +43,10 @@ import cz.o2.proxima.internal.com.google.common.collect.Sets; import cz.o2.proxima.internal.com.google.common.collect.SortedSetMultimap; import cz.o2.proxima.internal.com.google.common.collect.Streams; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -52,6 +54,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.PriorityQueue; import java.util.Set; import java.util.SortedSet; import java.util.concurrent.TimeUnit; @@ -160,8 +163,13 @@ public void close() { private final ServerTransactionManager manager; private final AtomicLong sequenceId = new AtomicLong(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock timerLock = new ReentrantReadWriteLock(); private final Object commitLock = new Object(); + @GuardedBy("timerLock") + private PriorityQueue> timers = + new PriorityQueue<>(Comparator.comparing(Pair::getFirst)); + @GuardedBy("lock") private final SortedSetMultimap lastUpdateSeqId = MultimapBuilder.hashKeys().treeSetValues().build(); @@ -332,19 +340,41 @@ public boolean onNext(StreamElement element, OnNextContext context) { log.debug("Received element {} for transaction processing", element); Wildcard requestDesc = manager.getRequestDesc(); if (element.getAttributeDescriptor().equals(requestDesc)) { - handleRequest( - element.getKey(), - requestDesc.extractSuffix(element.getAttribute()), - context, - requestDesc.valueOf(element).orElse(null)); + handleRequest(element, context); } else { // unknown attribute, probably own response or state update, can be safely ignored log.debug("Unknown attribute {}. Ignored.", element.getAttributeDescriptor()); context.confirm(); } + handleWatermark(context.getWatermark()); return true; } + @Override + public void onIdle(OnIdleContext context) { + handleWatermark(context.getWatermark()); + } + + private void handleWatermark(long watermark) { + List process = new ArrayList<>(); + final boolean canFireTimer; + try (var read = Locker.of(timerLock.readLock())) { + if (log.isDebugEnabled()) { + log.debug("Processing watermark {} with {} timers", watermark, timers); + } + canFireTimer = !timers.isEmpty() && timers.peek().getFirst() < watermark; + } + if (canFireTimer) { + try (var writeLock = Locker.of(timerLock.writeLock())) { + while (!timers.isEmpty() && timers.peek().getFirst() < watermark) { + process.add(timers.poll().getSecond()); + } + } + // fire the timers outside the lock + process.forEach(Runnable::run); + } + } + @Override public boolean onError(Throwable error) { log.error("Fatal error processing transactions. Killing self.", error); @@ -357,14 +387,13 @@ void exit(int status) { System.exit(status); } - private void handleRequest( - String transactionId, - String requestId, - OnNextContext context, - @Nullable Request maybeRequest) { - - if (maybeRequest != null) { - processTransactionRequest(transactionId, requestId, maybeRequest, context); + private void handleRequest(StreamElement element, OnNextContext context) { + String transactionId = element.getKey(); + String requestId = manager.getRequestDesc().extractSuffix(element.getAttribute()); + Optional maybeRequest = manager.getRequestDesc().valueOf(element); + if (maybeRequest.isPresent()) { + processTransactionRequest( + transactionId, requestId, maybeRequest.get(), context, element.getStamp()); } else { log.error("Unable to parse request at offset {}", context.getOffset()); context.confirm(); @@ -372,37 +401,64 @@ private void handleRequest( } private void processTransactionRequest( + String transactionId, + String requestId, + Request request, + OnNextContext context, + long requestTimestamp) { + + if (request.getFlags() == Flags.COMMIT) { + // enqueue this for processing after watermark + try (var l = Locker.of(timerLock.writeLock())) { + timers.add( + Pair.of( + requestTimestamp, + () -> processTransactionUpdateRequest(transactionId, requestId, request, context))); + } + log.debug( + "Scheduled transaction {} commit for watermark {}", transactionId, requestTimestamp); + } else { + processTransactionUpdateRequest(transactionId, requestId, request, context); + } + } + + private void processTransactionUpdateRequest( String transactionId, String requestId, Request request, OnNextContext context) { - log.debug( - "Processing request to {} with {} for transaction {}", requestId, request, transactionId); - State currentState = manager.getCurrentState(transactionId); - @Nullable State newState = transitionState(transactionId, currentState, request); - if (newState != null) { - // we have successfully computed new state, produce response - Response response = getResponseForNewState(request, currentState, newState); - manager.ensureTransactionOpen(transactionId, newState); - manager.writeResponseAndUpdateState( - transactionId, newState, requestId, response, context::commit); - } else if (request.getFlags() == Request.Flags.OPEN - && (currentState.getFlags() == State.Flags.OPEN - || currentState.getFlags() == State.Flags.COMMITTED)) { - - manager.writeResponseAndUpdateState( - transactionId, - currentState, - requestId, - Response.forRequest(request).duplicate(currentState.getSequentialId()), - context::commit); - } else { - log.warn( - "Unexpected {} request for transaction {} seqId {} when the state is {}. " - + "Refusing to respond, because the correct response is unknown.", - request.getFlags(), - transactionId, - currentState.getSequentialId(), - currentState.getFlags()); - context.confirm(); + try { + log.debug( + "Processing request to {} with {} for transaction {}", requestId, request, transactionId); + State currentState = manager.getCurrentState(transactionId); + @Nullable State newState = transitionState(transactionId, currentState, request); + if (newState != null) { + // we have successfully computed new state, produce response + Response response = getResponseForNewState(request, currentState, newState); + manager.ensureTransactionOpen(transactionId, newState); + manager.writeResponseAndUpdateState( + transactionId, newState, requestId, response, context::commit); + } else if (request.getFlags() == Request.Flags.OPEN + && (currentState.getFlags() == State.Flags.OPEN + || currentState.getFlags() == State.Flags.COMMITTED)) { + + manager.writeResponseAndUpdateState( + transactionId, + currentState, + requestId, + Response.forRequest(request).duplicate(currentState.getSequentialId()), + context::commit); + } else { + log.warn( + "Unexpected {} request for transaction {} seqId {} when the state is {}. " + + "Refusing to respond, because the correct response is unknown.", + request.getFlags(), + transactionId, + currentState.getSequentialId(), + currentState.getFlags()); + context.confirm(); + } + } catch (Throwable err) { + log.warn("Error during processing transaction {} request {}", transactionId, request, err); + context.commit(false, err); } } @@ -466,10 +522,8 @@ State transitionState(String transactionId, State currentState, Request request) } break; case ABORTED: - if (request.getFlags() == Request.Flags.ROLLBACK) { - return currentState; - } - break; + // aborted always stays aborted + return currentState; } return null; }