diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index c80c3a882e528..4607096dd66af 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -338,7 +338,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { ""); writer.println( - ""); + ""); // Use StringBuilder because we are appending in loop. StringBuilder activeWorkStatus = new StringBuilder(); int commitsPendingCount = 0; @@ -364,6 +364,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { activeWorkStatus.append(activeWork.getState()); activeWorkStatus.append("\n"); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index e77823602eda7..03d1e1ae469a3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -72,6 +72,7 @@ public final class Work implements RefreshableWork { private final String latencyTrackingId; private TimedState currentState; private volatile boolean isFailed; + private volatile String processingThreadName = ""; private Work( WorkItem workItem, @@ -188,6 +189,14 @@ public void setState(State state) { this.currentState = TimedState.create(state, now); } + public String getProcessingThreadName() { + return processingThreadName; + } + + public void setProcessingThreadName(String processingThreadName) { + this.processingThreadName = processingThreadName; + } + @Override public void setFailed() { this.isFailed = true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 5e3f293f7d5b6..9286be84ceaa3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -22,8 +22,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -223,18 +221,10 @@ private void executeMonitorHeld(Runnable work, long workBytes) { try { executor.execute( () -> { - String threadName = Thread.currentThread().getName(); try { - if (work instanceof ExecutableWork) { - String workToken = - debugFormattedWorkToken( - ((ExecutableWork) work).work().getWorkItem().getWorkToken()); - Thread.currentThread().setName(threadName + ":" + workToken); - } work.run(); } finally { decrementCounters(workBytes); - Thread.currentThread().setName(threadName); } }); } catch (RuntimeException e) { @@ -244,11 +234,6 @@ private void executeMonitorHeld(Runnable work, long workBytes) { } } - @VisibleForTesting - public static String debugFormattedWorkToken(long workToken) { - return String.format("%016x", workToken); - } - private void decrementCounters(long workBytes) { monitor.enter(); --elementsOutstanding; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 965a29126dc27..9a3e6eb6b099a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -225,6 +225,7 @@ private void processWork(ComputationState computationState, Work work) { Windmill.WorkItem workItem = work.getWorkItem(); String computationId = computationState.getComputationId(); ByteString key = workItem.getKey(); + work.setProcessingThreadName(Thread.currentThread().getName()); work.setState(Work.State.PROCESSING); setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId); LOG.debug("Starting processing for {}:\n{}", computationId, work); @@ -288,6 +289,7 @@ private void processWork(ComputationState computationState, Work work) { } resetWorkLoggingContext(work.getLatencyTrackingId()); + work.setProcessingThreadName(""); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index ad77958837a12..7349252899202 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -293,40 +293,4 @@ public void testRenderSummaryHtml() { + "Work Queue Bytes: 0/10000000
/n"; assertEquals(expectedSummaryHtml, executor.summaryHtml()); } - - @Test - public void testExecute_updatesThreadNameForExecutableWork() throws InterruptedException { - CountDownLatch waitForWorkExecution = new CountDownLatch(1); - ExecutableWork executableWork = - createWork( - work -> { - assertTrue( - Thread.currentThread() - .getName() - .contains( - BoundedQueueExecutor.debugFormattedWorkToken( - work.getWorkItem().getWorkToken()))); - waitForWorkExecution.countDown(); - }); - executor.execute(executableWork, executableWork.getWorkItem().getSerializedSize()); - waitForWorkExecution.await(); - } - - @Test - public void testForceExecute_updatesThreadNameForExecutableWork() throws InterruptedException { - CountDownLatch waitForWorkExecution = new CountDownLatch(1); - ExecutableWork executableWork = - createWork( - work -> { - assertTrue( - Thread.currentThread() - .getName() - .contains( - BoundedQueueExecutor.debugFormattedWorkToken( - work.getWorkItem().getWorkToken()))); - waitForWorkExecution.countDown(); - }); - executor.forceExecute(executableWork, executableWork.getWorkItem().getSerializedSize()); - waitForWorkExecution.await(); - } }
KeyTokenQueuedActive ForStateState Active For
KeyTokenQueuedActive ForStateState Active ForProcessing Thread
"); activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now)); + activeWorkStatus.append(""); + activeWorkStatus.append(activeWork.getProcessingThreadName()); activeWorkStatus.append("