Skip to content

Commit

Permalink
Make subquery dynamoDB query strongly consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
kr565370 committed Mar 13, 2024
1 parent 394cb0b commit 4bb2e05
Showing 1 changed file with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,10 @@ public TrackedQuery getStatus(String queryId, String subQueryId) throws QueryTra
.withTableName(trackerTableName)
.addKeyConditionsEntry(QUERY_ID, new Condition()
.withAttributeValueList(new AttributeValue(queryId))
.withComparisonOperator(ComparisonOperator.EQ)
)
.withComparisonOperator(ComparisonOperator.EQ))
.addKeyConditionsEntry(SUB_QUERY_ID, new Condition()
.withAttributeValueList(new AttributeValue(subQueryId))
.withComparisonOperator(ComparisonOperator.EQ)
)
);
.withComparisonOperator(ComparisonOperator.EQ)));

if (result.getCount() == 0) {
return null;
Expand Down Expand Up @@ -205,11 +202,11 @@ private void updateState(DynamoDBQueryTrackerEntry entry) {
private void updateStateOfParent(DynamoDBQueryTrackerEntry leafQueryEntry) {
List<Map<String, AttributeValue>> trackedQueries = dynamoDB.query(new QueryRequest()
.withTableName(trackerTableName)
.withConsistentRead(true)
.addKeyConditionsEntry(QUERY_ID, new Condition()
.withAttributeValueList(new AttributeValue(leafQueryEntry.getQueryId()))
.withComparisonOperator(ComparisonOperator.EQ)
)
).getItems();
.withComparisonOperator(ComparisonOperator.EQ)))
.getItems();

List<TrackedQuery> children = trackedQueries.stream()
.map(DynamoDBQueryTrackerEntry::toTrackedQuery)
Expand All @@ -219,8 +216,8 @@ private void updateStateOfParent(DynamoDBQueryTrackerEntry leafQueryEntry) {
QueryState parentState = getParentState(children);

if (parentState != null) {
long totalRecordCount = children.stream().mapToLong(query ->
query.getRecordCount() != null ? query.getRecordCount() : 0).sum();
long totalRecordCount = children.stream()
.mapToLong(query -> query.getRecordCount() != null ? query.getRecordCount() : 0).sum();
LOGGER.info("Updating state of parent to {}", parentState);
updateState(leafQueryEntry.updateParent(parentState, totalRecordCount));
}
Expand Down

0 comments on commit 4bb2e05

Please sign in to comment.