Skip to content

Commit

Permalink
Merge pull request #925: Bulk scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik authored Jul 18, 2024
2 parents 5426299 + 1c6e7d4 commit c54eaf3
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import cz.o2.proxima.direct.server.rpc.proto.service.RetrieveServiceGrpc;
import cz.o2.proxima.direct.server.rpc.proto.service.RetrieveServiceGrpc.RetrieveServiceBlockingStub;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.KeyValue;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.ScanResult;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -339,10 +340,12 @@ public void testScan() {
List<ScanResult> res =
Collections.singletonList(
ScanResult.newBuilder()
.setKey("key")
.setAttribute("attribute")
.setStamp(now)
.setValue(ByteString.copyFrom(new byte[] {1}))
.addValue(
KeyValue.newBuilder()
.setKey("key")
.setAttribute("attribute")
.setStamp(now)
.setValue(ByteString.copyFrom(new byte[] {1})))
.build());
return res.iterator();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,27 +403,41 @@ public void scan(ScanRequest request, StreamObserver<ScanResult> responseObserve
request.getAttributeList());
DirectAttributeFamilyDescriptor family = Iterables.getOnlyElement(families);
BatchLogReader reader = Optionals.get(family.getBatchReader());
ScanResult.Builder builder = ScanResult.newBuilder();
AtomicInteger serializedEstimate = new AtomicInteger();
reader.observe(
reader.getPartitions(),
attributes,
new BatchLogObserver() {
@Override
public boolean onNext(StreamElement element) {
if (!element.isDelete()) {
ScanResult result =
ScanResult.newBuilder()
builder.addValue(
Rpc.KeyValue.newBuilder()
.setAttribute(element.getAttribute())
.setKey(element.getKey())
.setValue(ByteString.copyFrom(element.getValue()))
.setStamp(element.getStamp())
.build();
responseObserver.onNext(result);
.setStamp(element.getStamp()));
int current =
serializedEstimate.addAndGet(
element.getValue().length
+ element.getAttribute().length()
+ element.getKey().length()
+ 8);
if (current > 65535) {
responseObserver.onNext(builder.build());
builder.clear();
serializedEstimate.set(0);
}
}
return true;
}

@Override
public void onCompleted() {
if (builder.getValueCount() > 0) {
responseObserver.onNext(builder.build());
}
ExceptionUtils.unchecked(() -> result.put(Optional.empty()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.GetRequest;
import cz.o2.proxima.direct.server.rpc.proto.service.Rpc.ScanResult;
import cz.o2.proxima.direct.server.test.Test.ExtendedMessage;
import cz.o2.proxima.direct.server.transaction.TransactionContext;
import cz.o2.proxima.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -1329,9 +1330,17 @@ public void onCompleted() {

@Test
public void testScanValid() {
testScanElements(100);
}

@Test(timeout = 30_000)
public void testScanValidLarge() {
testScanElements(10000);
}

private void testScanElements(int numElements) {
EntityDescriptor entity = server.repo.getEntity("dummy");
AttributeDescriptor<?> attribute = entity.getAttribute("wildcard.*");
int numElements = 100;
OnlineAttributeWriter writer = Optionals.get(server.direct.getWriter(attribute));
long now = System.currentTimeMillis();
for (int i = 0; i < numElements; i++) {
Expand Down Expand Up @@ -1371,13 +1380,17 @@ public void onCompleted() {

retrieve.scan(request, responseObserver);

int numScanned = responses.stream().mapToInt(ScanResult::getValueCount).sum();
assertEquals(numElements, numScanned);

assertTrue(finished.get());
Rpc.ScanResult response = responses.get(0);
assertEquals("wildcard.0", response.getAttribute());
assertEquals("key", response.getKey());
assertEquals(now, response.getStamp());
assertArrayEquals(new byte[] {1, 2, 3}, response.getValue().toByteArray());
assertEquals(numElements, responses.size());
assertTrue(response.getValueCount() > 1);
Rpc.KeyValue kv = response.getValue(0);
assertEquals("wildcard.0", kv.getAttribute());
assertEquals("key", kv.getKey());
assertEquals(now, kv.getStamp());
assertArrayEquals(new byte[] {1, 2, 3}, kv.getValue().toByteArray());
}

@Test
Expand Down
13 changes: 11 additions & 2 deletions rpc/src/main/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,21 @@ message ScanRequest {

}

message ScanResult {

message KeyValue {
string key = 1;
string attribute = 2;
bytes value = 3;
uint64 stamp = 4;
}

message ScanResult {

reserved 1;
reserved 2;
reserved 3;
reserved 4;

repeated KeyValue value = 5;

}

Expand Down

0 comments on commit c54eaf3

Please sign in to comment.