check for cachetoken representing a retry before activating and completing work #29082

Merged
merged 12 commits into from
Feb 13, 2024

@m-trieu m-trieu commented Oct 20, 2023

Instead of just checking a Work/WorkItem's workToken to detect duplicates, also check the cacheToken.

Change the behavior of ActiveWorkState#activateWork to return ActivateWorkResult.DUPLICATE only if both the work and cache tokens are the same. Otherwise, if the work tokens match but the cache tokens do not, evict the currently queued work and add the newer retry of the work to the work queue.

more context:

r: @scwhittle



@github-actions

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn added as fallback since no labels match configuration

@scwhittle scwhittle left a comment

needs a rebase also

@@ -83,6 +84,29 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant refreshDeadline) {
Contributor

have the ShardedKey and the queue be separate parameters?

also could take just an Iterable instead of Deque

Contributor Author

done.

@@ -94,6 +94,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkDedupeKey;
Contributor

maybe name WorkIdentifiers? WorkTokens?

Contributor Author

done, changed to WorkId

if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
return ActivateWorkResult.DUPLICATE;
// Work is a duplicate.
if (queuedWork.getWorkItem().getCacheToken() == work.getWorkItem().getCacheToken()) {
Contributor

could be cleaner to use WorkDedupKey or whatever it is renamed and call methods like equals or maybe something like knownEarlierThan

if the cache token is the same, we'd prefer to keep the later work item # since they are increasing from windmill within a worker. Otherwise we can just take the last observed.

Contributor Author

should we add the startTime to the WorkDedupeKey/WorkId then?

Contributor Author

realized we don't need to do that since Work has the startTime


if (completedWork.getWorkItem().getWorkToken() != workToken) {
if (completedWork.getWorkItem().getWorkToken() != workDedupeKey.workToken()
Contributor

use the new class and an equals method?

Contributor Author

done.

shardedKey,
workToken,
completedWork.getWorkItem().getWorkToken());
workDedupeKey.workToken(),
Contributor

add toString for new class and use it?

Contributor Author

done, AutoValue has toString out of the box so just printing the object

getStuckCommitsAt(stuckCommitDeadline).entrySet()) {
ShardedKey shardedKey = shardedKeyAndWorkToken.getKey();
long workToken = shardedKeyAndWorkToken.getValue();
WorkDedupeKey workDedupeKey = shardedKeyAndWorkToken.getValue();
Contributor

update variable names as type changed

Contributor Author

done

@@ -125,6 +125,11 @@ boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
&& currentState.startTime().isBefore(stuckCommitDeadline);
}

boolean isRetryOf(WorkDedupeKey workDedupeKey) {
Contributor

maybe instead put this method on WorkDedupeKey and have a method vending WorkDedupKey from Work?

Contributor Author

done

.build();
}

public static WorkDedupeKey of(Work work) {
Contributor

rm if Work changes to have method to return WorkDedupKey

Contributor Author

done

ShardedKey shardedKey = shardedKey("someKey", 1L);

activeWorkState.activateWorkForKey(shardedKey, activeWork);
activeWorkState.activateWorkForKey(shardedKey, nextWork);
activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete);
activeWorkState.completeWorkAndGetNextWorkForKey(
shardedKey, WorkDedupeKey.of(activeWork.getWorkItem()));
Contributor

if we add method to get dedup key on Work this could be simplified, ditto below.

Contributor Author

done

@scwhittle

Latest changes don't appear to have been pushed
BTW if fixups to PR are kept as separate commits (instead of squashed and force pushing) I think that Github review tool may work better. It seems easy to squash when merging so I don't think there should be concerns about those commits making it to beam/master branch.


// Work tokens and cache tokens are equal.
if (queuedWork.id().equals(work.id())) {
if (work.newerThan(queuedWork)) {
@scwhittle scwhittle Oct 24, 2023

If cache token + work token are equal, the queuedWork and new work should be equivalent. Let's just return DUPLICATE in that case.

What I meant with the newer than check is the case where we have equal cache tokens but different work tokens. Since the equal cache tokens implies it was from the same worker, and workers give out increasing work tokens we can just keep the work item with the higher work token.

Contributor Author

just to clarify:

  • existingWorkToken == newWorkToken && existingCacheToken == newCacheToken: return DUPLICATE
  • existingWorkToken > newWorkToken && existingCacheToken == cacheToken2: ignore
  • existingWorkToken < newWorkToken && existingCacheToken == cacheToken2: return QUEUED (queue newWorkToken, removeExistingWorkToken if it is not active)
  • existingWorkToken == newWorkToken && existingCacheToken != newCacheToken: return QUEUED (queue newWorkToken, removeExistingWorkToken if it is not active)

is this the behavior we want? If it is equal cache tokens, but different work tokens what does that mean? Same worker with different work?

Contributor

yes, the listed are correct (maybe return STALE in the ignore case?).

If it is equal cache tokens, but different work tokens what does that mean? Same worker with different work?
It means that the user worker may be observing a stale retry of work previously sent by the windmill worker (if new work token is less or equal to existing work) or it might mean the existing item was a retry of a work item that has since committed and the newly arriving work item for the key is the now active item.

In general, if we get different cache tokens we don't know how the previous work relates to the existing work. We could guess based upon observed ordering but since the requests could be received out of order if there were multiple windmill workers who sent work, it seems safer to queue in that case.

There is separate work ongoing to use heartbeats to identify work items that are no longer valid and stop processing them.
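
For readers following the thread, the case analysis agreed on above can be summarized in a small sketch. The class `ActivationSketch`, the `Outcome` enum, and the method `compare` are hypothetical names used only for illustration and are not the Beam implementation; eviction of non-active queued items is deliberately not modeled here.

```java
/** Hypothetical sketch of the agreed cases; not the actual ActiveWorkState code. */
final class ActivationSketch {
  enum Outcome {
    DUPLICATE,
    STALE,
    QUEUED
  }

  /** Compares an already queued item with newly arriving work for the same sharded key. */
  static Outcome compare(
      long queuedWorkToken, long queuedCacheToken, long newWorkToken, long newCacheToken) {
    if (queuedCacheToken == newCacheToken) {
      if (queuedWorkToken == newWorkToken) {
        return Outcome.DUPLICATE; // Both tokens match.
      }
      if (queuedWorkToken > newWorkToken) {
        return Outcome.STALE; // Same cache token, older work token: ignore the new work.
      }
      return Outcome.QUEUED; // Same cache token, newer work token.
    }
    // Different cache tokens: the relationship is unknown, so queueing is the safer choice.
    return Outcome.QUEUED;
  }

  public static void main(String[] args) {
    System.out.println(compare(5, 9, 5, 9));
    System.out.println(compare(5, 9, 4, 9));
    System.out.println(compare(5, 9, 6, 9));
    System.out.println(compare(5, 9, 5, 10));
  }
}
```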

Contributor Author

ack done


// Retries have the same work token, but different cache tokens.
if (work.isRetryOf(queuedWork)) {
workQueueIterator.remove();
Contributor

for these cases we're removing, I'm not sure we should remove if the iterator is the first in the queue. In that case it is the active item, removing it isn't actually terminating the active work (we may add support for this in the future in which case we can improve handling here). And removing it will just lead to confusing logs below about tokens mismatching when the active finishes.

Contributor Author

added check done

@Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workToken);
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
Contributor

nit: replace token with ids

Contributor Author

done.

// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
Work completedWork =
Work workToComplete =
Contributor

nit: I think the previous name was better since the work has already completed user processing.

Contributor Author

done
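
As background for the code comment in the excerpt above about avoiding Preconditions.checkState: in Java, arguments to a method call are evaluated before the call even when the check passes, whereas a supplier passed to `Optional.orElseThrow` runs only on the failure path. A minimal sketch, assuming the surrounding code uses an `Optional` with a supplier; `LazyErrorSketch`, `describe`, and `headOrThrow` are hypothetical names, not Beam code:

```java
import java.util.Optional;

/** Hypothetical illustration; the helper names are not from the Beam code base. */
final class LazyErrorSketch {
  static int formatCalls = 0;

  /** Builds the error message and counts how often it is built. */
  static String describe(String shardedKey, long workToken) {
    formatCalls++;
    return String.format("Active key %s without work, expected token %d", shardedKey, workToken);
  }

  /** Returns the head item, building the error message only when the head is absent. */
  static String headOrThrow(Optional<String> head, String shardedKey, long workToken) {
    return head.orElseThrow(() -> new IllegalStateException(describe(shardedKey, workToken)));
  }

  public static void main(String[] args) {
    // The head is present, so the supplier is never invoked and no message is formatted.
    System.out.println(headOrThrow(Optional.of("work"), "someKey", 1L));
    System.out.println("format calls: " + formatCalls);
  }
}
```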


if (completedWork.getWorkItem().getWorkToken() != workToken) {
if (!workToComplete.id().isForSameWork(workId) || workToComplete.id().isRetryOf(workId)) {
Contributor

just use !equals?

Contributor Author

done.

Instant stuckCommitDeadline, BiConsumer<ShardedKey, Long> shardedKeyAndWorkTokenConsumer) {
for (Entry<ShardedKey, Long> shardedKeyAndWorkToken :
Instant stuckCommitDeadline, BiConsumer<ShardedKey, WorkId> shardedKeyAndWorkIdConsumer) {
for (Entry<ShardedKey, WorkId> shardedKeyAndWorkToken :
Contributor

nit: update variable name to id

Contributor Author

done.

return id.isRetryOf(other.id);
}

boolean newerThan(Work other) {
Contributor

see other comment, I was thinking of this as a WorkId method

boolean knownObsoletedBy(WorkId other) {
return other.cacheToken == this.cacheToken && workToken < other.cacheToken;
}

in that case I think the forwarding Work.isRetryOf methods could be removed and just use id().isRetryOf

Contributor Author

workToken < other.workToken?
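
A minimal sketch of the identifier discussed here, assuming the intended comparison is between the two work tokens, as the reply above suggests. `WorkIdSketch` is a hypothetical hand-written stand-in; the PR itself uses an AutoValue class, and the method bodies below are an illustration rather than the merged code.

```java
import java.util.Objects;

/** Hypothetical stand-in for the WorkId value class discussed in this thread. */
final class WorkIdSketch {
  private final long workToken;
  private final long cacheToken;

  WorkIdSketch(long workToken, long cacheToken) {
    this.workToken = workToken;
    this.cacheToken = cacheToken;
  }

  long workToken() {
    return workToken;
  }

  long cacheToken() {
    return cacheToken;
  }

  /** Same work token but a different cache token: treated as a retry. */
  boolean isRetryOf(WorkIdSketch other) {
    return other.workToken == workToken && other.cacheToken != cacheToken;
  }

  /** Same cache token and a strictly lower work token: known to be obsolete. */
  boolean knownObsoletedBy(WorkIdSketch other) {
    return other.cacheToken == cacheToken && workToken < other.workToken;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof WorkIdSketch)) {
      return false;
    }
    WorkIdSketch that = (WorkIdSketch) o;
    return workToken == that.workToken && cacheToken == that.cacheToken;
  }

  @Override
  public int hashCode() {
    return Objects.hash(workToken, cacheToken);
  }

  @Override
  public String toString() {
    return "WorkIdSketch{workToken=" + workToken + ", cacheToken=" + cacheToken + "}";
  }
}
```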

/**
* A composite key used to identify a unit of {@link Work}. If multiple units of {@link Work} have
* the same workToken AND cacheToken, the {@link Work} is a duplicate. If multiple units of {@link
* Work} have the same workToken, but different cacheTokens, the {@link Work} is a retry.
Contributor

can document the cacheToken same, workToken different as known obsolete in comment

Contributor Author

done.

return other.workToken() == workToken() && other.cacheToken() != cacheToken();
}

boolean isForSameWork(WorkId other) {
Contributor

remove? seems like accessors can just be called

also we have some weird cases (backend truncation of work item) where the work token can be the same but the work is not entirely the same if the cache token is changed so the method name is perhaps confusing.

Contributor Author

done.

@m-trieu m-trieu changed the title check for clientId representing a retry before activating and completing work check for cachetoken representing a retry before activating and completing work Oct 30, 2023
@m-trieu m-trieu force-pushed the mtrieu-client-id branch 2 times, most recently from 7847c40 to 3af9e61 Compare October 30, 2023 21:19
m-trieu commented Oct 31, 2023

Latest changes don't appear to have been pushed BTW if fixups to PR are kept as separate commits (instead of squashed and force pushing) I think that Github review tool may work better. It seems easy to squash when merging so I don't think there should be concerns about those commits making it to beam/master branch.

sgtm, I think I got confused about the recommendations :) will keep them as separate commits from now on

.map(
work ->
Windmill.KeyedGetDataRequest.newBuilder()
.setKey(shardedKey.key())
Contributor

can rm the key, sharding key is sufficient and cheaper

Contributor

could do separately (and possibly in other places where we make reads or commits) since this is just moved

@@ -83,6 +85,33 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequests(
Contributor

nit: this is just moved, but since further from context now how about a better name capturing that this is for heartbeats

makeHeartbeatKeyedGetDataRequests?

Contributor Author

done

Contributor

are you missing a push? not seeing this change and others below

return ActivateWorkResult.STALE;
}
} else if (queuedWork.id().workToken() == work.id().workToken()) {
if (queuedWork.id().cacheToken() != work.id().cacheToken()) {
Contributor

this is known true from previous statement in else if chain

I think the case
queuedWork.id().workToken() == work.id().workToken()
could just be removed though and fall through to the bottom. If the cache tokens are different we're not actually sure which is the valid item in the backend. And in that case it is safer to keep both since then we will eventually complete the right one.

Contributor Author

done

} else if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
if (work.id().workToken() > queuedWork.id().workToken()) {
removeIfNotActive(queuedWork, workIterator, workQueue);
workQueue.addLast(work);
Contributor

could instead continue, it might be able to remove other queued items

for example if all same cache token,
[1 active] [2 queued]
if
[3] arrives
it would currently not remove [1] since it's active but by returning we aren't removing [2]
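
The example above can be traced with a small sketch that keeps scanning the whole queue instead of returning early. It assumes each queue element is a `{workToken, cacheToken}` pair and that the head of the queue is the active item; `QueuePruneSketch` and `enqueue` are hypothetical names, and the duplicate and stale cases are left out for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical sketch; each element is {workToken, cacheToken}, head is the active item. */
final class QueuePruneSketch {
  /** Drops queued (non-active) items obsoleted by the new work, then appends the new work. */
  static void enqueue(Deque<long[]> queue, long workToken, long cacheToken) {
    Iterator<long[]> it = queue.iterator();
    boolean isHead = true;
    while (it.hasNext()) {
      long[] queued = it.next();
      boolean obsolete = queued[1] == cacheToken && queued[0] < workToken;
      // The head is the active item; removing it would not stop its processing, so keep it.
      if (obsolete && !isHead) {
        it.remove();
      }
      isHead = false;
    }
    queue.addLast(new long[] {workToken, cacheToken});
  }

  public static void main(String[] args) {
    Deque<long[]> queue = new ArrayDeque<>();
    queue.addLast(new long[] {1L, 7L}); // [1 active]
    queue.addLast(new long[] {2L, 7L}); // [2 queued]
    enqueue(queue, 3L, 7L); // [3] arrives with the same cache token
    for (long[] item : queue) {
      System.out.println("workToken=" + item[0]);
    }
  }
}
```

With the queue from the example, the active item 1 stays, the queued item 2 is removed, and 3 is appended.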

Contributor Author

done

@@ -147,16 +199,19 @@ private synchronized void removeCompletedWorkFromQueue(
() ->
new IllegalStateException(
String.format(
"Active key %s without work, expected token %d",
shardedKey, workToken)));
"Active key %s without work, expected work_token %d, expected cache_token %d",
Contributor

log workId instead of separate fields?

Contributor Author

done

@@ -2725,14 +2725,14 @@ public void testActiveWorkForShardedKeys() throws Exception {

// Verify a different shard of key is a separate queue.
Work m4 = createMockWork(3);
assertFalse(computationState.activateWork(key1Shard1, m4));
assertTrue(computationState.activateWork(key1Shard1, m4));
Contributor

why did this change? it seems like it should be duplicate of m3. Think this might be related to early return I commented on in code.

.build();
}

private static WorkId workDedupeToken(long workToken, long cacheToken) {
Contributor

nit: name workId?

Contributor Author

done

Reminder, please take a look at this pr: @Abacn

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn added as fallback since no labels match configuration

Abacn commented Nov 27, 2023

waiting on author

(#29082 (comment))

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jan 27, 2024
@scwhittle

@m-trieu Can we rebase and get this in? I still think it would be beneficial, it just fell off the radar.

@github-actions github-actions bot removed the stale label Jan 29, 2024
m-trieu commented Feb 9, 2024

@m-trieu Can we rebase and get this in? I still think it would be beneficial, it just fell off the radar.

@scwhittle ready for another look Thanks!

}

workQueue.removeIf(
Contributor

this isn't updating the activeworkbudget for the removals
see/share removal logic with the failing for heartbeats

Contributor Author

done

workQueue.removeIf(
queuedWork ->
queuedWorkToRemove.contains(queuedWork.id()) && !queuedWork.equals(workQueue.peek()));

Contributor

I can't add a comment on the right line because github is stupid, but can you replace FailedTokens below with the new WorkId class?

Contributor Author

done

// This set is for adding remove-able WorkItems if they exist in the workQueue. We add them to
// this set since a ConcurrentModificationException will be thrown if we modify the workQueue
// and then resume iteration.
Set<WorkId> queuedWorkToRemove = new HashSet<>();
Contributor

instead of a set and then removeif, how about doing in a single pass with an iterator and Iterator.remove()

Contributor Author

done

@scwhittle scwhittle merged commit 514b03b into apache:master Feb 13, 2024
17 checks passed
@scwhittle scwhittle mentioned this pull request Feb 13, 2024
3 tasks