Skip to content

Commit

Permalink
add ActiveWorkState
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Sep 26, 2023
1 parent 6e2aba8 commit 175900c
Show file tree
Hide file tree
Showing 11 changed files with 716 additions and 298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,16 @@ private static void sleep(int millis) {
Uninterruptibles.sleepUninterruptibly(millis, TimeUnit.MILLISECONDS);
}

/**
 * Tags the logging context of the current Thread with the given work item's id and stage name.
 *
 * <p>The work id is the work item's sharding key and work token, each rendered as hexadecimal
 * and joined by a dash. The stage name is set to {@code computationId}.
 */
private static void setUpWorkLoggingContext(Windmill.WorkItem workItem, String computationId) {
  String shardingKeyHex = Long.toHexString(workItem.getShardingKey());
  String workTokenHex = Long.toHexString(workItem.getWorkToken());
  DataflowWorkerLoggingMDC.setWorkId(shardingKeyHex + '-' + workTokenHex);
  DataflowWorkerLoggingMDC.setStageName(computationId);
}

private int chooseMaximumNumberOfThreads() {
if (options.getNumberOfWorkerHarnessThreads() != 0) {
return options.getNumberOfWorkerHarnessThreads();
Expand Down Expand Up @@ -898,16 +908,6 @@ private Windmill.WorkItemCommitRequest.Builder initializeOutputBuilder(
.setCacheToken(workItem.getCacheToken());
}

/**
 * Tags the logging context of the current Thread with the given work item's id and stage name.
 *
 * <p>The work id is the work item's sharding key and work token, each rendered as hexadecimal
 * and joined by a dash. The stage name is set to {@code computationId}.
 */
private static void setUpWorkLoggingContext(Windmill.WorkItem workItem, String computationId) {
  String shardingKeyHex = Long.toHexString(workItem.getShardingKey());
  String workTokenHex = Long.toHexString(workItem.getWorkToken());
  DataflowWorkerLoggingMDC.setWorkId(shardingKeyHex + '-' + workTokenHex);
  DataflowWorkerLoggingMDC.setStageName(computationId);
}

private void process(
final ComputationState computationState,
final Instant inputDataWatermark,
Expand Down Expand Up @@ -941,7 +941,7 @@ private void process(

StageInfo stageInfo =
stageInfoMap.computeIfAbsent(
mapTask.getStageName(), s -> new StageInfo(s, mapTask.getSystemName(), this));
mapTask.getStageName(), s -> StageInfo.create(s, mapTask.getSystemName(), this));

ExecutionState executionState = null;
String counterName = "dataflow_source_bytes_processed-" + mapTask.getSystemName();
Expand All @@ -967,13 +967,13 @@ private void process(
new DataflowExecutionContext.DataflowExecutionStateTracker(
ExecutionStateSampler.instance(),
stageInfo
.getExecutionStateRegistry()
.executionStateRegistry()
.getState(
NameContext.forStage(mapTask.getStageName()),
"other",
null,
ScopedProfiler.INSTANCE.emptyScope()),
stageInfo.getDeltaCounters(),
stageInfo.deltaCounters(),
options,
computationId);
StreamingModeExecutionContext context =
Expand All @@ -985,9 +985,9 @@ private void process(
? computationState.getTransformUserNameToStateFamily()
: stateNameMap,
stateCache.forComputation(computationId),
stageInfo.getMetricsContainerRegistry(),
stageInfo.metricsContainerRegistry(),
executionStateTracker,
stageInfo.getExecutionStateRegistry(),
stageInfo.executionStateRegistry(),
maxSinkBytes);
DataflowMapTaskExecutor mapTaskExecutor =
mapTaskExecutorFactory.create(
Expand Down Expand Up @@ -1245,23 +1245,23 @@ public void close() {
} else {
// Consider the item invalid. It will eventually be retried by Windmill if it still needs to
// be processed.
computationState.completeWorkAndScheduleNextWork(
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken());
}
} finally {
// Update total processing time counters. Updating in finally clause ensures that
// work items causing exceptions are also accounted in time spent.
long processingTimeMsecs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - processingStartTimeNanos);
stageInfo.getTotalProcessingMsecs().addValue(processingTimeMsecs);
stageInfo.totalProcessingMsecs().addValue(processingTimeMsecs);

// Attribute all the processing to timers if the work item contains any timers.
// Tests show that work items rarely contain both timers and message bundles. It should
// be a fairly close approximation.
// Another option: Derive time split between messages and timers based on recent totals.
// either here or in DFE.
if (work.getWorkItem().hasTimers()) {
stageInfo.getTimerProcessingMsecs().addValue(processingTimeMsecs);
stageInfo.timerProcessingMsecs().addValue(processingTimeMsecs);
}

DataflowWorkerLoggingMDC.setWorkId(null);
Expand Down Expand Up @@ -1321,7 +1321,7 @@ private void commitLoop() {
computationRequestMap.entrySet()) {
ComputationState computationState = entry.getKey();
for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) {
computationState.completeWorkAndScheduleNextWork(
computationState.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()),
workRequest.getWorkToken());
}
Expand Down Expand Up @@ -1352,7 +1352,7 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
activeCommitBytes.addAndGet(-size);
// This may throw an exception if the commit was not active, which is possible if it
// was deemed stuck.
state.completeWorkAndScheduleNextWork(
state.completeWorkAndScheduleNextWorkForKey(
ShardedKey.create(request.getKey(), request.getShardingKey()),
request.getWorkToken());
})) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static NameContext forStage(String stageName) {
}

/** Returns the name of the stage this instruction is executing in. */
public abstract @Nullable String stageName();
public abstract String stageName();

/**
* Returns the "original" name of this instruction. This name is a short name assigned by the SDK
Expand Down
Loading

0 comments on commit 175900c

Please sign in to comment.