Skip to content

Commit

Permalink
check for cache token representing a retry before activating and completing work (#29082)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu authored Feb 13, 2024
1 parent b923a67 commit 514b03b
Show file tree
Hide file tree
Showing 7 changed files with 418 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.FailedTokens;
import org.apache.beam.runners.dataflow.worker.streaming.Commit;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
Expand All @@ -97,13 +96,15 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
Expand Down Expand Up @@ -1311,7 +1312,7 @@ public void close() {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken());
ShardedKey.create(key, workItem.getShardingKey()), work.id());
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
Expand Down Expand Up @@ -1389,7 +1390,10 @@ private void commitLoop() {
for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) {
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()),
workRequest.getWorkToken());
WorkId.builder()
.setCacheToken(workRequest.getCacheToken())
.setWorkToken(workRequest.getWorkToken())
.build());
}
}
}
Expand All @@ -1409,7 +1413,11 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
.forComputation(state.getComputationId())
.invalidate(request.getKey(), request.getShardingKey());
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
ShardedKey.create(request.getKey(), request.getShardingKey()),
WorkId.builder()
.setWorkToken(request.getWorkToken())
.setCacheToken(request.getCacheToken())
.build());
return true;
}

Expand All @@ -1431,7 +1439,10 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
activeCommitBytes.addAndGet(-size);
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
WorkId.builder()
.setCacheToken(request.getCacheToken())
.setWorkToken(request.getWorkToken())
.build());
})) {
return true;
} else {
Expand Down Expand Up @@ -1963,20 +1974,19 @@ private void sendWorkerUpdatesToDataflowService(
}
}

public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
public void handleHeartbeatResponses(List<ComputationHeartbeatResponse> responses) {
for (ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
// Maps sharding key to (work token, cache token) for work that should be marked failed.
Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
Multimap<Long, WorkId> failedWork = ArrayListMultimap.create();
for (Windmill.HeartbeatResponse heartbeatResponse :
computationHeartbeatResponse.getHeartbeatResponsesList()) {
if (heartbeatResponse.getFailed()) {
failedWork
.computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new ArrayList<>())
.add(
FailedTokens.newBuilder()
.setWorkToken(heartbeatResponse.getWorkToken())
.setCacheToken(heartbeatResponse.getCacheToken())
.build());
failedWork.put(
heartbeatResponse.getShardingKey(),
WorkId.builder()
.setWorkToken(heartbeatResponse.getWorkToken())
.setCacheToken(heartbeatResponse.getCacheToken())
.build());
}
}
ComputationState state = computationMap.get(computationHeartbeatResponse.getComputationId());
Expand Down
Loading

0 comments on commit 514b03b

Please sign in to comment.