diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index bc82d3794cb16..af9391f57cd30 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -205,20 +205,19 @@ public void testActivateWorkForKey_DUPLICATE() { @Test public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenGreater_queuedWorkNotActive_QUEUED() { - long cacheToken = 1L; + long matchingCacheToken = 1L; long newWorkToken = 10L; long queuedWorkToken = newWorkToken / 2; - Work differentWorkTokenWork = createWork(createWorkItem(1L, 1L)); - Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); - Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); + Work differentWorkTokenWork = createWork(createWorkItem(100L, 100L)); + Work queuedWork = createWork(createWorkItem(queuedWorkToken, matchingCacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, matchingCacheToken)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, differentWorkTokenWork); activeWorkState.activateWorkForKey(shardedKey, queuedWork); ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); - // newWork should be queued and queuedWork should not be removed since it is currently active. assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); assertTrue(readOnlyActiveWork.get(shardedKey).contains(newWork)); assertFalse(readOnlyActiveWork.get(shardedKey).contains(queuedWork)); @@ -226,7 +225,22 @@ public void testActivateWorkForKey_DUPLICATE() { } @Test - public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenLesser_STALE() {} + public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenLesser_STALE() { + long cacheToken = 1L; + long queuedWorkToken = 10L; + long newWorkToken = queuedWorkToken / 2; + + Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, queuedWork); + ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); + + assertEquals(ActivateWorkResult.STALE, activateWorkResult); + assertFalse(readOnlyActiveWork.get(shardedKey).contains(newWork)); + assertEquals(queuedWork, readOnlyActiveWork.get(shardedKey).peek()); + } @Test public void testActivateWorkForKey_QUEUED() {