diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java b/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java index fcf5ec38d8..9a8170ab58 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java @@ -13,25 +13,34 @@ import java.util.Optional; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +@NotThreadSafe public class QueryScanner implements Scanner { private final PaginatedRequest request; private final ResultInterpreter resultInterpreter; private Iterator> itemsIterator; + @Nullable private Integer remainingLimit; @Nullable private Map lastEvaluatedKey; - private int totalResultCount; private ScannerIterator scannerIterator; @SuppressFBWarnings("EI_EXPOSE_REP2") - public QueryScanner(PaginatedRequest request, ResultInterpreter resultInterpreter) { + public QueryScanner(PaginatedRequest request, int limit, ResultInterpreter resultInterpreter) { this.request = request; - this.resultInterpreter = resultInterpreter; - handleResponse(request.execute()); + if (limit > 0) { + remainingLimit = limit; + handleResponse(request.execute(limit)); + } else { + remainingLimit = null; + handleResponse(request.execute()); + } + + this.resultInterpreter = resultInterpreter; } @Override @@ -49,7 +58,11 @@ private boolean hasNext() { return true; } if (lastEvaluatedKey != null) { - handleResponse(request.execute(lastEvaluatedKey)); + if (remainingLimit != null) { + handleResponse(request.execute(lastEvaluatedKey, remainingLimit)); + } else { + handleResponse(request.execute(lastEvaluatedKey)); + } return itemsIterator.hasNext(); } return false; @@ -57,10 +70,11 @@ private boolean hasNext() { private void handleResponse(PaginatedRequestResponse response) { List> items = response.items(); - totalResultCount += items.size(); + if (remainingLimit != null) { + remainingLimit -= items.size(); + } itemsIterator = items.iterator(); - if ((request.limit() == null || totalResultCount < request.limit()) - && response.hasLastEvaluatedKey()) { + if ((remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey()) { lastEvaluatedKey = response.lastEvaluatedKey(); } else { lastEvaluatedKey = null; diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java b/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java index ef7eb6fb1d..3b757af5e1 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/SelectStatementHandler.java @@ -142,16 +142,16 @@ private Scanner executeScanWithIndex(Selection selection, TableMetadata tableMet builder.expressionAttributeNames(expressionAttributeNames); + int limit = 0; if (selection instanceof Scan) { Scan scan = (Scan) selection; - if (scan.getLimit() > 0) { - builder.limit(scan.getLimit()); - } + limit = scan.getLimit(); } + com.scalar.db.storage.dynamo.request.QueryRequest request = new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build()); return new QueryScanner( - request, new ResultInterpreter(selection.getProjections(), tableMetadata)); + request, limit, new ResultInterpreter(selection.getProjections(), tableMetadata)); } private Scanner executeScan(Scan scan, TableMetadata tableMetadata) { @@ -171,10 +171,6 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) { } } - if (scan.getLimit() > 0) { - builder.limit(scan.getLimit()); - } - if (!scan.getProjections().isEmpty()) { Map expressionAttributeNames = new HashMap<>(); projectionExpression(builder, scan, expressionAttributeNames); @@ -184,20 +180,17 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) { if (scan.getConsistency() != Consistency.EVENTUAL) { builder.consistentRead(true); } + com.scalar.db.storage.dynamo.request.QueryRequest queryRequest = new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build()); return new QueryScanner( - queryRequest, new ResultInterpreter(scan.getProjections(), tableMetadata)); + queryRequest, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata)); } private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) { DynamoOperation dynamoOperation = new DynamoOperation(scan, tableMetadata); ScanRequest.Builder builder = ScanRequest.builder().tableName(dynamoOperation.getTableName()); - if (scan.getLimit() > 0) { - builder.limit(scan.getLimit()); - } - if (!scan.getProjections().isEmpty()) { Map expressionAttributeNames = new HashMap<>(); projectionExpression(builder, scan, expressionAttributeNames); @@ -207,10 +200,13 @@ private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) { if (scan.getConsistency() != Consistency.EVENTUAL) { builder.consistentRead(true); } + com.scalar.db.storage.dynamo.request.ScanRequest requestWrapper = new com.scalar.db.storage.dynamo.request.ScanRequest(client, builder.build()); return new QueryScanner( - requestWrapper, new ResultInterpreter(scan.getProjections(), tableMetadata)); + requestWrapper, + scan.getLimit(), + new ResultInterpreter(scan.getProjections(), tableMetadata)); } private void projectionExpression( diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/request/PaginatedRequest.java b/core/src/main/java/com/scalar/db/storage/dynamo/request/PaginatedRequest.java index 999c7bdb6b..4246271e1c 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/request/PaginatedRequest.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/request/PaginatedRequest.java @@ -8,22 +8,34 @@ public interface PaginatedRequest { /** * Execute the request * - * @return the request response + * @return the response */ PaginatedRequestResponse execute(); + /** + * Execute the request with limit + * + * @param limit the maximum number of items to evaluate (not necessarily the number of matching + * items) + * @return the response + */ + PaginatedRequestResponse execute(int limit); + /** * Execute the request that will be evaluated starting from the given start key * - * @param exclusiveStartKey The primary key of the first item that this operation will evaluate. - * @return the request response + * @param exclusiveStartKey the primary key of the first item that this operation will evaluate. + * @return the response */ PaginatedRequestResponse execute(Map exclusiveStartKey); /** - * Returns the request limit + * Execute the request that will be evaluated starting from the given start key with limit * - * @return the request limit + * @param exclusiveStartKey the primary key of the first item that this operation will evaluate. + * @param limit the maximum number of items to evaluate (not necessarily the number of matching + * items) + * @return the response */ - Integer limit(); + PaginatedRequestResponse execute(Map exclusiveStartKey, int limit); } diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/request/QueryRequest.java b/core/src/main/java/com/scalar/db/storage/dynamo/request/QueryRequest.java index 59beb35605..c046381ba3 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/request/QueryRequest.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/request/QueryRequest.java @@ -19,6 +19,21 @@ public QueryRequest( this.dynamoRequest = dynamoRequest; } + @Override + public PaginatedRequestResponse execute() { + QueryResponse response = client.query(dynamoRequest); + + return new PaginatedRequestResponse( + response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey()); + } + + @Override + public PaginatedRequestResponse execute(int limit) { + QueryRequest request = + new QueryRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build()); + return request.execute(); + } + @Override public PaginatedRequestResponse execute(Map exclusiveStartKey) { QueryRequest request = @@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map exclusiveSta } @Override - public PaginatedRequestResponse execute() { - QueryResponse response = client.query(dynamoRequest); - - return new PaginatedRequestResponse( - response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey()); - } - - @Override - public Integer limit() { - return dynamoRequest.limit(); + public PaginatedRequestResponse execute( + Map exclusiveStartKey, int limit) { + QueryRequest request = + new QueryRequest( + this.client, + this.dynamoRequest + .toBuilder() + .exclusiveStartKey(exclusiveStartKey) + .limit(limit) + .build()); + return request.execute(); } } diff --git a/core/src/main/java/com/scalar/db/storage/dynamo/request/ScanRequest.java b/core/src/main/java/com/scalar/db/storage/dynamo/request/ScanRequest.java index 84ce4bb136..f517edc000 100644 --- a/core/src/main/java/com/scalar/db/storage/dynamo/request/ScanRequest.java +++ b/core/src/main/java/com/scalar/db/storage/dynamo/request/ScanRequest.java @@ -19,6 +19,21 @@ public ScanRequest( this.dynamoRequest = dynamoRequest; } + @Override + public PaginatedRequestResponse execute() { + ScanResponse response = client.scan(dynamoRequest); + + return new PaginatedRequestResponse( + response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey()); + } + + @Override + public PaginatedRequestResponse execute(int limit) { + ScanRequest request = + new ScanRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build()); + return request.execute(); + } + @Override public PaginatedRequestResponse execute(Map exclusiveStartKey) { ScanRequest request = @@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map exclusiveSta } @Override - public PaginatedRequestResponse execute() { - ScanResponse response = client.scan(dynamoRequest); - - return new PaginatedRequestResponse( - response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey()); - } - - @Override - public Integer limit() { - return dynamoRequest.limit(); + public PaginatedRequestResponse execute( + Map exclusiveStartKey, int limit) { + ScanRequest request = + new ScanRequest( + this.client, + this.dynamoRequest + .toBuilder() + .exclusiveStartKey(exclusiveStartKey) + .limit(limit) + .build()); + return request.execute(); } } diff --git a/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java b/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java index c8105c769c..1e40781dba 100644 --- a/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java +++ b/core/src/test/java/com/scalar/db/storage/dynamo/QueryScannerTest.java @@ -40,11 +40,10 @@ public void one_ShouldReturnResult() { Map item = Collections.emptyMap(); List> items = Arrays.asList(item, item, item); when(request.execute()).thenReturn(response); - when(request.limit()).thenReturn(null); when(response.items()).thenReturn(items); when(resultInterpreter.interpret(item)).thenReturn(result); - QueryScanner queryScanner = new QueryScanner(request, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); // Act Optional actual1 = queryScanner.one(); @@ -71,11 +70,10 @@ public void all_ShouldReturnResults() { Map item = Collections.emptyMap(); List> items = Arrays.asList(item, item, item); when(request.execute()).thenReturn(response); - when(request.limit()).thenReturn(null); when(response.items()).thenReturn(items); when(resultInterpreter.interpret(item)).thenReturn(result); - QueryScanner queryScanner = new QueryScanner(request, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); // Act List results1 = queryScanner.all(); @@ -100,9 +98,8 @@ public void iterator_ShouldReturnResults() { when(response.items()).thenReturn(items); when(resultInterpreter.interpret(item)).thenReturn(result); when(request.execute()).thenReturn(response); - when(request.limit()).thenReturn(null); - QueryScanner queryScanner = new QueryScanner(request, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); // Act Iterator iterator = queryScanner.iterator(); @@ -134,9 +131,8 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() { when(resultInterpreter.interpret(item)).thenReturn(result); when(request.execute()).thenReturn(response); when(request.execute(lastEvaluatedKey)).thenReturn(response); - when(request.limit()).thenReturn(null); - QueryScanner queryScanner = new QueryScanner(request, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter); // Act Optional actual1 = queryScanner.one(); @@ -164,26 +160,27 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() { @Test public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResults() { // Arrange + int limit = 3; + Map item = Collections.emptyMap(); - List> items = Arrays.asList(item, item); + List> items1 = Arrays.asList(item, item); + List> items2 = Collections.singletonList(item); Map lastEvaluatedKey = Collections.emptyMap(); - when(request.limit()).thenReturn(4); - when(response.items()).thenReturn(items); + when(response.items()).thenReturn(items1).thenReturn(items2); when(response.hasLastEvaluatedKey()).thenReturn(true); when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey); - when(request.execute()).thenReturn(response); - when(request.execute(lastEvaluatedKey)).thenReturn(response); + when(request.execute(limit)).thenReturn(response); + when(request.execute(lastEvaluatedKey, limit - items1.size())).thenReturn(response); when(resultInterpreter.interpret(item)).thenReturn(result); - QueryScanner queryScanner = new QueryScanner(request, resultInterpreter); + QueryScanner queryScanner = new QueryScanner(request, limit, resultInterpreter); // Act Optional actual1 = queryScanner.one(); Optional actual2 = queryScanner.one(); Optional actual3 = queryScanner.one(); Optional actual4 = queryScanner.one(); - Optional actual5 = queryScanner.one(); // Assert assertThat(actual1).isPresent(); @@ -192,12 +189,10 @@ public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResu assertThat(actual2.get()).isEqualTo(result); assertThat(actual3).isPresent(); assertThat(actual3.get()).isEqualTo(result); - assertThat(actual4).isPresent(); - assertThat(actual4.get()).isEqualTo(result); - assertThat(actual5).isNotPresent(); + assertThat(actual4).isNotPresent(); - verify(resultInterpreter, times(4)).interpret(item); - verify(request).execute(lastEvaluatedKey); - verify(request).execute(); + verify(resultInterpreter, times(limit)).interpret(item); + verify(request).execute(limit); + verify(request).execute(lastEvaluatedKey, limit - items1.size()); } } diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java index 30f6eec32e..e947eb7366 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.stream.IntStream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; @@ -1614,54 +1615,100 @@ public void scan_ScanGivenForNonIndexedColumn_ShouldThrowIllegalArgumentExceptio @Test public void scan_ScanLargeData_ShouldRetrieveExpectedValues() throws ExecutionException, IOException { + int recordCount = 345; + // Arrange Key partitionKey = new Key(COL_NAME1, 1); - for (int i = 0; i < 345; i++) { + for (int i = 0; i < recordCount; i++) { Key clusteringKey = new Key(COL_NAME4, i); storage.put( new Put(partitionKey, clusteringKey) .withBlobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])); } + Scan scan = new Scan(partitionKey); // Act List results = scanAll(scan); // Assert - assertThat(results.size()).isEqualTo(345); - for (int i = 0; i < 345; i++) { - assertThat(results.get(i).getValue(COL_NAME1).isPresent()).isTrue(); - assertThat(results.get(i).getValue(COL_NAME1).get().getAsInt()).isEqualTo(1); - assertThat(results.get(i).getValue(COL_NAME4).isPresent()).isTrue(); - assertThat(results.get(i).getValue(COL_NAME4).get().getAsInt()).isEqualTo(i); + assertThat(results.size()).isEqualTo(recordCount); + for (int i = 0; i < recordCount; i++) { + assertThat(results.get(i).getInt(COL_NAME1)).isEqualTo(1); + assertThat(results.get(i).getInt(COL_NAME4)).isEqualTo(i); } } @Test public void scan_ScanLargeDataWithOrdering_ShouldRetrieveExpectedValues() throws ExecutionException, IOException { + int recordCount = 345; + int fetchCount = 234; + // Arrange Key partitionKey = new Key(COL_NAME1, 1); - for (int i = 0; i < 345; i++) { + for (int i = 0; i < recordCount; i++) { Key clusteringKey = new Key(COL_NAME4, i); storage.put( new Put(partitionKey, clusteringKey) .withBlobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])); } - Scan scan = new Scan(partitionKey).withOrdering(new Ordering(COL_NAME4, Order.ASC)); + + Scan scanAsc = new Scan(partitionKey).withOrdering(new Ordering(COL_NAME4, Order.ASC)); + Scan scanDesc = new Scan(partitionKey).withOrdering(new Ordering(COL_NAME4, Order.DESC)); // Act - List results = new ArrayList<>(); - try (Scanner scanner = storage.scan(scan)) { + List resultsAsc = new ArrayList<>(); + try (Scanner scanner = storage.scan(scanAsc)) { Iterator iterator = scanner.iterator(); - for (int i = 0; i < 234; i++) { - results.add(iterator.next()); + for (int i = 0; i < fetchCount; i++) { + resultsAsc.add(iterator.next()); + } + } + + List resultsDesc = new ArrayList<>(); + try (Scanner scanner = storage.scan(scanDesc)) { + Iterator iterator = scanner.iterator(); + for (int i = 0; i < fetchCount; i++) { + resultsDesc.add(iterator.next()); } } // Assert - assertThat(results.size()).isEqualTo(234); - for (int i = 0; i < 234; i++) { + assertThat(resultsAsc.size()).isEqualTo(fetchCount); + for (int i = 0; i < fetchCount; i++) { + assertThat(resultsAsc.get(i).getInt(COL_NAME1)).isEqualTo(1); + assertThat(resultsAsc.get(i).getInt(COL_NAME4)).isEqualTo(i); + } + + assertThat(resultsDesc.size()).isEqualTo(fetchCount); + for (int i = 0; i < fetchCount; i++) { + assertThat(resultsDesc.get(i).getInt(COL_NAME1)).isEqualTo(1); + assertThat(resultsDesc.get(i).getInt(COL_NAME4)).isEqualTo(recordCount - i - 1); + } + } + + @Test + public void scan_ScanLargeDataWithLimit_ShouldRetrieveExpectedValues() throws ExecutionException { + // Arrange + int recordCount = 345; + int limit = 234; + + Key partitionKey = new Key(COL_NAME1, 1); + for (int i = 0; i < recordCount; i++) { + Key clusteringKey = new Key(COL_NAME4, i); + storage.put( + new Put(partitionKey, clusteringKey) + .withBlobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])); + } + Scan scan = new Scan(partitionKey).withLimit(limit); + + // Act + List results = storage.scan(scan).all(); + + // Assert + assertThat(results.size()).isEqualTo(limit); + for (int i = 0; i < limit; i++) { assertThat(results.get(i).getInt(COL_NAME1)).isEqualTo(1); assertThat(results.get(i).getInt(COL_NAME4)).isEqualTo(i); } @@ -1891,6 +1938,69 @@ public void scan_ScanAllWithProjectionsGivenOnNonPrimaryKey_ShouldRetrieveOnlyPr assertThat(results.get(0).getBoolean(COL_NAME5)).isTrue(); } + @Test + public void scan_ScanAllLargeData_ShouldRetrieveExpectedValues() + throws ExecutionException, IOException { + int recordCount = 345; + + // Arrange + Key clusteringKey = new Key(COL_NAME4, 1); + for (int i = 0; i < recordCount; i++) { + Key partitionKey = new Key(COL_NAME1, i); + storage.put( + new Put(partitionKey, clusteringKey) + .withBlobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])); + } + + ScanAll scanAll = new ScanAll(); + + // Act + List results = scanAll(scanAll); + + // Assert + assertThat(results.size()).isEqualTo(recordCount); + + Set partitionKeys = new HashSet<>(); + for (int i = 0; i < recordCount; i++) { + partitionKeys.add(results.get(i).getInt(COL_NAME1)); + assertThat(results.get(i).getInt(COL_NAME4)).isEqualTo(1); + } + assertThat(partitionKeys.size()).isEqualTo(recordCount); + assertThat(partitionKeys).allMatch(i -> i >= 0 && i < recordCount); + } + + @Test + public void scan_ScanAllLargeDataWithLimit_ShouldRetrieveExpectedValues() + throws ExecutionException { + // Arrange + int recordCount = 345; + int limit = 234; + + Key clusteringKey = new Key(COL_NAME4, 1); + for (int i = 0; i < recordCount; i++) { + Key partitionKey = new Key(COL_NAME1, i); + storage.put( + new Put(partitionKey, clusteringKey) + .withBlobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])); + } + + Scan scan = new ScanAll().withLimit(limit); + + // Act + List results = storage.scan(scan).all(); + + // Assert + assertThat(results.size()).isEqualTo(limit); + + Set partitionKeys = new HashSet<>(); + for (int i = 0; i < limit; i++) { + partitionKeys.add(results.get(i).getInt(COL_NAME1)); + assertThat(results.get(i).getInt(COL_NAME4)).isEqualTo(1); + } + assertThat(partitionKeys.size()).isEqualTo(limit); + assertThat(partitionKeys).allMatch(i -> i >= 0 && i < recordCount); + } + private void populateRecords() { List puts = preparePuts(); puts.forEach(p -> assertThatCode(() -> storage.put(p)).doesNotThrowAnyException());