
Commit

Merge pull request #882: [proxima-direct-transaction-manager] optimize wildcard update performance
je-ik authored Mar 4, 2024
2 parents c3efc65 + e298f38 commit a4e3b46
Showing 4 changed files with 150 additions and 20 deletions.
1 change: 1 addition & 0 deletions direct/transaction-manager/build.gradle
@@ -31,6 +31,7 @@ dependencies {
testImplementation project(':proxima-scheme-proto')
testImplementation libraries.mockito_core
testImplementation libraries.slf4j_log4j
testImplementation libraries.log4j_core
testImplementation libraries.junit4
compileAnnotationProcessor libraries.lombok
}
@@ -22,7 +22,6 @@
import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Wildcard;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.time.Watermarks;
import cz.o2.proxima.core.transaction.Commit;
import cz.o2.proxima.core.transaction.KeyAttribute;
import cz.o2.proxima.core.transaction.KeyAttributes;
@@ -63,6 +62,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@@ -167,7 +167,7 @@ public void close() {
private final Object commitLock = new Object();

@GuardedBy("timerLock")
private PriorityQueue<Pair<Long, Runnable>> timers =
private final PriorityQueue<Pair<Long, Runnable>> timers =
new PriorityQueue<>(Comparator.comparing(Pair::getFirst));

@GuardedBy("lock")
@@ -432,6 +432,11 @@ private void processTransactionUpdateRequest(
@Nullable State newState = transitionState(transactionId, currentState, request);
if (newState != null) {
// we have successfully computed new state, produce response
log.info(
"Transaction {} transitioned to state {} from {}",
transactionId,
newState.getFlags(),
currentState.getFlags());
Response response = getResponseForNewState(request, currentState, newState);
manager.ensureTransactionOpen(transactionId, newState);
manager.writeResponseAndUpdateState(
@@ -464,7 +469,7 @@ private void processTransactionUpdateRequest(

private void abortTransaction(String transactionId, State state) {
long seqId = state.getSequentialId();
// we need to rollback all updates to lastUpdateSeqId with the same seqId
// we need to roll back all updates to lastUpdateSeqId with the same seqId
try (var l = Locker.of(this.lock.writeLock())) {
state.getCommittedAttributes().stream()
.map(KeyWithAttribute::ofWildcard)
@@ -536,31 +541,63 @@ private State transitionToAborted(String transactionId, State state) {
}

private State transitionToUpdated(State currentState, Request request) {
if (!isCompatibleUpdate(currentState.getInputAttributes(), request.getInputAttributes())) {
if (!isCompatibleWildcardUpdate(currentState, request.getInputAttributes())) {
metrics.getTransactionsRejected().increment();
return currentState.aborted();
}
metrics.getTransactionsUpdated().increment();
return currentState.update(request.getInputAttributes());
}

private boolean isCompatibleUpdate(
Collection<KeyAttribute> inputs, Collection<KeyAttribute> additionalAttributes) {
private static Map<String, Set<KeyAttribute>> getPartitionedWildcards(
Stream<KeyAttribute> attributes) {

List<KeyAttribute> wildcards =
inputs.stream()
.filter(ka -> ka.getSequenceId() < Watermarks.MAX_WATERMARK)
.filter(ka -> ka.isWildcardQuery() || ka.getAttributeDescriptor().isWildcard())
.collect(Collectors.toList());
return additionalAttributes.stream()
return attributes
.filter(ka -> ka.isWildcardQuery() || ka.getAttributeDescriptor().isWildcard())
.collect(
Collectors.groupingBy(TransactionLogObserver::getWildcardQueryId, Collectors.toSet()));
}

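// Classifier used by getPartitionedWildcards above: the id has the form
// <key>/<entity>:<attribute descriptor name>, so all KeyAttributes touching the same
// wildcard attribute of the same key land in one group, and compatibility is checked
// per group instead of against every prior input attribute.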
@Internal
private static String getWildcardQueryId(KeyAttribute ka) {
return ka.getKey()
+ "/"
+ ka.getEntity().getName()
+ ":"
+ ka.getAttributeDescriptor().getName();
}

private boolean isCompatibleWildcardUpdate(
State state, Collection<KeyAttribute> additionalAttributes) {

Map<String, Set<KeyAttribute>> existingWildcards =
getPartitionedWildcards(state.getInputAttributes().stream());
Map<String, Set<KeyAttribute>> updatesPartitioned =
getPartitionedWildcards(additionalAttributes.stream());

return updatesPartitioned.entrySet().stream()
.allMatch(
ka -> {
// verify we didn't have any prior KeyAttributes matching the query
// or that this KeyAttribute does not contain any earlier versions of wildcard
KeyWithAttribute kwa = KeyWithAttribute.ofWildcard(ka);
return wildcards.stream()
.noneMatch(inputKa -> KeyWithAttribute.ofWildcard(inputKa).equals(kwa));
e -> {
// either the wildcard has not been queried yet
// or both outcomes are the same
Set<KeyAttribute> currentKeyAttributes = existingWildcards.get(e.getKey());
Set<KeyAttribute> updatedKeyAttributes = e.getValue();
boolean currentContainQuery =
currentKeyAttributes != null
&& currentKeyAttributes.stream().anyMatch(KeyAttribute::isWildcardQuery);
boolean updatesContainQuery =
updatedKeyAttributes.stream().anyMatch(KeyAttribute::isWildcardQuery);

if (updatesContainQuery && currentKeyAttributes != null) {
// we either did not query the data previously, or we have the exact same results
if (!Sets.difference(currentKeyAttributes, updatedKeyAttributes).isEmpty()) {
return false;
}
}
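// an earlier wildcard query for this partition must already cover everything reported now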
if (currentContainQuery) {
return Sets.difference(updatedKeyAttributes, currentKeyAttributes).isEmpty();
}
return true;
});
}

@@ -599,7 +636,7 @@ static Iterable<KeyAttribute> concatInputsAndOutputs(
Preconditions.checkArgument(
!ka.isWildcardQuery(), "Got KeyAttribute %s, which is not allowed output.", ka);
KeyWithAttribute kwa = KeyWithAttribute.of(ka);
if (!ka.getAttributeSuffix().isPresent()) {
if (ka.getAttributeSuffix().isEmpty()) {
mapOfInputs.putIfAbsent(kwa, ka);
} else {
// we can add wildcard output only if the inputs do not contain wildcard query
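For orientation, below is a minimal, self-contained sketch of the compatibility rule implemented in isCompatibleWildcardUpdate above. It is not part of the commit: plain strings such as "1@3" stand in for KeyAttribute (wildcard suffix "1", sequence id 3), a literal "*" stands for the wildcard query element, the class name is made up, and Guava's com.google.common.collect.Sets is used directly, whereas the project code may use a relocated copy.

import com.google.common.collect.Sets;
import java.util.Set;

class WildcardCompatibilitySketch {

  // 'current' holds what the transaction has already read for one wildcard partition,
  // 'updated' holds what the latest update reports for the same partition.
  static boolean isCompatible(Set<String> current, Set<String> updated) {
    boolean currentContainsQuery = current != null && current.contains("*");
    boolean updatesContainQuery = updated.contains("*");
    if (updatesContainQuery && current != null) {
      // a new wildcard query must return at least everything the transaction already read
      if (!Sets.difference(current, updated).isEmpty()) {
        return false;
      }
    }
    if (currentContainsQuery) {
      // an earlier wildcard query must already cover everything reported now
      return Sets.difference(updated, current).isEmpty();
    }
    return true;
  }

  public static void main(String[] args) {
    // mirrors testTransactionUpdateWithWildcardConflict4: the later query covers the earlier read
    System.out.println(isCompatible(Set.of("1@1"), Set.of("*", "1@1", "2@2"))); // true
    // mirrors testTransactionUpdateWithWildcardConflict5: suffix "1" was re-written with a newer
    // sequence id, so the query result no longer contains the version read earlier
    System.out.println(isCompatible(Set.of("1@1"), Set.of("*", "1@3", "2@2"))); // false
  }
}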
@@ -416,6 +416,98 @@ public void testTransactionUpdateWithWildcardConflict2() throws InterruptedException
assertEquals(Response.Flags.ABORTED, response.getFlags());
}

@Test(timeout = 10000)
public void testTransactionUpdateWithWildcardConflict3() throws InterruptedException {
createObserver();
ClientTransactionManager clientManager = direct.getClientTransactionManager();
String transactionId = UUID.randomUUID().toString();
BlockingQueue<Response> responseQueue = new ArrayBlockingQueue<>(5);
// first read "userGateways.1"
clientManager
.begin(
transactionId,
Collections.singletonList(
KeyAttributes.ofAttributeDescriptor(user, "user", userGateways, 1L, "1")))
.thenAccept(responseQueue::add);
// discard this
responseQueue.take();
// then read "userGateways.2"
clientManager
.updateTransaction(
transactionId,
Collections.singletonList(
KeyAttributes.ofAttributeDescriptor(user, "user", userGateways, 1L, "2")))
.thenAccept(responseQueue::add);
Response response = responseQueue.take();
assertEquals(Response.Flags.UPDATED, response.getFlags());
}

@Test(timeout = 10000)
public void testTransactionUpdateWithWildcardConflict4() throws InterruptedException {
createObserver();
ClientTransactionManager clientManager = direct.getClientTransactionManager();
String transactionId = UUID.randomUUID().toString();
BlockingQueue<Response> responseQueue = new ArrayBlockingQueue<>(5);
// first read "userGateways.1"
clientManager
.begin(
transactionId,
Collections.singletonList(
KeyAttributes.ofAttributeDescriptor(user, "user", userGateways, 1L, "1")))
.thenAccept(responseQueue::add);
// discard this
responseQueue.take();
// then query "userGateways.*"
clientManager
.updateTransaction(
transactionId,
KeyAttributes.ofWildcardQueryElements(
user,
"user",
userGateways,
Arrays.asList(
// one update matches the previous query
userGateways.upsert(1L, "user", "1", now, new byte[] {}),
// one is added due to the query
userGateways.upsert(2L, "user", "2", now, new byte[] {}))))
.thenAccept(responseQueue::add);
Response response = responseQueue.take();
assertEquals(Response.Flags.UPDATED, response.getFlags());
}

@Test(timeout = 10000)
public void testTransactionUpdateWithWildcardConflict5() throws InterruptedException {
createObserver();
ClientTransactionManager clientManager = direct.getClientTransactionManager();
String transactionId = UUID.randomUUID().toString();
BlockingQueue<Response> responseQueue = new ArrayBlockingQueue<>(5);
// first read "userGateways.1"
clientManager
.begin(
transactionId,
Collections.singletonList(
KeyAttributes.ofAttributeDescriptor(user, "user", userGateways, 1L, "1")))
.thenAccept(responseQueue::add);
// discard this
responseQueue.take();
// then query "userGateways.*", but get different results
clientManager
.updateTransaction(
transactionId,
KeyAttributes.ofWildcardQueryElements(
user,
"user",
userGateways,
Arrays.asList(
// one update DOES NOT match the previous query
userGateways.upsert(3L, "user", "1", now, new byte[] {}),
// one is added due to the query
userGateways.upsert(2L, "user", "2", now, new byte[] {}))))
.thenAccept(responseQueue::add);
Response response = responseQueue.take();
assertEquals(Response.Flags.ABORTED, response.getFlags());
}

@Test(timeout = 10000)
public void testTransactionRollback() throws InterruptedException {
createObserver();
@@ -21,5 +21,5 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %level %c{1}:%L - %msg%n

# Root logger option
rootLogger.level = ${sys:LOG_LEVEL:-INFO}
rootLogger.level = ${env:LOG_LEVEL:-INFO}
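# LOG_LEVEL is read from the environment (e.g. LOG_LEVEL=DEBUG); defaults to INFO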
rootLogger.appenderRef.stdout.ref = STDOUT
