Skip to content

Commit

Permalink
[Dataflow Streaming] Remove call to Thread.setName and track thread n…
Browse files Browse the repository at this point in the history
…ame inside Work. (#32715)

Thread.setName is expensive and uses upto 4% cpu on jobs with many keys.
  • Loading branch information
arunpandianp authored Oct 9, 2024
1 parent 1479362 commit 42cad40
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
"<table border=\"1\" "
+ "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
writer.println(
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th></tr>");
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th><th>Processing Thread</th></tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
Expand All @@ -364,6 +364,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
activeWorkStatus.append(activeWork.getState());
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now));
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.getProcessingThreadName());
activeWorkStatus.append("</td></tr>\n");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final class Work implements RefreshableWork {
private final String latencyTrackingId;
private TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";

private Work(
WorkItem workItem,
Expand Down Expand Up @@ -188,6 +189,14 @@ public void setState(State state) {
this.currentState = TimedState.create(state, now);
}

public String getProcessingThreadName() {
return processingThreadName;
}

public void setProcessingThreadName(String processingThreadName) {
this.processingThreadName = processingThreadName;
}

@Override
public void setFailed() {
this.isFailed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;

Expand Down Expand Up @@ -223,18 +221,10 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
try {
executor.execute(
() -> {
String threadName = Thread.currentThread().getName();
try {
if (work instanceof ExecutableWork) {
String workToken =
debugFormattedWorkToken(
((ExecutableWork) work).work().getWorkItem().getWorkToken());
Thread.currentThread().setName(threadName + ":" + workToken);
}
work.run();
} finally {
decrementCounters(workBytes);
Thread.currentThread().setName(threadName);
}
});
} catch (RuntimeException e) {
Expand All @@ -244,11 +234,6 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
}
}

@VisibleForTesting
public static String debugFormattedWorkToken(long workToken) {
return String.format("%016x", workToken);
}

private void decrementCounters(long workBytes) {
monitor.enter();
--elementsOutstanding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ private void processWork(ComputationState computationState, Work work) {
Windmill.WorkItem workItem = work.getWorkItem();
String computationId = computationState.getComputationId();
ByteString key = workItem.getKey();
work.setProcessingThreadName(Thread.currentThread().getName());
work.setState(Work.State.PROCESSING);
setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId);
LOG.debug("Starting processing for {}:\n{}", computationId, work);
Expand Down Expand Up @@ -288,6 +289,7 @@ private void processWork(ComputationState computationState, Work work) {
}

resetWorkLoggingContext(work.getLatencyTrackingId());
work.setProcessingThreadName("");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,40 +293,4 @@ public void testRenderSummaryHtml() {
+ "Work Queue Bytes: 0/10000000<br>/n";
assertEquals(expectedSummaryHtml, executor.summaryHtml());
}

@Test
public void testExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
CountDownLatch waitForWorkExecution = new CountDownLatch(1);
ExecutableWork executableWork =
createWork(
work -> {
assertTrue(
Thread.currentThread()
.getName()
.contains(
BoundedQueueExecutor.debugFormattedWorkToken(
work.getWorkItem().getWorkToken())));
waitForWorkExecution.countDown();
});
executor.execute(executableWork, executableWork.getWorkItem().getSerializedSize());
waitForWorkExecution.await();
}

@Test
public void testForceExecute_updatesThreadNameForExecutableWork() throws InterruptedException {
CountDownLatch waitForWorkExecution = new CountDownLatch(1);
ExecutableWork executableWork =
createWork(
work -> {
assertTrue(
Thread.currentThread()
.getName()
.contains(
BoundedQueueExecutor.debugFormattedWorkToken(
work.getWorkItem().getWorkToken())));
waitForWorkExecution.countDown();
});
executor.forceExecute(executableWork, executableWork.getWorkItem().getSerializedSize());
waitForWorkExecution.await();
}
}

0 comments on commit 42cad40

Please sign in to comment.