Skip to content

Commit

Permalink
Merge pull request #887: [proxima-direct-core] Optimize transaction p…
Browse files Browse the repository at this point in the history
…rocessing
  • Loading branch information
je-ik authored Mar 12, 2024
2 parents f985941 + d1a88be commit f5f6bb3
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package cz.o2.proxima.core.transaction;

import cz.o2.proxima.core.annotations.Internal;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -58,6 +59,12 @@ public Request(
this.outputAttributes = outputAttributes == null ? Collections.emptyList() : outputAttributes;
this.flags = flags;
this.responsePartitionId = responsePartitionId;

Preconditions.checkArgument(
(flags != Flags.UPDATE && flags != Flags.OPEN)
|| (!this.inputAttributes.isEmpty() || !this.outputAttributes.isEmpty()),
"At least one of input or output attributes have to be non-empty wth flags %s.",
flags);
}

public Request withResponsePartitionId(int responsePartitionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class TransactionPartitionerTest {

@Test
public void testPartitioner() {
Request r = Request.builder().flags(Flags.OPEN).responsePartitionId(1).build();
Request r = Request.builder().flags(Flags.COMMIT).responsePartitionId(1).build();
StreamElement responseUpsert =
response.upsert(
"t", "commit", System.currentTimeMillis(), Response.forRequest(r).committed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public CachedView getOrCreateStateView() {
class CachedTransaction implements AutoCloseable {

@Getter final String transactionId;
@Getter final long created = System.currentTimeMillis();
long touched = System.currentTimeMillis();
final Map<AttributeDescriptor<?>, DirectAttributeFamilyDescriptor> attributeToFamily =
new HashMap<>();
final Map<String, CompletableFuture<Response>> requestFutures = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -314,6 +314,14 @@ public State getState() {
.map(KeyValue::getParsedRequired)
.orElse(State.empty());
}

public void touch(long stamp) {
this.touched = stamp;
}

public long getStamp() {
return touched;
}
}

private static class HandleWithAssignment {
Expand Down Expand Up @@ -440,7 +448,7 @@ public void houseKeeping() {
long now = System.currentTimeMillis();
long releaseTime = now - cleanupIntervalMs;
openTransactionMap.entrySet().stream()
.filter(e -> e.getValue().getCreated() < releaseTime)
.filter(e -> e.getValue().getStamp() < releaseTime)
.map(Map.Entry::getKey)
.collect(Collectors.toList())
.forEach(this::release);
Expand Down Expand Up @@ -505,7 +513,8 @@ public void runObservations(
k -> {
CachedView view = Optionals.get(stateFamily.getCachedView());
Duration ttl = Duration.ofMillis(cleanupIntervalMs);
view.assign(view.getPartitions(), updateConsumer, ttl);
view.assign(
view.getPartitions(), createTransactionUpdateConsumer(updateConsumer), ttl);
return view;
});
initializedLatch.countDown();
Expand All @@ -520,6 +529,20 @@ public void runObservations(
ExceptionUtils.unchecked(initializedLatch::await);
}

private BiConsumer<StreamElement, Pair<Long, Object>> createTransactionUpdateConsumer(
BiConsumer<StreamElement, Pair<Long, Object>> delegate) {

return (element, cached) -> {
if (element.getAttributeDescriptor().equals(getStateDesc())) {
Optional<State> state = getStateDesc().valueOf(element);
if (state.isPresent()) {
getOrCreateCachedTransaction(element.getKey(), state.get()).touch(element.getStamp());
}
}
delegate.accept(element, cached);
};
}

@VisibleForTesting
static boolean isNotThreadSafe(CommitLogObserver requestObserver) {
return requestObserver.getClass().getDeclaredAnnotation(DeclaredThreadSafe.class) == null;
Expand Down Expand Up @@ -682,18 +705,18 @@ private synchronized CachedWriters getCachedAccessors(DirectAttributeFamilyDescr
}

/**
* Initialize (possibly) new transaction. If the transaction already existed prior to this call,
* its current state is returned, otherwise the transaction is opened.
* Initialize new transaction. If the transaction already existed prior to this call, the state is
* updated accordingly.
*
* @param transactionId ID of transaction
* @param attributes attributes affected by this transaction (both input and output)
*/
@Override
public CompletableFuture<Response> begin(String transactionId, List<KeyAttribute> attributes) {
CachedTransaction cachedTransaction =
openTransactionMap.computeIfAbsent(
transactionId, k -> new CachedTransaction(transactionId, attributes));
return cachedTransaction.open(attributes);
// reopen transaction including expiration stamp
CachedTransaction transaction = new CachedTransaction(transactionId, attributes);
openTransactionMap.put(transactionId, transaction);
return transaction.open(attributes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,80 @@ public void testTransactionRequestUpdate() throws InterruptedException {
}
}

@Test(timeout = 10000)
public void testTransactionRequestOpenAfterAbort() throws InterruptedException {
try (TransactionResourceManager manager = TransactionResourceManager.create(direct)) {
String transactionId = UUID.randomUUID().toString();
BlockingQueue<Response> receivedResponses = new ArrayBlockingQueue<>(5);

// create a simple ping-pong communication
runObservations(
manager,
"requests",
(ingest, context) -> {
if (ingest.getAttributeDescriptor().equals(requestDesc)) {
String key = ingest.getKey();
String requestId = requestDesc.extractSuffix(ingest.getAttribute());
Request request = Optionals.get(requestDesc.valueOf(ingest));
CountDownLatch latch = new CountDownLatch(1);
CommitCallback commit =
(succ, exc) -> {
latch.countDown();
context.commit(succ, exc);
};
long stamp = System.currentTimeMillis();
if (request.getFlags() == Request.Flags.UPDATE) {
manager.writeResponseAndUpdateState(
key,
State.open(1L, stamp, Collections.emptyList()).aborted(),
requestId,
Response.forRequest(request).aborted(),
commit);
} else {
manager.writeResponseAndUpdateState(
key,
State.open(1L, stamp, new HashSet<>(request.getInputAttributes())),
requestId,
Response.forRequest(request).open(1L, stamp),
commit);
}
ExceptionUtils.ignoringInterrupted(latch::await);
} else {
context.confirm();
}
return true;
});

manager
.begin(
transactionId,
Collections.singletonList(
KeyAttributes.ofAttributeDescriptor(gateway, "gw1", status, 1L)))
.whenComplete((response, err) -> receivedResponses.add(response));

receivedResponses.take();
manager
.updateTransaction(
transactionId,
Collections.singletonList(
KeyAttributes.ofAttributeDescriptor(gateway, "gw2", status, 1L)))
.whenComplete((response, err) -> receivedResponses.add(response));

Response response = receivedResponses.take();
assertEquals(Response.Flags.ABORTED, response.getFlags());

manager
.begin(
transactionId,
Collections.singletonList(
KeyAttributes.ofAttributeDescriptor(gateway, "gw1", status, 1L)))
.whenComplete((r, err) -> receivedResponses.add(r));

response = receivedResponses.take();
assertEquals(Response.Flags.OPEN, response.getFlags());
}
}

@Test(timeout = 10000)
public void testCreateCachedTransactionWhenMissing() {
KeyAttribute ka = KeyAttributes.ofAttributeDescriptor(gateway, "g", status, 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import cz.o2.proxima.core.transaction.KeyAttribute;
import cz.o2.proxima.core.transaction.KeyAttributes;
import cz.o2.proxima.core.transaction.Request;
import cz.o2.proxima.core.transaction.Request.Flags;
import cz.o2.proxima.core.transaction.Response;
import cz.o2.proxima.core.transaction.State;
import cz.o2.proxima.core.util.Optionals;
Expand Down Expand Up @@ -144,10 +145,9 @@ public void testTransactionSchemeProvider() {
"t",
request,
Arrays.asList(
request.upsert(1L, "t", "1", System.currentTimeMillis(), newRequest(Flags.COMMIT)),
request.upsert(
1L, "t", "1", System.currentTimeMillis(), newRequest(Request.Flags.OPEN)),
request.upsert(
2L, "t", "1", System.currentTimeMillis(), newRequest(Request.Flags.OPEN))));
2L, "t", "1", System.currentTimeMillis(), newRequest(Flags.COMMIT))));

assertTrue(request.getValueSerializer() instanceof TransactionProtoSerializer);
assertTrue(request.getValueSerializer().isUsable());
Expand Down

0 comments on commit f5f6bb3

Please sign in to comment.