Skip to content

Commit

Permalink
Merge pull request #886: Transaction duplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik authored Mar 6, 2024
2 parents 573e8dc + b2f8960 commit f985941
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -435,46 +435,26 @@ private void processTransactionUpdateRequest(
log.debug(
"Processing request to {} with {} for transaction {}", requestId, request, transactionId);
State currentState = manager.getCurrentState(transactionId);
@Nullable State newState = transitionState(transactionId, currentState, request);
if (newState != null) {
monitoringPolicy.stateUpdate(transactionId, currentState, newState);
// we have successfully computed new state, produce response
log.info(
"Transaction {} transitioned to state {} from {}",
transactionId,
newState.getFlags(),
currentState.getFlags());
Response response = getResponseForNewState(request, currentState, newState);
manager.ensureTransactionOpen(transactionId, newState);
monitoringPolicy.outgoingResponse(transactionId, response);
manager.writeResponseAndUpdateState(
transactionId, newState, requestId, response, context::commit);
} else if (request.getFlags() == Request.Flags.OPEN
&& (currentState.getFlags() == State.Flags.OPEN
|| currentState.getFlags() == State.Flags.COMMITTED)) {

Response response = Response.forRequest(request).duplicate(currentState.getSequentialId());
monitoringPolicy.stateUpdate(transactionId, currentState, currentState);
monitoringPolicy.outgoingResponse(transactionId, response);
manager.writeResponseAndUpdateState(
transactionId, currentState, requestId, response, context::commit);
} else {
log.warn(
"Unexpected {} request for transaction {} seqId {} when the state is {}. "
+ "Refusing to respond, because the correct response is unknown.",
request.getFlags(),
transactionId,
currentState.getSequentialId(),
currentState.getFlags());
context.confirm();
}
State newState = transitionState(transactionId, currentState, request);
monitoringPolicy.stateUpdate(transactionId, currentState, newState);
// we have successfully computed new state, produce response
log.info(
"Transaction {} transitioned to state {} from {}",
transactionId,
newState.getFlags(),
currentState.getFlags());
Response response = getResponseForNewState(request, currentState, newState);
manager.ensureTransactionOpen(transactionId, newState);
monitoringPolicy.outgoingResponse(transactionId, response);
manager.writeResponseAndUpdateState(
transactionId, newState, requestId, response, context::commit);
} catch (Throwable err) {
log.warn("Error during processing transaction {} request {}", transactionId, request, err);
context.commit(false, err);
}
}

private void abortTransaction(String transactionId, State state) {
private void abortTransaction(State state) {
long seqId = state.getSequentialId();
// we need to roll back all updates to lastUpdateSeqId with the same seqId
try (var l = Locker.of(this.lock.writeLock())) {
Expand Down Expand Up @@ -502,6 +482,9 @@ private Response getResponseForNewState(Request request, State oldState, State s
? Response.forRequest(request).open(state.getSequentialId(), state.getStamp())
: Response.forRequest(request).updated();
case COMMITTED:
if (request.getFlags() == Request.Flags.OPEN) {
return Response.forRequest(request).duplicate(state.getSequentialId());
}
return Response.forRequest(request).committed();
case ABORTED:
return Response.forRequest(request).aborted();
Expand All @@ -510,7 +493,6 @@ private Response getResponseForNewState(Request request, State oldState, State s
}

@VisibleForTesting
@Nullable
State transitionState(String transactionId, State currentState, Request request) {
switch (currentState.getFlags()) {
case UNKNOWN:
Expand All @@ -525,27 +507,30 @@ State transitionState(String transactionId, State currentState, Request request)
case UPDATE:
return transitionToUpdated(currentState, request);
case ROLLBACK:
abortTransaction(transactionId, currentState);
abortTransaction(currentState);
return currentState.aborted();
}
break;
case COMMITTED:
if (request.getFlags() == Request.Flags.ROLLBACK) {
return transitionToAborted(transactionId, currentState);
}
if (request.getFlags() == Request.Flags.OPEN) {
return currentState;
}
break;
case ABORTED:
if (request.getFlags() == Flags.OPEN) {
return transitionToOpen(transactionId, request);
}
return currentState;
}
return null;
return currentState.aborted();
}

private State transitionToAborted(String transactionId, State state) {
log.info("Transaction {} seqId {} rolled back", transactionId, state.getSequentialId());
abortTransaction(transactionId, state);
abortTransaction(state);
metrics.getTransactionsRolledBack().increment();
return state.aborted();
}
Expand Down Expand Up @@ -777,7 +762,7 @@ private void stateUpdate(StreamElement newUpdate, Pair<Long, Object> oldValue) {
if (state.getFlags() == State.Flags.COMMITTED) {
transactionPostCommit(state);
} else if (state.getFlags() == State.Flags.ABORTED) {
abortTransaction(newUpdate.getKey(), state);
abortTransaction(state);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ public void testCreateTransactionDuplicate() throws InterruptedException {
KeyAttributes.ofAttributeDescriptor(user, "user", userGateways, 1L, "1")))
.thenAccept(responseQueue::add);
Response response = responseQueue.take();
assertEquals(Response.Flags.DUPLICATE, response.getFlags());
assertEquals(firstResponse.getSeqId(), response.getSeqId());
assertEquals(Response.Flags.ABORTED, response.getFlags());
}

@Test(timeout = 10000)
Expand Down

0 comments on commit f985941

Please sign in to comment.