diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 4c1693d613874..70641eae1150b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -94,6 +94,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.Work.State; +import org.apache.beam.runners.dataflow.worker.streaming.WorkDedupeKey; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter; @@ -1260,7 +1261,7 @@ public void close() { // Consider the item invalid. It will eventually be retried by Windmill if it still needs to // be processed. computationState.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken()); + ShardedKey.create(key, workItem.getShardingKey()), WorkDedupeKey.of(workItem)); } } finally { // Update total processing time counters. Updating in finally clause ensures that @@ -1337,7 +1338,10 @@ private void commitLoop() { for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) { computationState.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()), - workRequest.getWorkToken()); + WorkDedupeKey.builder() + .setCacheToken(workRequest.getCacheToken()) + .setWorkToken(workRequest.getWorkToken()) + .build()); } } } @@ -1368,7 +1372,10 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) // was deemed stuck. state.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(request.getKey(), request.getShardingKey()), - request.getWorkToken()); + WorkDedupeKey.builder() + .setCacheToken(request.getCacheToken()) + .setWorkToken(request.getWorkToken()) + .build()); })) { return true; } else { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 9858666c40a23..c8c352575036f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -23,6 +23,7 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; @@ -83,6 +84,29 @@ static ActiveWorkState forTesting( return new ActiveWorkState(activeWork, computationStateCache); } + private static Stream toKeyedGetDataRequestStream( + Entry> shardedKeyAndWorkQueue, Instant refreshDeadline) { + ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey(); + Deque workQueue = shardedKeyAndWorkQueue.getValue(); + + return workQueue.stream() + .filter(work -> work.getStartTime().isBefore(refreshDeadline)) + .map( + work -> + Windmill.KeyedGetDataRequest.newBuilder() + .setKey(shardedKey.key()) + .setShardingKey(shardedKey.shardingKey()) + .setWorkToken(work.getWorkItem().getWorkToken()) + .addAllLatencyAttribution(work.getLatencyAttributions()) + .build()); + } + + private static String elapsedString(Instant start, Instant end) { + Duration activeFor = new Duration(start, end); + // Duration's toString always starts with "PT"; remove that here. + return activeFor.toString().substring(2); + } + /** * Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 3 {@link * ActivateWorkResult} @@ -108,9 +132,19 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w } // Ensure we don't already have this work token queued. - for (Work queuedWork : workQueue) { + Iterator workQueueIterator = workQueue.iterator(); + while (workQueueIterator.hasNext()) { + Work queuedWork = workQueueIterator.next(); if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) { - return ActivateWorkResult.DUPLICATE; + // Work is a duplicate. + if (queuedWork.getWorkItem().getCacheToken() == work.getWorkItem().getCacheToken()) { + return ActivateWorkResult.DUPLICATE; + } else { + // Work is a retry. Remove the older work, and queue the most current retry. + workQueueIterator.remove(); + workQueue.addLast(work); + return ActivateWorkResult.QUEUED; + } } } @@ -126,19 +160,20 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w * #activeWork}. */ synchronized Optional completeWorkAndGetNextWorkForKey( - ShardedKey shardedKey, long workToken) { + ShardedKey shardedKey, WorkDedupeKey workDedupeKey) { @Nullable Queue workQueue = activeWork.get(shardedKey); if (workQueue == null) { // Work may have been completed due to clearing of stuck commits. - LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workToken); + LOG.warn( + "Unable to complete inactive work for key {} and token {}.", shardedKey, workDedupeKey); return Optional.empty(); } - removeCompletedWorkFromQueue(workQueue, shardedKey, workToken); + removeCompletedWorkFromQueue(workQueue, shardedKey, workDedupeKey); return getNextWork(workQueue, shardedKey); } private synchronized void removeCompletedWorkFromQueue( - Queue workQueue, ShardedKey shardedKey, long workToken) { + Queue workQueue, ShardedKey shardedKey, WorkDedupeKey workDedupeKey) { // avoid Preconditions.checkState here to prevent eagerly evaluating the // format string parameters for the error message. Work completedWork = @@ -147,16 +182,22 @@ private synchronized void removeCompletedWorkFromQueue( () -> new IllegalStateException( String.format( - "Active key %s without work, expected token %d", - shardedKey, workToken))); + "Active key %s without work, expected work_token %d, expected cache_token %d", + shardedKey, workDedupeKey.workToken(), workDedupeKey.cacheToken()))); - if (completedWork.getWorkItem().getWorkToken() != workToken) { + if (completedWork.getWorkItem().getWorkToken() != workDedupeKey.workToken() + || completedWork.isRetryOf(workDedupeKey)) { // Work may have been completed due to clearing of stuck commits. LOG.warn( - "Unable to complete due to token mismatch for key {} and token {}, actual token was {}.", + "Unable to complete due to token mismatch for " + + "key {}, work_token {}, and cache_token {}; " + + "actual work_token was {}," + + "actual cache_token was {}.", shardedKey, - workToken, - completedWork.getWorkItem().getWorkToken()); + workDedupeKey.workToken(), + workDedupeKey.cacheToken(), + completedWork.getWorkItem().getWorkToken(), + completedWork.getWorkItem().getCacheToken()); return; } @@ -178,21 +219,22 @@ private synchronized Optional getNextWork(Queue workQueue, ShardedKe * before the stuckCommitDeadline. */ synchronized void invalidateStuckCommits( - Instant stuckCommitDeadline, BiConsumer shardedKeyAndWorkTokenConsumer) { - for (Entry shardedKeyAndWorkToken : + Instant stuckCommitDeadline, + BiConsumer shardedKeyAndWorkTokenConsumer) { + for (Entry shardedKeyAndWorkToken : getStuckCommitsAt(stuckCommitDeadline).entrySet()) { ShardedKey shardedKey = shardedKeyAndWorkToken.getKey(); - long workToken = shardedKeyAndWorkToken.getValue(); + WorkDedupeKey workDedupeKey = shardedKeyAndWorkToken.getValue(); computationStateCache.invalidate(shardedKey.key(), shardedKey.shardingKey()); - shardedKeyAndWorkTokenConsumer.accept(shardedKey, workToken); + shardedKeyAndWorkTokenConsumer.accept(shardedKey, workDedupeKey); } } - private synchronized ImmutableMap getStuckCommitsAt( + private synchronized ImmutableMap getStuckCommitsAt( Instant stuckCommitDeadline) { // Determine the stuck commit keys but complete them outside the loop iterating over // activeWork as completeWork may delete the entry from activeWork. - ImmutableMap.Builder stuckCommits = ImmutableMap.builder(); + ImmutableMap.Builder stuckCommits = ImmutableMap.builder(); for (Entry> entry : activeWork.entrySet()) { ShardedKey shardedKey = entry.getKey(); @Nullable Work work = entry.getValue().peek(); @@ -202,7 +244,7 @@ private synchronized ImmutableMap getStuckCommitsAt( "Detected key {} stuck in COMMITTING state since {}, completing it with error.", shardedKey, work.getStateStartTime()); - stuckCommits.put(shardedKey, work.getWorkItem().getWorkToken()); + stuckCommits.put(shardedKey, WorkDedupeKey.of(work.getWorkItem())); } } } @@ -216,23 +258,6 @@ synchronized ImmutableList getKeysToRefresh(Instant refresh .collect(toImmutableList()); } - private static Stream toKeyedGetDataRequestStream( - Entry> shardedKeyAndWorkQueue, Instant refreshDeadline) { - ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey(); - Deque workQueue = shardedKeyAndWorkQueue.getValue(); - - return workQueue.stream() - .filter(work -> work.getStartTime().isBefore(refreshDeadline)) - .map( - work -> - Windmill.KeyedGetDataRequest.newBuilder() - .setKey(shardedKey.key()) - .setShardingKey(shardedKey.shardingKey()) - .setWorkToken(work.getWorkItem().getWorkToken()) - .addAllLatencyAttribution(work.getLatencyAttributions()) - .build()); - } - synchronized void printActiveWork(PrintWriter writer, Instant now) { writer.println( " counters, String return null; } - static Work createMockWork(long workToken) { - return Work.create( - Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(), - Instant::now, - Collections.emptyList(), - work -> {}); + static Work createMockWork(long workToken, long cacheToken) { + return createMockWork(workToken, cacheToken, work -> {}); } - static Work createMockWork(long workToken, Consumer processWorkFn) { + static Work createMockWork(long workToken, long cacheToken, Consumer processWorkFn) { return Work.create( - Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(), + Windmill.WorkItem.newBuilder() + .setKey(ByteString.EMPTY) + .setWorkToken(workToken) + .setCacheToken(cacheToken) + .build(), Instant::now, Collections.emptyList(), processWorkFn); @@ -2646,7 +2648,7 @@ public void testUnboundedSourceWorkRetry() throws Exception { } @Test - public void testActiveWork() throws Exception { + public void testActiveWork() { BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class); ComputationState computationState = new ComputationState( @@ -2659,39 +2661,39 @@ public void testActiveWork() throws Exception { ShardedKey key1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); ShardedKey key2 = ShardedKey.create(ByteString.copyFromUtf8("key2"), 2); - Work m1 = createMockWork(1); + Work m1 = createMockWork(1, 1); assertTrue(computationState.activateWork(key1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 1); + computationState.completeWorkAndScheduleNextWorkForKey(key1, WorkDedupeKey.of(m1)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. - Work m2 = createMockWork(2); + Work m2 = createMockWork(2, 2); assertTrue(computationState.activateWork(key1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); - Work m3 = createMockWork(3); + Work m3 = createMockWork(3, 3); assertTrue(computationState.activateWork(key1, m3)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify another key is a separate queue. - Work m4 = createMockWork(4); + Work m4 = createMockWork(4, 4); assertTrue(computationState.activateWork(key2, m4)); Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key2, 4); + computationState.completeWorkAndScheduleNextWorkForKey(key2, WorkDedupeKey.of(m4)); Mockito.verifyNoMoreInteractions(mockExecutor); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 2); + computationState.completeWorkAndScheduleNextWorkForKey(key1, WorkDedupeKey.of(m2)); Mockito.verify(mockExecutor).forceExecute(m3, m3.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 3); + computationState.completeWorkAndScheduleNextWorkForKey(key1, WorkDedupeKey.of(m3)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify duplicate work dropped. - Work m5 = createMockWork(5); + Work m5 = createMockWork(5, 5); computationState.activateWork(key1, m5); Mockito.verify(mockExecutor).execute(m5, m5.getWorkItem().getSerializedSize()); assertFalse(computationState.activateWork(key1, m5)); Mockito.verifyNoMoreInteractions(mockExecutor); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 5); + computationState.completeWorkAndScheduleNextWorkForKey(key1, WorkDedupeKey.of(m5)); Mockito.verifyNoMoreInteractions(mockExecutor); } @@ -2709,22 +2711,22 @@ public void testActiveWorkForShardedKeys() throws Exception { ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); ShardedKey key1Shard2 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 2); - Work m1 = createMockWork(1); + Work m1 = createMockWork(1, 1); assertTrue(computationState.activateWork(key1Shard1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1Shard1, 1); + computationState.completeWorkAndScheduleNextWorkForKey(key1Shard1, WorkDedupeKey.of(m1)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. - Work m2 = createMockWork(2); + Work m2 = createMockWork(2, 2); assertTrue(computationState.activateWork(key1Shard1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); - Work m3 = createMockWork(3); + Work m3 = createMockWork(3, 3); assertTrue(computationState.activateWork(key1Shard1, m3)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify a different shard of key is a separate queue. - Work m4 = createMockWork(3); + Work m4 = createMockWork(3, 3); assertFalse(computationState.activateWork(key1Shard1, m4)); Mockito.verifyNoMoreInteractions(mockExecutor); assertTrue(computationState.activateWork(key1Shard2, m4)); @@ -2732,7 +2734,7 @@ public void testActiveWorkForShardedKeys() throws Exception { // Verify duplicate work dropped assertFalse(computationState.activateWork(key1Shard2, m4)); - computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, 3); + computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, WorkDedupeKey.of(m3)); Mockito.verifyNoMoreInteractions(mockExecutor); } @@ -2777,8 +2779,8 @@ public void testMaxThreadMetric() throws Exception { } }; - Work m2 = createMockWork(2, sleepProcessWorkFn); - Work m3 = createMockWork(3, sleepProcessWorkFn); + Work m2 = createMockWork(2, 2, sleepProcessWorkFn); + Work m3 = createMockWork(3, 3, sleepProcessWorkFn); assertTrue(computationState.activateWork(key1Shard1, m2)); assertTrue(computationState.activateWork(key1Shard1, m3)); @@ -2793,8 +2795,6 @@ public void testMaxThreadMetric() throws Exception { executor.shutdown(); } - volatile boolean stop = false; - @Test public void testActiveThreadMetric() throws Exception { int maxThreads = 5; @@ -2816,7 +2816,7 @@ public void testActiveThreadMetric() throws Exception { ComputationState computationState = new ComputationState( "computation", - defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))), + defaultMapTask(Collections.singletonList(makeSourceInstruction(StringUtf8Coder.of()))), executor, ImmutableMap.of(), null); @@ -2834,11 +2834,11 @@ public void testActiveThreadMetric() throws Exception { } }; - Work m2 = createMockWork(2, sleepProcessWorkFn); + Work m2 = createMockWork(2, 2, sleepProcessWorkFn); - Work m3 = createMockWork(3, sleepProcessWorkFn); + Work m3 = createMockWork(3, 3, sleepProcessWorkFn); - Work m4 = createMockWork(4, sleepProcessWorkFn); + Work m4 = createMockWork(4, 4, sleepProcessWorkFn); assertEquals(0, executor.activeCount()); assertTrue(computationState.activateWork(key1Shard1, m2)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 12ae816de8292..a837706d64053 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -34,8 +34,8 @@ import java.util.Optional; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.ActivateWorkResult; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -62,22 +62,27 @@ private static Work emptyWork() { return createWork(null); } - private static Work createWork(@Nullable Windmill.WorkItem workItem) { + private static Work createWork(@Nullable WorkItem workItem) { return Work.create(workItem, Instant::now, Collections.emptyList(), unused -> {}); } - private static Work expiredWork(Windmill.WorkItem workItem) { + private static Work expiredWork(WorkItem workItem) { return Work.create(workItem, () -> Instant.EPOCH, Collections.emptyList(), unused -> {}); } - private static Windmill.WorkItem createWorkItem(long workToken) { - return Windmill.WorkItem.newBuilder() + private static WorkItem createWorkItem(long workToken, long cacheToken) { + return WorkItem.newBuilder() .setKey(ByteString.copyFromUtf8("")) .setShardingKey(1) .setWorkToken(workToken) + .setCacheToken(cacheToken) .build(); } + private static WorkDedupeKey workDedupeToken(long workToken, long cacheToken) { + return WorkDedupeKey.builder().setCacheToken(cacheToken).setWorkToken(workToken).build(); + } + @Before public void setup() { Map> readWriteActiveWorkMap = new HashMap<>(); @@ -98,12 +103,14 @@ public void testActivateWorkForKey_EXECUTE_unknownKey() { public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { ShardedKey shardedKey = shardedKey("someKey", 1L); long workToken = 1L; - + long cacheToken = 2L; ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey( + shardedKey, createWork(createWorkItem(workToken, cacheToken))); Optional nextWorkForKey = - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workToken); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, workDedupeToken(workToken, cacheToken)); assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult); assertEquals(Optional.empty(), nextWorkForKey); @@ -113,24 +120,46 @@ public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { @Test public void testActivateWorkForKey_DUPLICATE() { long workToken = 10L; + long cacheToken = 5L; ShardedKey shardedKey = shardedKey("someKey", 1L); // ActivateWork with the same shardedKey, and the same workTokens. - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey( + shardedKey, createWork(createWorkItem(workToken, cacheToken))); ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey( + shardedKey, createWork(createWorkItem(workToken, cacheToken))); assertEquals(ActivateWorkResult.DUPLICATE, activateWorkResult); } + @Test + public void testActivateWorkForKey_withMatchingWorkTokenAndDifferentCacheToken_QUEUED() { + long workToken = 10L; + long cacheToken1 = 5L; + long cacheToken2 = 7L; + + Work firstWork = createWork(createWorkItem(workToken, cacheToken1)); + Work secondWork = createWork(createWorkItem(workToken, cacheToken2)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + // ActivateWork with the same shardedKey, and the same workTokens, but different cacheTokens. + activeWorkState.activateWorkForKey(shardedKey, firstWork); + ActivateWorkResult activateWorkResult = + activeWorkState.activateWorkForKey(shardedKey, secondWork); + + assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertEquals(secondWork, readOnlyActiveWork.get(shardedKey).peek()); + } + @Test public void testActivateWorkForKey_QUEUED() { ShardedKey shardedKey = shardedKey("someKey", 1L); // ActivateWork with the same shardedKey, but different workTokens. - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(1L))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(1L, 1L))); ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(2L))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(2L, 2L))); assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); } @@ -139,18 +168,39 @@ public void testActivateWorkForKey_QUEUED() { public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { assertEquals( Optional.empty(), - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey("someKey", 1L), 10L)); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey("someKey", 1L), workDedupeToken(1L, 1L))); } @Test - public void testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchWorkToComplete() { - long workTokenToComplete = 1L; + public void + testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueWorkTokenDoesNotMatchWorkToComplete() { + long workTokenInQueue = 2L; + long otherWorkToken = 1L; + long cacheToken = 1L; + Work workInQueue = createWork(createWorkItem(workTokenInQueue, cacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, workInQueue); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, workDedupeToken(otherWorkToken, cacheToken)); + + assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); + assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); + } - Work workInQueue = createWork(createWorkItem(2L)); + @Test + public void + testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueCacheTokenDoesNotMatchWorkToComplete() { + long cacheTokenInQueue = 2L; + long otherCacheToken = 1L; + long workToken = 1L; + Work workInQueue = createWork(createWorkItem(workToken, cacheTokenInQueue)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workInQueue); - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, workDedupeToken(workToken, otherCacheToken)); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); @@ -158,15 +208,14 @@ public void testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchW @Test public void testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplete() { - long workTokenToComplete = 1L; - - Work activeWork = createWork(createWorkItem(workTokenToComplete)); - Work nextWork = createWork(createWorkItem(2L)); + Work activeWork = createWork(createWorkItem(1L, 1L)); + Work nextWork = createWork(createWorkItem(2L, 2L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, activeWork); activeWorkState.activateWorkForKey(shardedKey, nextWork); - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, WorkDedupeKey.of(activeWork.getWorkItem())); assertEquals(nextWork, readOnlyActiveWork.get(shardedKey).peek()); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); @@ -175,37 +224,36 @@ public void testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplet @Test public void testCompleteWorkAndGetNextWorkForKey_removesQueueIfNoWorkPresent() { - Work workInQueue = createWork(createWorkItem(1L)); + Work workInQueue = createWork(createWorkItem(1L, 1L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workInQueue); activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workInQueue.getWorkItem().getWorkToken()); + shardedKey, WorkDedupeKey.of(workInQueue.getWorkItem())); assertFalse(readOnlyActiveWork.containsKey(shardedKey)); } @Test public void testCompleteWorkAndGetNextWorkForKey_returnsWorkIfPresent() { - Work workToBeCompleted = createWork(createWorkItem(1L)); - Work nextWork = createWork(createWorkItem(2L)); + Work workToBeCompleted = createWork(createWorkItem(1L, 1L)); + Work nextWork = createWork(createWorkItem(2L, 2L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workToBeCompleted); activeWorkState.activateWorkForKey(shardedKey, nextWork); activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workToBeCompleted.getWorkItem().getWorkToken()); + shardedKey, WorkDedupeKey.of(workToBeCompleted)); Optional nextWorkOpt = activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workToBeCompleted.getWorkItem().getWorkToken()); + shardedKey, WorkDedupeKey.of(workToBeCompleted)); assertTrue(nextWorkOpt.isPresent()); assertSame(nextWork, nextWorkOpt.get()); Optional endOfWorkQueue = - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, nextWork.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, WorkDedupeKey.of(nextWork)); assertFalse(endOfWorkQueue.isPresent()); assertFalse(readOnlyActiveWork.containsKey(shardedKey)); @@ -213,11 +261,11 @@ public void testCompleteWorkAndGetNextWorkForKey_returnsWorkIfPresent() { @Test public void testInvalidateStuckCommits() { - Map invalidatedCommits = new HashMap<>(); + Map invalidatedCommits = new HashMap<>(); - Work stuckWork1 = expiredWork(createWorkItem(1L)); + Work stuckWork1 = expiredWork(createWorkItem(1L, 1L)); stuckWork1.setState(Work.State.COMMITTING); - Work stuckWork2 = expiredWork(createWorkItem(2L)); + Work stuckWork2 = expiredWork(createWorkItem(2L, 2L)); stuckWork2.setState(Work.State.COMMITTING); ShardedKey shardedKey1 = shardedKey("someKey", 1L); ShardedKey shardedKey2 = shardedKey("anotherKey", 2L); @@ -228,9 +276,9 @@ public void testInvalidateStuckCommits() { activeWorkState.invalidateStuckCommits(Instant.now(), invalidatedCommits::put); assertThat(invalidatedCommits) - .containsEntry(shardedKey1, stuckWork1.getWorkItem().getWorkToken()); + .containsEntry(shardedKey1, WorkDedupeKey.of(stuckWork1.getWorkItem())); assertThat(invalidatedCommits) - .containsEntry(shardedKey2, stuckWork2.getWorkItem().getWorkToken()); + .containsEntry(shardedKey2, WorkDedupeKey.of(stuckWork2.getWorkItem())); verify(computationStateCache).invalidate(shardedKey1.key(), shardedKey1.shardingKey()); verify(computationStateCache).invalidate(shardedKey2.key(), shardedKey2.shardingKey()); } @@ -238,11 +286,10 @@ public void testInvalidateStuckCommits() { @Test public void testGetKeysToRefresh() { Instant refreshDeadline = Instant.now(); - - Work freshWork = createWork(createWorkItem(3L)); - Work refreshableWork1 = expiredWork(createWorkItem(1L)); + Work freshWork = createWork(createWorkItem(3L, 3L)); + Work refreshableWork1 = expiredWork(createWorkItem(1L, 1L)); refreshableWork1.setState(Work.State.COMMITTING); - Work refreshableWork2 = expiredWork(createWorkItem(2L)); + Work refreshableWork2 = expiredWork(createWorkItem(2L, 2L)); refreshableWork2.setState(Work.State.COMMITTING); ShardedKey shardedKey1 = shardedKey("someKey", 1L); ShardedKey shardedKey2 = shardedKey("anotherKey", 2L);