Skip to content

Commit

Permalink
Merge pull request #884: [proxima-direct-transaction-manager] #326 ad…
Browse files Browse the repository at this point in the history
…d TransactionMonitoringPolicy
  • Loading branch information
je-ik authored Mar 5, 2024
2 parents 43d56bd + 52765de commit c8be78e
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public long apply() {
interface ServerTransactionConfig extends TransactionConfig {
/** Get initial sequential ID policy. */
InitialSequenceIdPolicy getInitialSeqIdPolicy();

TransactionMonitoringPolicy getTransactionMonitoringPolicy();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.direct.core.transaction;

import cz.o2.proxima.core.transaction.Request;
import cz.o2.proxima.core.transaction.Response;
import cz.o2.proxima.core.transaction.State;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/**
* Policy that can be specified for monitoring transactions during execution. Implementing classes
* should take care of performance. This class is required to be thread-safe.
*/
@ThreadSafe
public interface TransactionMonitoringPolicy {

static TransactionMonitoringPolicy nop() {
return new TransactionMonitoringPolicy() {
@Override
public void incomingRequest(
String transactionId, Request request, long timestamp, long watermark) {
// nop
}

@Override
public void stateUpdate(String transactionId, State currentState, @Nullable State newState) {
// nop
}

@Override
public void outgoingResponse(String transactionId, Response response) {
// nop
}
};
}

/**
* Called when new request regarding given transaction is received.
*
* @param transactionId ID of transaction
* @param request request to be processed
* @param timestamp timestamp of the request
* @param watermark watermark of all requests
*/
void incomingRequest(String transactionId, Request request, long timestamp, long watermark);

/**
* Called when transaction state is about to be updated.
*
* @param transactionId ID of transaction
* @param currentState the current state of the transaction
* @param newState the state to which the transaction will be transitioned
*/
void stateUpdate(String transactionId, State currentState, @Nullable State newState);

/**
* Called when a response to transaction client is about to be sent.
*
* @param transactionId ID of transaction
* @param response the response to be sent
*/
void outgoingResponse(String transactionId, Response response);
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public long getTransactionTimeoutMs() {
public InitialSequenceIdPolicy getInitialSeqIdPolicy() {
return initialSequenceIdPolicy;
}

@Override
public TransactionMonitoringPolicy getTransactionMonitoringPolicy() {
return transactionMonitoringPolicy;
}
}

private class CachedWriters implements AutoCloseable {
Expand Down Expand Up @@ -358,6 +363,9 @@ public void withHandle(ObserveHandle handle) {
@Getter(AccessLevel.PACKAGE)
private final InitialSequenceIdPolicy initialSequenceIdPolicy;

@Getter(AccessLevel.PACKAGE)
private final TransactionMonitoringPolicy transactionMonitoringPolicy;

@VisibleForTesting
public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> cfg) {
this.direct = direct;
Expand All @@ -369,6 +377,7 @@ public TransactionResourceManager(DirectDataOperator direct, Map<String, Object>
this.transactionTimeoutMs = getTransactionTimeout(cfg);
this.cleanupIntervalMs = getCleanupInterval(cfg);
this.initialSequenceIdPolicy = getInitialSequenceIdPolicy(cfg);
this.transactionMonitoringPolicy = getTransactionMonitoringPolicy(cfg);

log.info(
"Created {} with transaction timeout {} ms",
Expand Down Expand Up @@ -402,6 +411,15 @@ private static InitialSequenceIdPolicy getInitialSequenceIdPolicy(Map<String, Ob
.orElse(new InitialSequenceIdPolicy.Default());
}

private static TransactionMonitoringPolicy getTransactionMonitoringPolicy(
Map<String, Object> cfg) {

return Optional.ofNullable(cfg.get("monitoring-policy"))
.map(Object::toString)
.map(c -> Classpath.newInstance(c, TransactionMonitoringPolicy.class))
.orElse(TransactionMonitoringPolicy.nop());
}

@Override
public void close() {
openTransactionMap.forEach((k, v) -> v.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.transaction.ServerTransactionManager;
import cz.o2.proxima.direct.core.transaction.TransactionMonitoringPolicy;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.collect.Iterators;
Expand Down Expand Up @@ -178,11 +179,14 @@ public void close() {
private final Map<KeyWithAttribute, Map<KeyWithAttribute, SeqIdWithTombstone>> updatesToWildcard =
new HashMap<>();

private final TransactionMonitoringPolicy monitoringPolicy;

public TransactionLogObserver(DirectDataOperator direct, Metrics metrics) {
this.direct = direct;
this.metrics = metrics;
this.unsynchronizedManager = getServerTransactionManager(direct);
this.manager = synchronizedManager(unsynchronizedManager);
this.monitoringPolicy = unsynchronizedManager.getCfg().getTransactionMonitoringPolicy();
sequenceId.set(unsynchronizedManager.getCfg().getInitialSeqIdPolicy().apply());
assertSingleton();
startHouseKeeping();
Expand Down Expand Up @@ -392,6 +396,8 @@ private void handleRequest(StreamElement element, OnNextContext context) {
String requestId = manager.getRequestDesc().extractSuffix(element.getAttribute());
Optional<Request> maybeRequest = manager.getRequestDesc().valueOf(element);
if (maybeRequest.isPresent()) {
monitoringPolicy.incomingRequest(
transactionId, maybeRequest.get(), element.getStamp(), context.getWatermark());
processTransactionRequest(
transactionId, requestId, maybeRequest.get(), context, element.getStamp());
} else {
Expand Down Expand Up @@ -431,6 +437,7 @@ private void processTransactionUpdateRequest(
State currentState = manager.getCurrentState(transactionId);
@Nullable State newState = transitionState(transactionId, currentState, request);
if (newState != null) {
monitoringPolicy.stateUpdate(transactionId, currentState, newState);
// we have successfully computed new state, produce response
log.info(
"Transaction {} transitioned to state {} from {}",
Expand All @@ -439,18 +446,18 @@ private void processTransactionUpdateRequest(
currentState.getFlags());
Response response = getResponseForNewState(request, currentState, newState);
manager.ensureTransactionOpen(transactionId, newState);
monitoringPolicy.outgoingResponse(transactionId, response);
manager.writeResponseAndUpdateState(
transactionId, newState, requestId, response, context::commit);
} else if (request.getFlags() == Request.Flags.OPEN
&& (currentState.getFlags() == State.Flags.OPEN
|| currentState.getFlags() == State.Flags.COMMITTED)) {

Response response = Response.forRequest(request).duplicate(currentState.getSequentialId());
monitoringPolicy.stateUpdate(transactionId, currentState, currentState);
monitoringPolicy.outgoingResponse(transactionId, response);
manager.writeResponseAndUpdateState(
transactionId,
currentState,
requestId,
Response.forRequest(request).duplicate(currentState.getSequentialId()),
context::commit);
transactionId, currentState, requestId, response, context::commit);
} else {
log.warn(
"Unexpected {} request for transaction {} seqId {} when the state is {}. "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.direct.transaction.manager;

import cz.o2.proxima.core.transaction.Request;
import cz.o2.proxima.core.transaction.Response;
import cz.o2.proxima.core.transaction.State;
import cz.o2.proxima.core.util.Pair;
import cz.o2.proxima.direct.core.transaction.TransactionMonitoringPolicy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

public class TestMonitoringPolicy implements TransactionMonitoringPolicy {

private static List<Request> requests = Collections.synchronizedList(new ArrayList<>());
private static List<Response> responses = Collections.synchronizedList(new ArrayList<>());
private static List<Pair<State, State>> states = Collections.synchronizedList(new ArrayList<>());

public static void clear() {
requests.clear();
responses.clear();
states.clear();
}

public static boolean isEmpty() {
return requests.isEmpty() || responses.isEmpty() || states.isEmpty();
}

@Override
public void incomingRequest(
String transactionId, Request request, long timestamp, long watermark) {

requests.add(request);
}

@Override
public void stateUpdate(String transactionId, State currentState, @Nullable State newState) {
states.add(Pair.of(currentState, newState));
}

@Override
public void outgoingResponse(String transactionId, Response response) {
responses.add(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

import static org.junit.Assert.*;

import cz.o2.proxima.core.repository.AttributeDescriptor;
import com.google.common.collect.ImmutableMap;
import cz.o2.proxima.core.repository.EntityAwareAttributeDescriptor.Wildcard;
import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.repository.config.ConfigUtils;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.transaction.KeyAttribute;
import cz.o2.proxima.core.transaction.KeyAttributes;
import cz.o2.proxima.core.transaction.Request;
import cz.o2.proxima.core.transaction.Response;
import cz.o2.proxima.core.transaction.State;
import cz.o2.proxima.core.util.TransformationRunner;
Expand Down Expand Up @@ -64,21 +63,18 @@ public class TransactionLogObserverTest {
private final Config conf =
ConfigUtils.withStorageReplacement(
ConfigFactory.defaultApplication()
.withFallback(
ConfigFactory.parseMap(
ImmutableMap.of(
"transactions.monitoring-policy", TestMonitoringPolicy.class.getName())))
.withFallback(ConfigFactory.load("test-transactions.conf"))
.resolve(),
transactionFamilies::contains,
name -> URI.create("kafka-test://broker/topic_" + name));
private final Repository repo = Repository.ofTest(conf);
private final DirectDataOperator direct = repo.getOrCreateOperator(DirectDataOperator.class);
private final EntityDescriptor gateway = repo.getEntity("gateway");
private final EntityDescriptor user = repo.getEntity("user");
private final AttributeDescriptor<byte[]> gatewayStatus = gateway.getAttribute("status");
private final Wildcard<byte[]> userGateways = Wildcard.of(user, user.getAttribute("gateway.*"));
private final EntityDescriptor transaction = repo.getEntity("_transaction");
private final Wildcard<Request> request =
Wildcard.of(transaction, transaction.getAttribute("request.*"));
private final Wildcard<Response> response =
Wildcard.of(transaction, transaction.getAttribute("response.*"));
private long now;
private TransactionLogObserver observer;
private Metrics metrics;
Expand Down Expand Up @@ -162,6 +158,14 @@ public void testCreateTransactionCommit() throws InterruptedException {
assertTrue(metrics.getTransactionsCommitted().getValue() > 0.0);
}

@Test(timeout = 10000)
public void testCreateTransactionCommitWithMonitoring() throws InterruptedException {
TestMonitoringPolicy.clear();
assertTrue(TestMonitoringPolicy.isEmpty());
testCreateTransactionCommit();
assertFalse(TestMonitoringPolicy.isEmpty());
}

@Test(timeout = 10000)
public void testCreateTransactionCommitWithConflictInInputs() throws InterruptedException {
createObserver();
Expand Down

0 comments on commit c8be78e

Please sign in to comment.