Skip to content

Commit

Permalink
[proxima-direct-ingest-server] fix single write ingest transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Jan 21, 2025
1 parent 5bccd7a commit 69cce1d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,24 @@ private void processSingleIngest(Ingest request, Consumer<Status> consumer) {
if (log.isDebugEnabled()) {
log.debug("Processing input ingest {}", TextFormat.shortDebugString(request));
}
Metrics.INGESTS.increment();
try {
if (!writeRequest(request, consumer)) {
Metrics.INVALID_REQUEST.increment();
if (request.getTransactionId().isEmpty()) {
Metrics.INGESTS.increment();
try {
if (!writeRequest(request, consumer)) {
Metrics.INVALID_REQUEST.increment();
}
} catch (Exception err) {
log.error("Error processing user request {}", TextFormat.shortDebugString(request), err);
consumer.accept(status(request.getUuid(), 500, err.getMessage()));
}
} else {
@Nullable StreamElement streamElement = validateAndConvertToStreamElement(request, consumer);
if (streamElement != null) {
transactionContext
.get(request.getTransactionId())
.addOutputs(Collections.singletonList(streamElement));
consumer.accept(ok(request.getUuid()));
}
} catch (Exception err) {
log.error("Error processing user request {}", TextFormat.shortDebugString(request), err);
consumer.accept(status(request.getUuid(), 500, err.getMessage()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void begin(
Rpc.BeginTransactionRequest request,
StreamObserver<Rpc.BeginTransactionResponse> responseObserver) {

String transactionId = transactionContext.create(request.getTranscationId());
String transactionId = transactionContext.create(request.getTransactionId());
responseObserver.onNext(
Rpc.BeginTransactionResponse.newBuilder().setTransactionId(transactionId).build());
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import cz.o2.proxima.direct.core.transaction.ServerTransactionManager;
import cz.o2.proxima.direct.server.IngestService;
import cz.o2.proxima.direct.server.RetrieveService;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.BeginTransactionRequest;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.BeginTransactionResponse;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.GetRequest;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.GetResponse;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.Ingest;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.IngestBulk;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.Status;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.StatusBulk;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.TransactionCommitRequest;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.TransactionCommitResponse;
Expand All @@ -51,6 +53,8 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -168,6 +172,53 @@ public void testTransactionReadWrite() throws InterruptedException {
assertEquals(3, getResponse.getValue().size());
}

@Test(timeout = 10000)
public void testTransactionReadWriteSingle() throws InterruptedException {
BeginTransactionResponse response = begin();
String transactionId = response.getTransactionId();
assertFalse(transactionId.isEmpty());
long stamp = System.currentTimeMillis();
replicatedLatch = new CountDownLatch(2);
Rpc.Status status =
ingest(
transactionId,
StreamElement.upsert(
gateway,
gatewayUsers,
UUID.randomUUID().toString(),
"gw1",
gatewayUsers.toAttributePrefix() + "usr1",
stamp,
new byte[] {1, 2, 3}));

assertEquals(200, status.getStatus());

// written, but not yet committed
GetResponse getResponse =
get(
GetRequest.newBuilder()
.setEntity("gateway")
.setKey("gw1")
.setAttribute("user.usr1")
.build());
assertEquals(404, getResponse.getStatus());

TransactionCommitResponse commitResponse = commit(transactionId);
assertEquals(TransactionCommitResponse.Status.COMMITTED, commitResponse.getStatus());
replicatedLatch.await();

// verify we can read the results
getResponse =
get(
GetRequest.newBuilder()
.setEntity("gateway")
.setKey("gw1")
.setAttribute("user.usr1")
.build());
assertEquals(200, getResponse.getStatus());
assertEquals(3, getResponse.getValue().size());
}

@Test(timeout = 20000)
public void testTransactionCommitRejected() throws InterruptedException {
// we need two simultaneous transactions
Expand Down Expand Up @@ -336,12 +387,36 @@ private BeginTransactionResponse begin(@Nullable String transactionId) {
ListStreamObserver<BeginTransactionResponse> beginObserver = intoList();
retrieve.begin(
BeginTransactionRequest.newBuilder()
.setTranscationId(MoreObjects.firstNonNull(transactionId, ""))
.setTransactionId(MoreObjects.firstNonNull(transactionId, ""))
.build(),
beginObserver);
return Iterables.getOnlyElement(beginObserver.getOutputs());
}

private Rpc.Status ingest(String transactionId, StreamElement update)
throws InterruptedException {

BlockingQueue<Status> status = new ArrayBlockingQueue<>(1);
StreamObserver<Rpc.Status> res =
new StreamObserver<>() {
@Override
public void onNext(Rpc.Status s) {
ExceptionUtils.unchecked(() -> status.put(s));
}

@Override
public void onError(Throwable throwable) {
log.error("Error", throwable);
throw new IllegalStateException(throwable);
}

@Override
public void onCompleted() {}
};
ingest.ingest(asIngest(transactionId, update), res);
return status.take();
}

private StatusBulk ingestBulk(String transactionId, StreamElement... updates) {
return ingestBulk(transactionId, Arrays.asList(updates));
}
Expand All @@ -353,23 +428,25 @@ private StatusBulk ingestBulk(String transactionId, List<StreamElement> updates)
IngestBulk.newBuilder()
.addAllIngest(
updates.stream()
.map(
el ->
Ingest.newBuilder()
.setUuid(el.getUuid())
.setTransactionId(transactionId)
.setEntity(el.getEntityDescriptor().getName())
.setKey(el.getKey())
.setAttribute(el.getAttribute())
.setStamp(el.getStamp())
.setValue(ByteString.copyFrom(el.getValue()))
.build())
.map(el -> asIngest(transactionId, el))
.collect(Collectors.toList()))
.build());
ingestBulk.onCompleted();
return Iterables.getOnlyElement(ingestObserver.getOutputs());
}

private static Ingest asIngest(String transactionId, StreamElement el) {
return Ingest.newBuilder()
.setUuid(el.getUuid())
.setTransactionId(transactionId)
.setEntity(el.getEntityDescriptor().getName())
.setKey(el.getKey())
.setAttribute(el.getAttribute())
.setStamp(el.getStamp())
.setValue(ByteString.copyFrom(el.getValue()))
.build();
}

private static <T> ListStreamObserver<T> intoList() {
return new ListStreamObserver<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public void testCreateTransactionCommitWithConflictInOutputs() throws Interrupte
Collections.singletonList(
userGateways.upsert(
t2OpenResponse.getSeqId(), "user", "1", t1OpenResponse.getStamp(), new byte[] {}));
// cannot overwrite higher sequentialId with lower one
clientManager.commit(t2, t2Outputs).thenAccept(responseQueue::add);
Response response = responseQueue.take();
assertEquals(Response.Flags.COMMITTED, response.getFlags());
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/main/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ message BeginTransactionRequest {
repeated KeyAttribute attributesInvolved = 1;

/** An optional transaction ID. When omitted a new transaction ID will be generated. */
string transcationId = 2;
string transactionId = 2;

}

Expand Down

0 comments on commit 69cce1d

Please sign in to comment.