Commit 47dc32a
check for clientId representing a retry before activating and completing work
m-trieu committed Oct 20, 2023
1 parent 68e9c99 commit 47dc32a
Showing 7 changed files with 255 additions and 119 deletions.
StreamingDataflowWorker.java
@@ -94,6 +94,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkDedupeKey;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
@@ -1260,7 +1261,7 @@ public void close() {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken());
ShardedKey.create(key, workItem.getShardingKey()), WorkDedupeKey.of(workItem));
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
@@ -1337,7 +1338,10 @@ private void commitLoop() {
for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) {
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()),
workRequest.getWorkToken());
WorkDedupeKey.builder()
.setCacheToken(workRequest.getCacheToken())
.setWorkToken(workRequest.getWorkToken())
.build());
}
}
}
@@ -1368,7 +1372,10 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
// was deemed stuck.
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
WorkDedupeKey.builder()
.setCacheToken(request.getCacheToken())
.setWorkToken(request.getWorkToken())
.build());
})) {
return true;
} else {
ActiveWorkState.java
@@ -23,6 +23,7 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@@ -83,6 +84,29 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant refreshDeadline) {
ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();

return workQueue.stream()
.filter(work -> work.getStartTime().isBefore(refreshDeadline))
.map(
work ->
Windmill.KeyedGetDataRequest.newBuilder()
.setKey(shardedKey.key())
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
.addAllLatencyAttribution(work.getLatencyAttributions())
.build());
}

private static String elapsedString(Instant start, Instant end) {
Duration activeFor = new Duration(start, end);
// Duration's toString always starts with "PT"; remove that here.
return activeFor.toString().substring(2);
}

/**
* Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 3 {@link
* ActivateWorkResult}
@@ -108,9 +132,19 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
}

// Ensure we don't already have this work token queued.
for (Work queuedWork : workQueue) {
Iterator<Work> workQueueIterator = workQueue.iterator();
while (workQueueIterator.hasNext()) {
Work queuedWork = workQueueIterator.next();
if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
return ActivateWorkResult.DUPLICATE;
// Work is a duplicate.
if (queuedWork.getWorkItem().getCacheToken() == work.getWorkItem().getCacheToken()) {
return ActivateWorkResult.DUPLICATE;
} else {
// Work is a retry. Remove the older work, and queue the most current retry.
workQueueIterator.remove();
workQueue.addLast(work);
return ActivateWorkResult.QUEUED;
}
}
}

@@ -126,19 +160,20 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
* #activeWork}.
*/
synchronized Optional<Work> completeWorkAndGetNextWorkForKey(
ShardedKey shardedKey, long workToken) {
ShardedKey shardedKey, WorkDedupeKey workDedupeKey) {
@Nullable Queue<Work> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workToken);
LOG.warn(
"Unable to complete inactive work for key {} and token {}.", shardedKey, workDedupeKey);
return Optional.empty();
}
removeCompletedWorkFromQueue(workQueue, shardedKey, workToken);
removeCompletedWorkFromQueue(workQueue, shardedKey, workDedupeKey);
return getNextWork(workQueue, shardedKey);
}

private synchronized void removeCompletedWorkFromQueue(
Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
Queue<Work> workQueue, ShardedKey shardedKey, WorkDedupeKey workDedupeKey) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
Work completedWork =
@@ -147,16 +182,22 @@ private synchronized void removeCompletedWorkFromQueue(
() ->
new IllegalStateException(
String.format(
"Active key %s without work, expected token %d",
shardedKey, workToken)));
"Active key %s without work, expected work_token %d, expected cache_token %d",
shardedKey, workDedupeKey.workToken(), workDedupeKey.cacheToken())));

if (completedWork.getWorkItem().getWorkToken() != workToken) {
if (completedWork.getWorkItem().getWorkToken() != workDedupeKey.workToken()
|| completedWork.isRetryOf(workDedupeKey)) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn(
"Unable to complete due to token mismatch for key {} and token {}, actual token was {}.",
"Unable to complete due to token mismatch for "
+ "key {}, work_token {}, and cache_token {}; "
+ "actual work_token was {},"
+ "actual cache_token was {}.",
shardedKey,
workToken,
completedWork.getWorkItem().getWorkToken());
workDedupeKey.workToken(),
workDedupeKey.cacheToken(),
completedWork.getWorkItem().getWorkToken(),
completedWork.getWorkItem().getCacheToken());
return;
}

Expand All @@ -178,21 +219,22 @@ private synchronized Optional<Work> getNextWork(Queue<Work> workQueue, ShardedKe
* before the stuckCommitDeadline.
*/
synchronized void invalidateStuckCommits(
Instant stuckCommitDeadline, BiConsumer<ShardedKey, Long> shardedKeyAndWorkTokenConsumer) {
for (Entry<ShardedKey, Long> shardedKeyAndWorkToken :
Instant stuckCommitDeadline,
BiConsumer<ShardedKey, WorkDedupeKey> shardedKeyAndWorkTokenConsumer) {
for (Entry<ShardedKey, WorkDedupeKey> shardedKeyAndWorkToken :
getStuckCommitsAt(stuckCommitDeadline).entrySet()) {
ShardedKey shardedKey = shardedKeyAndWorkToken.getKey();
long workToken = shardedKeyAndWorkToken.getValue();
WorkDedupeKey workDedupeKey = shardedKeyAndWorkToken.getValue();
computationStateCache.invalidate(shardedKey.key(), shardedKey.shardingKey());
shardedKeyAndWorkTokenConsumer.accept(shardedKey, workToken);
shardedKeyAndWorkTokenConsumer.accept(shardedKey, workDedupeKey);
}
}

private synchronized ImmutableMap<ShardedKey, Long> getStuckCommitsAt(
private synchronized ImmutableMap<ShardedKey, WorkDedupeKey> getStuckCommitsAt(
Instant stuckCommitDeadline) {
// Determine the stuck commit keys but complete them outside the loop iterating over
// activeWork as completeWork may delete the entry from activeWork.
ImmutableMap.Builder<ShardedKey, Long> stuckCommits = ImmutableMap.builder();
ImmutableMap.Builder<ShardedKey, WorkDedupeKey> stuckCommits = ImmutableMap.builder();
for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
ShardedKey shardedKey = entry.getKey();
@Nullable Work work = entry.getValue().peek();
@@ -202,7 +244,7 @@ private synchronized ImmutableMap<ShardedKey, Long> getStuckCommitsAt(
"Detected key {} stuck in COMMITTING state since {}, completing it with error.",
shardedKey,
work.getStateStartTime());
stuckCommits.put(shardedKey, work.getWorkItem().getWorkToken());
stuckCommits.put(shardedKey, WorkDedupeKey.of(work.getWorkItem()));
}
}
}
@@ -216,23 +258,6 @@ synchronized ImmutableList<KeyedGetDataRequest> getKeysToRefresh(Instant refresh
.collect(toImmutableList());
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant refreshDeadline) {
ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();

return workQueue.stream()
.filter(work -> work.getStartTime().isBefore(refreshDeadline))
.map(
work ->
Windmill.KeyedGetDataRequest.newBuilder()
.setKey(shardedKey.key())
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
.addAllLatencyAttribution(work.getLatencyAttributions())
.build());
}

synchronized void printActiveWork(PrintWriter writer, Instant now) {
writer.println(
"<table border=\"1\" "
@@ -278,12 +303,6 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
}
}

private static String elapsedString(Instant start, Instant end) {
Duration activeFor = new Duration(start, end);
// Duration's toString always starts with "PT"; remove that here.
return activeFor.toString().substring(2);
}

enum ActivateWorkResult {
QUEUED,
EXECUTE,
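The following is a reading aid, not part of the commit: a minimal, standalone Java sketch of the queue handling that activateWorkForKey performs after this change. ActivationSketch, FakeWork, and the trimmed-down Result enum are hypothetical stand-ins for Work and ActivateWorkResult; the real method also consults the per-key activeWork map rather than a bare queue.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical stand-ins used only to illustrate the duplicate-vs-retry handling
// added to ActiveWorkState.activateWorkForKey.
final class ActivationSketch {
  record FakeWork(long workToken, long cacheToken) {}

  enum Result { QUEUED, EXECUTE, DUPLICATE }

  static Result activate(Deque<FakeWork> queue, FakeWork incoming) {
    if (queue.isEmpty()) {
      // Nothing active for this key: start processing the incoming work immediately.
      queue.addLast(incoming);
      return Result.EXECUTE;
    }
    Iterator<FakeWork> it = queue.iterator();
    while (it.hasNext()) {
      FakeWork queued = it.next();
      if (queued.workToken() == incoming.workToken()) {
        if (queued.cacheToken() == incoming.cacheToken()) {
          return Result.DUPLICATE; // Same work token and cache token: drop the redelivery.
        }
        // Same work token, new cache token: a retry. Replace the stale queued item.
        it.remove();
        queue.addLast(incoming);
        return Result.QUEUED;
      }
    }
    queue.addLast(incoming); // Unrelated work for the same key waits behind the active item.
    return Result.QUEUED;
  }

  public static void main(String[] args) {
    Deque<FakeWork> queue = new ArrayDeque<>();
    System.out.println(activate(queue, new FakeWork(1, 10))); // EXECUTE
    System.out.println(activate(queue, new FakeWork(1, 10))); // DUPLICATE
    System.out.println(activate(queue, new FakeWork(1, 11))); // QUEUED (retry replaces the older item)
  }
}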
ComputationState.java
@@ -100,9 +100,10 @@ public boolean activateWork(ShardedKey shardedKey, Work work) {
/**
* Marks the work for the given shardedKey as complete. Schedules queued work for the key if any.
*/
public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, long workToken) {
public void completeWorkAndScheduleNextWorkForKey(
ShardedKey shardedKey, WorkDedupeKey workDedupeKey) {
activeWorkState
.completeWorkAndGetNextWorkForKey(shardedKey, workToken)
.completeWorkAndGetNextWorkForKey(shardedKey, workDedupeKey)
.ifPresent(this::forceExecute);
}

Work.java
@@ -125,6 +125,11 @@ boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
&& currentState.startTime().isBefore(stuckCommitDeadline);
}

boolean isRetryOf(WorkDedupeKey workDedupeKey) {
return workItem.getWorkToken() == workDedupeKey.workToken()
&& workItem.getCacheToken() != workDedupeKey.cacheToken();
}

public enum State {
QUEUED(Windmill.LatencyAttribution.State.QUEUED),
PROCESSING(Windmill.LatencyAttribution.State.ACTIVE),
WorkDedupeKey.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import com.google.auto.value.AutoValue;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;

/**
* A composite key used to see if a unit of {@link Work} is a duplicate. If multiple units of {@link
* Work} have the same workToken AND cacheToken, the {@link Work} is a duplicate. If multiple units
* of {@link Work} have the same workToken, but different cacheTokens, the {@link Work} is a retry.
*/
@AutoValue
public abstract class WorkDedupeKey {
public static WorkDedupeKey of(WorkItem workItem) {
return builder()
.setCacheToken(workItem.getCacheToken())
.setWorkToken(workItem.getWorkToken())
.build();
}

public static WorkDedupeKey of(Work work) {
return of(work.getWorkItem());
}

public static Builder builder() {
return new AutoValue_WorkDedupeKey.Builder();
}

abstract long cacheToken();

abstract long workToken();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setCacheToken(long value);

public abstract Builder setWorkToken(long value);

public abstract WorkDedupeKey build();
}
}
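A minimal, standalone illustration of the duplicate-versus-retry semantics described in the class comment above; the record below is a hypothetical stand-in for the AutoValue class (and for Work.isRetryOf), not code from this commit.

// A Java record gives the same value semantics the AutoValue class provides:
// equal work and cache tokens mean a duplicate delivery, while an equal work
// token with a different cache token identifies a retry.
record DedupeKeySketch(long workToken, long cacheToken) {
  boolean isRetryOf(DedupeKeySketch other) {
    return workToken == other.workToken() && cacheToken != other.cacheToken();
  }

  public static void main(String[] args) {
    DedupeKeySketch original = new DedupeKeySketch(1L, 10L);
    DedupeKeySketch redelivery = new DedupeKeySketch(1L, 10L);
    DedupeKeySketch retry = new DedupeKeySketch(1L, 11L);

    System.out.println(original.equals(redelivery));    // true  -> duplicate, drop it
    System.out.println(retry.isRetryOf(original));      // true  -> retry, replace the queued work
    System.out.println(redelivery.isRetryOf(original)); // false -> not a retry (it is a duplicate)
  }
}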
(Diffs for the remaining 2 changed files are not shown.)
