From 21741c8e8282ea34d34a1b193a57320a4e749d28 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 9 Oct 2024 01:19:02 -0700 Subject: [PATCH 1/2] Remove call to Thread.setName and track thread name inside Work. Thread.setName is expensive and uses up to 4% CPU on jobs with many keys. --- .../worker/streaming/ActiveWorkState.java | 4 ++- .../dataflow/worker/streaming/Work.java | 9 +++++ .../worker/util/BoundedQueueExecutor.java | 15 -------- .../processing/StreamingWorkScheduler.java | 1 + .../worker/util/BoundedQueueExecutorTest.java | 36 ------------------- 5 files changed, 13 insertions(+), 52 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index c80c3a882e528..4607096dd66af 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -338,7 +338,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { ""); writer.println( - ""); + ""); // Use StringBuilder because we are appending in loop. 
StringBuilder activeWorkStatus = new StringBuilder(); int commitsPendingCount = 0; @@ -364,6 +364,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { activeWorkStatus.append(activeWork.getState()); activeWorkStatus.append("\n"); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index e77823602eda7..130ed3a400566 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -72,6 +72,7 @@ public final class Work implements RefreshableWork { private final String latencyTrackingId; private TimedState currentState; private volatile boolean isFailed; + private volatile String processingThreadName = "not set"; private Work( WorkItem workItem, @@ -188,6 +189,14 @@ public void setState(State state) { this.currentState = TimedState.create(state, now); } + public String getProcessingThreadName() { + return processingThreadName; + } + + public void setProcessingThreadName(String processingThreadName) { + this.processingThreadName = processingThreadName; + } + @Override public void setFailed() { this.isFailed = true; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java index 5e3f293f7d5b6..9286be84ceaa3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java @@ -22,8 +22,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard; @@ -223,18 +221,10 @@ private void executeMonitorHeld(Runnable work, long workBytes) { try { executor.execute( () -> { - String threadName = Thread.currentThread().getName(); try { - if (work instanceof ExecutableWork) { - String workToken = - debugFormattedWorkToken( - ((ExecutableWork) work).work().getWorkItem().getWorkToken()); - Thread.currentThread().setName(threadName + ":" + workToken); - } work.run(); } finally { decrementCounters(workBytes); - Thread.currentThread().setName(threadName); } }); } catch (RuntimeException e) { @@ -244,11 +234,6 @@ private void executeMonitorHeld(Runnable work, long workBytes) { } } - @VisibleForTesting - public static String debugFormattedWorkToken(long workToken) { - return String.format("%016x", workToken); - } - private void decrementCounters(long workBytes) { monitor.enter(); --elementsOutstanding; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 965a29126dc27..099b9994bca91 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -225,6 +225,7 @@ private void processWork(ComputationState computationState, Work work) { Windmill.WorkItem workItem = work.getWorkItem(); String computationId = computationState.getComputationId(); ByteString key = workItem.getKey(); + work.setProcessingThreadName(Thread.currentThread().getName()); work.setState(Work.State.PROCESSING); setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId); LOG.debug("Starting processing for {}:\n{}", computationId, work); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java index ad77958837a12..7349252899202 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java @@ -293,40 +293,4 @@ public void testRenderSummaryHtml() { + "Work Queue Bytes: 0/10000000
/n"; assertEquals(expectedSummaryHtml, executor.summaryHtml()); } - - @Test - public void testExecute_updatesThreadNameForExecutableWork() throws InterruptedException { - CountDownLatch waitForWorkExecution = new CountDownLatch(1); - ExecutableWork executableWork = - createWork( - work -> { - assertTrue( - Thread.currentThread() - .getName() - .contains( - BoundedQueueExecutor.debugFormattedWorkToken( - work.getWorkItem().getWorkToken()))); - waitForWorkExecution.countDown(); - }); - executor.execute(executableWork, executableWork.getWorkItem().getSerializedSize()); - waitForWorkExecution.await(); - } - - @Test - public void testForceExecute_updatesThreadNameForExecutableWork() throws InterruptedException { - CountDownLatch waitForWorkExecution = new CountDownLatch(1); - ExecutableWork executableWork = - createWork( - work -> { - assertTrue( - Thread.currentThread() - .getName() - .contains( - BoundedQueueExecutor.debugFormattedWorkToken( - work.getWorkItem().getWorkToken()))); - waitForWorkExecution.countDown(); - }); - executor.forceExecute(executableWork, executableWork.getWorkItem().getSerializedSize()); - waitForWorkExecution.await(); - } } From 53cefdadd953bdd6cfd21a2f63296c4cd5a9b927 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 9 Oct 2024 03:06:26 -0700 Subject: [PATCH 2/2] clear thread name at end of processWork --- .../org/apache/beam/runners/dataflow/worker/streaming/Work.java | 2 +- .../worker/windmill/work/processing/StreamingWorkScheduler.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index 130ed3a400566..03d1e1ae469a3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -72,7 +72,7 @@ public final class Work implements RefreshableWork { private final String latencyTrackingId; private TimedState currentState; private volatile boolean isFailed; - private volatile String processingThreadName = "not set"; + private volatile String processingThreadName = ""; private Work( WorkItem workItem, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 099b9994bca91..9a3e6eb6b099a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -289,6 +289,7 @@ private void processWork(ComputationState computationState, Work work) { } resetWorkLoggingContext(work.getLatencyTrackingId()); + work.setProcessingThreadName(""); } }
KeyTokenQueuedActive ForStateState Active For
KeyTokenQueuedActive ForStateState Active ForProcessing Thread
"); activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now)); + activeWorkStatus.append(""); + activeWorkStatus.append(activeWork.getProcessingThreadName()); activeWorkStatus.append("