Skip to content

Commit

Permalink
Worker message plumbing (apache#29879)
Browse files Browse the repository at this point in the history
* use StreamingScalingReport for autoscaling signals

* add unit test stub

* spotless apply

* comment test stub

* add unit test

* simplify response processing

* spotless apply

* add more reported metrics

* remove byte metrics

* fix bug

* fix DataflowWorkUnitClient test

* fix DataflowWorkUnitClient test

* formatting

* add check for scheduledtimer

* option to options

* fix timer check

* revert long to int change

* refactor timers

* var type fix

* fix timers refactoring

* use arraylist instead of map

* add timer to list

* fix comment

---------

Co-authored-by: scwhittle <[email protected]>
  • Loading branch information
edman124 and scwhittle authored Jan 18, 2024
1 parent 79b9de2 commit e867ed7
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20230812-$google_clients_version",
// Keep version consistent with the version in google_cloud_resourcemanager, managed by google_cloud_platform_libraries_bom
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20230806-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20220920-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20231203-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240110-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
// Keep version consistent with the version in google_cloud_nio, managed by google_cloud_platform_libraries_bom
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
Expand All @@ -48,6 +53,7 @@
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -269,4 +275,40 @@ public WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus)
logger.debug("ReportWorkItemStatus result: {}", state);
return state;
}

/** Builds a {@link WorkerMessage} that wraps the given {@link StreamingScalingReport}. */
@Override
public WorkerMessage createWorkerMessageFromStreamingScalingReport(
    StreamingScalingReport report) {
  DateTime now = DateTime.now();
  logger.debug("Reporting WorkMessageResponse");
  // Label the message so the service can attribute it to this job and worker.
  Map<String, String> labels =
      ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID", options.getWorkerId());
  return new WorkerMessage()
      .setTime(toCloudTime(now))
      .setStreamingScalingReport(report)
      .setLabels(labels);
}

/** Sends the given {@link WorkerMessage} (autoscaling signals) to the Dataflow service. */
@Override
public void reportWorkerMessage(WorkerMessage msg) throws IOException {
  SendWorkerMessagesRequest request =
      new SendWorkerMessagesRequest()
          .setLocation(options.getRegion())
          .setWorkerMessages(Collections.singletonList(msg));
  SendWorkerMessagesResponse response =
      dataflow
          .projects()
          .locations()
          .workerMessages(options.getProject(), options.getRegion(), request)
          .execute();
  // A null response indicates a transport-level problem; surface it to the caller.
  if (response == null) {
    logger.warn("Worker Message response is null");
    throw new IOException("Got null Worker Message response");
  }
  // Currently no response is expected
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.services.dataflow.model.Status;
import com.google.api.services.dataflow.model.StreamingComputationConfig;
import com.google.api.services.dataflow.model.StreamingConfigTask;
import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemStatus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -276,13 +277,12 @@ public class StreamingDataflowWorker {
private final HotKeyLogger hotKeyLogger;
// Periodic sender of debug information to the debug capture service.
private final DebugCapture.@Nullable Manager debugCaptureManager;
private ScheduledExecutorService refreshWorkTimer;
private ScheduledExecutorService statusPageTimer;
private ScheduledExecutorService globalWorkerUpdatesTimer;
// Collection of ScheduledExecutorServices that are running periodic functions.
private ArrayList<ScheduledExecutorService> scheduledExecutors =
new ArrayList<ScheduledExecutorService>();
private int retryLocallyDelayMs = 10000;
// Periodically fires a global config request to dataflow service. Only used when windmill service
// is enabled.
private ScheduledExecutorService globalConfigRefreshTimer;
// Possibly overridden by streaming engine config.
private int maxWorkItemCommitBytes = Integer.MAX_VALUE;

Expand Down Expand Up @@ -579,14 +579,25 @@ public void start() {
sampler.start();

// Periodically report workers counters and other updates.
globalWorkerUpdatesTimer = executorSupplier.apply("GlobalWorkerUpdatesTimer");
globalWorkerUpdatesTimer.scheduleWithFixedDelay(
ScheduledExecutorService workerUpdateTimer = executorSupplier.apply("GlobalWorkerUpdates");
workerUpdateTimer.scheduleWithFixedDelay(
this::reportPeriodicWorkerUpdates,
0,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
TimeUnit.MILLISECONDS);
scheduledExecutors.add(workerUpdateTimer);

ScheduledExecutorService workerMessageTimer = executorSupplier.apply("ReportWorkerMessage");
if (options.getWindmillHarnessUpdateReportingPeriod().getMillis() > 0) {
workerMessageTimer.scheduleWithFixedDelay(
this::reportPeriodicWorkerMessage,
0,
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
TimeUnit.MILLISECONDS);
scheduledExecutors.add(workerMessageTimer);
}

refreshWorkTimer = executorSupplier.apply("RefreshWork");
ScheduledExecutorService refreshWorkTimer = executorSupplier.apply("RefreshWork");
if (options.getActiveWorkRefreshPeriodMillis() > 0) {
refreshWorkTimer.scheduleWithFixedDelay(
new Runnable() {
Expand All @@ -602,15 +613,17 @@ public void run() {
options.getActiveWorkRefreshPeriodMillis(),
options.getActiveWorkRefreshPeriodMillis(),
TimeUnit.MILLISECONDS);
scheduledExecutors.add(refreshWorkTimer);
}
if (windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0) {
int periodMillis = Math.max(options.getStuckCommitDurationMillis() / 10, 100);
refreshWorkTimer.scheduleWithFixedDelay(
this::invalidateStuckCommits, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
scheduledExecutors.add(refreshWorkTimer);
}

if (options.getPeriodicStatusPageOutputDirectory() != null) {
statusPageTimer = executorSupplier.apply("DumpStatusPages");
ScheduledExecutorService statusPageTimer = executorSupplier.apply("DumpStatusPages");
statusPageTimer.scheduleWithFixedDelay(
() -> {
Collection<Capturable> pages = statusPages.getDebugCapturePages();
Expand Down Expand Up @@ -645,6 +658,7 @@ public void run() {
60,
60,
TimeUnit.SECONDS);
scheduledExecutors.add(statusPageTimer);
}

reportHarnessStartup();
Expand Down Expand Up @@ -676,25 +690,15 @@ public void addWorkerStatusPage(BaseStatusServlet page) {

public void stop() {
try {
if (globalConfigRefreshTimer != null) {
globalConfigRefreshTimer.shutdown();
}
globalWorkerUpdatesTimer.shutdown();
if (refreshWorkTimer != null) {
refreshWorkTimer.shutdown();
}
if (statusPageTimer != null) {
statusPageTimer.shutdown();
}
if (globalConfigRefreshTimer != null) {
globalConfigRefreshTimer.awaitTermination(300, TimeUnit.SECONDS);
}
globalWorkerUpdatesTimer.awaitTermination(300, TimeUnit.SECONDS);
if (refreshWorkTimer != null) {
refreshWorkTimer.awaitTermination(300, TimeUnit.SECONDS);
for (ScheduledExecutorService timer : scheduledExecutors) {
if (timer != null) {
timer.shutdown();
}
}
if (statusPageTimer != null) {
statusPageTimer.awaitTermination(300, TimeUnit.SECONDS);
for (ScheduledExecutorService timer : scheduledExecutors) {
if (timer != null) {
timer.awaitTermination(300, TimeUnit.SECONDS);
}
}
statusPages.stop();
if (debugCaptureManager != null) {
Expand All @@ -716,6 +720,7 @@ public void stop() {

// one last send
reportPeriodicWorkerUpdates();
reportPeriodicWorkerMessage();
} catch (Exception e) {
LOG.warn("Exception while shutting down: ", e);
}
Expand Down Expand Up @@ -1584,12 +1589,14 @@ private void schedulePeriodicGlobalConfigRequests() {
LOG.info("windmillServerStub is now ready");

// Now start a thread that periodically refreshes the windmill service endpoint.
globalConfigRefreshTimer = executorSupplier.apply("GlobalConfigRefreshTimer");
globalConfigRefreshTimer.scheduleWithFixedDelay(
ScheduledExecutorService configRefreshTimer =
executorSupplier.apply("GlobalConfigRefreshTimer");
configRefreshTimer.scheduleWithFixedDelay(
this::getGlobalConfig,
0,
options.getGlobalConfigRefreshPeriod().getMillis(),
TimeUnit.MILLISECONDS);
scheduledExecutors.add(configRefreshTimer);
}

private void getGlobalConfig() {
Expand Down Expand Up @@ -1742,9 +1749,20 @@ private void updateThreadMetrics() {
maxOutstandingBytes.getAndReset();
maxOutstandingBytes.addValue(workUnitExecutor.maximumBytesOutstanding());
outstandingBundles.getAndReset();
outstandingBundles.addValue(workUnitExecutor.elementsOutstanding());
outstandingBundles.addValue((long) workUnitExecutor.elementsOutstanding());
maxOutstandingBundles.getAndReset();
maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
maxOutstandingBundles.addValue((long) workUnitExecutor.maximumElementsOutstanding());
}

/**
 * Snapshots current thread/bundle utilization into a {@link StreamingScalingReport} and
 * reports it to the service through the work unit client.
 */
private void sendWorkerMessage() throws IOException {
  StreamingScalingReport report =
      new StreamingScalingReport()
          .setActiveThreadCount(workUnitExecutor.activeCount())
          .setActiveBundleCount(workUnitExecutor.elementsOutstanding())
          .setMaximumThreadCount(chooseMaximumNumberOfThreads())
          .setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding());
  WorkerMessage msg = workUnitClient.createWorkerMessageFromStreamingScalingReport(report);
  workUnitClient.reportWorkerMessage(msg);
}

@VisibleForTesting
Expand All @@ -1760,6 +1778,17 @@ public void reportPeriodicWorkerUpdates() {
}
}

@VisibleForTesting
public void reportPeriodicWorkerMessage() {
  try {
    sendWorkerMessage();
  } catch (IOException e) {
    // Transient service/transport failure: log and let the next periodic run retry.
    LOG.warn("Failed to send worker messages", e);
  } catch (Exception e) {
    // Anything else is a programming error; log loudly but never kill the timer thread.
    LOG.error("Unexpected exception while trying to send worker messages", e);
  }
}

/**
* Returns key for a counter update. It is a String in case of legacy counter and
* CounterStructuredName in the case of a structured counter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/
package org.apache.beam.runners.dataflow.worker;

import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.util.Optional;

Expand Down Expand Up @@ -49,4 +51,19 @@ interface WorkUnitClient {
* @return a {@link WorkItemServiceState} (e.g. a new stop position)
*/
WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus) throws IOException;

/**
 * Creates a {@link WorkerMessage} containing the given {@link StreamingScalingReport},
 * suitable for reporting via {@link #reportWorkerMessage}.
 *
 * @param report the StreamingScalingReport containing autoscaling metrics
 * @return a {@link WorkerMessage} wrapping the report
 */
WorkerMessage createWorkerMessageFromStreamingScalingReport(StreamingScalingReport report);

/**
 * Reports the given {@link WorkerMessage} (e.g. autoscaling signals built from a
 * {@link StreamingScalingReport}) to the Dataflow service.
 *
 * @param msg the WorkerMessage to report
 * @throws IOException if the service call fails
 */
void reportWorkerMessage(WorkerMessage msg) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ public long bytesOutstanding() {
return bytesOutstanding;
}

public long elementsOutstanding() {
/** Returns the number of elements currently outstanding in this executor. */
public int elementsOutstanding() {
  return elementsOutstanding;
}

/** Returns the configured maximum number of bytes that may be outstanding at once. */
public long maximumBytesOutstanding() {
  return maximumBytesOutstanding;
}

public long maximumElementsOutstanding() {
/** Returns the configured maximum number of elements that may be outstanding at once. */
public int maximumElementsOutstanding() {
  return maximumElementsOutstanding;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@
import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
import com.google.api.services.dataflow.model.SeqMapTask;
import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
import java.util.Optional;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
Expand Down Expand Up @@ -227,6 +231,26 @@ public void testCloudServiceCallMultipleWorkItems() throws Exception {
client.getWorkItem();
}

@Test
public void testReportWorkerMessage() throws Exception {
  // Stub the HTTP transport to return an empty (but valid JSON) SendWorkerMessagesResponse.
  MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
  response.setContentType(Json.MEDIA_TYPE);
  SendWorkerMessagesResponse stubResponse = new SendWorkerMessagesResponse();
  stubResponse.setFactory(Transport.getJsonFactory());
  response.setContent(stubResponse.toPrettyString());
  when(request.execute()).thenReturn(response);

  StreamingScalingReport activeThreadsReport =
      new StreamingScalingReport().setActiveThreadCount(1);
  WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
  WorkerMessage message = client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
  client.reportWorkerMessage(message);

  // Verify that the worker message we built is exactly what was serialized onto the wire.
  SendWorkerMessagesRequest sentRequest =
      Transport.getJsonFactory()
          .fromString(request.getContentAsString(), SendWorkerMessagesRequest.class);
  assertEquals(ImmutableList.of(message), sentRequest.getWorkerMessages());
}

private LowLevelHttpResponse generateMockResponse(WorkItem... workItems) throws Exception {
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.setContentType(Json.MEDIA_TYPE);
Expand Down

0 comments on commit e867ed7

Please sign in to comment.