Skip to content

Commit

Permalink
integrate direct path components
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Aug 6, 2024
1 parent 7916a29 commit ff287f2
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 280 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,8 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
}

/** Add up all the throttle times of all streams including GetWorkerMetadataStream. */
public long getAndResetThrottleTimes() {
@Override
public long getAndResetThrottleTime() {
return connections.get().windmillStreams().values().stream()
.map(WindmillStreamSender::getAndResetThrottleTime)
.reduce(0L, Long::sum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
private final Function<String, Optional<ComputationState>> computationStateFetcher;
private final ExecutorService workProviderExecutor;
private final GetWorkSender getWorkSender;
private final Supplier<Long> throttleTimeSupplier;

SingleSourceWorkerHarness(
WorkCommitter workCommitter,
Expand All @@ -74,7 +75,8 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
StreamingWorkScheduler streamingWorkScheduler,
Runnable waitForResources,
Function<String, Optional<ComputationState>> computationStateFetcher,
GetWorkSender getWorkSender) {
GetWorkSender getWorkSender,
Supplier<Long> throttleTimeSupplier) {
this.workCommitter = workCommitter;
this.getDataClient = getDataClient;
this.heartbeatSender = heartbeatSender;
Expand All @@ -90,6 +92,7 @@ public final class SingleSourceWorkerHarness implements StreamingWorkerHarness {
.build());
this.isRunning = new AtomicBoolean(false);
this.getWorkSender = getWorkSender;
this.throttleTimeSupplier = throttleTimeSupplier;
}

public static SingleSourceWorkerHarness.Builder builder() {
Expand Down Expand Up @@ -144,6 +147,11 @@ public void shutdown() {
workCommitter.stop();
}

/**
 * Returns the value currently produced by the injected {@code throttleTimeSupplier}.
 *
 * <p>This method only delegates; whether the underlying value is reset after being read is
 * determined by the supplier and is not visible here.
 */
@Override
public long getAndResetThrottleTime() {
  final long throttleTime = throttleTimeSupplier.get();
  return throttleTime;
}

private void streamingEngineDispatchLoop(
Function<WorkItemReceiver, WindmillStream.GetWorkStream> getWorkStreamFactory) {
while (isRunning.get()) {
Expand Down Expand Up @@ -254,6 +262,8 @@ Builder setComputationStateFetcher(

Builder setGetWorkSender(GetWorkSender getWorkSender);

Builder setThrottleTimeSupplier(Supplier<Long> throttleTimeSupplier);

SingleSourceWorkerHarness build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public interface StreamingWorkerHarness {
/**
 * Starts the harness.
 *
 * <p>NOTE(review): idempotence and thread-safety are not specified by this interface — confirm
 * against the implementations.
 */
void start();

/**
 * Shuts the harness down.
 *
 * <p>NOTE(review): whether this blocks until in-flight work completes is not specified here —
 * confirm against the implementations.
 */
void shutdown();

/**
 * Returns the throttle time tracked by this harness.
 *
 * <p>NOTE(review): the name implies the tracked value is reset as part of the read; units are not
 * stated in this interface — confirm against the implementations.
 */
long getAndResetThrottleTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import com.google.api.services.dataflow.model.WorkerMessageResponse;
import com.google.auto.value.AutoBuilder;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
Expand Down Expand Up @@ -97,7 +98,7 @@ public final class StreamingWorkerStatusReporter {
// Used to track the number of WorkerMessages that have been sent without PerWorkerMetrics.
private final AtomicLong workerMessagesIndex;

private StreamingWorkerStatusReporter(
StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
Supplier<Long> windmillQuotaThrottleTime,
Expand Down Expand Up @@ -131,57 +132,13 @@ private StreamingWorkerStatusReporter(
this.workerMessagesIndex = new AtomicLong();
}

/**
 * Creates a {@link StreamingWorkerStatusReporter} with counter publishing enabled.
 *
 * <p>The reporter is given an executor factory that builds a single-threaded scheduled executor
 * for each requested thread name, using that name as the thread name format. All other arguments
 * are forwarded to the constructor unchanged.
 */
public static StreamingWorkerStatusReporter create(
    WorkUnitClient workUnitClient,
    Supplier<Long> windmillQuotaThrottleTime,
    Supplier<Collection<StageInfo>> allStageInfo,
    FailureTracker failureTracker,
    StreamingCounters streamingCounters,
    MemoryMonitor memoryMonitor,
    BoundedQueueExecutor workExecutor,
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  // Built up front so the constructor call below stays a flat argument list.
  Function<String, ScheduledExecutorService> singleThreadExecutorFactory =
      threadName ->
          Executors.newSingleThreadScheduledExecutor(
              new ThreadFactoryBuilder().setNameFormat(threadName).build());

  return new StreamingWorkerStatusReporter(
      /* publishCounters= */ true,
      workUnitClient,
      windmillQuotaThrottleTime,
      allStageInfo,
      failureTracker,
      streamingCounters,
      memoryMonitor,
      workExecutor,
      singleThreadExecutorFactory,
      windmillHarnessUpdateReportingPeriodMillis,
      perWorkerMetricsUpdateReportingPeriodMillis);
}

@VisibleForTesting
public static StreamingWorkerStatusReporter forTesting(
boolean publishCounters,
WorkUnitClient workUnitClient,
Supplier<Long> windmillQuotaThrottleTime,
Supplier<Collection<StageInfo>> allStageInfo,
FailureTracker failureTracker,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
BoundedQueueExecutor workExecutor,
Function<String, ScheduledExecutorService> executorFactory,
long windmillHarnessUpdateReportingPeriodMillis,
long perWorkerMetricsUpdateReportingPeriodMillis) {
return new StreamingWorkerStatusReporter(
publishCounters,
workUnitClient,
windmillQuotaThrottleTime,
allStageInfo,
failureTracker,
streamingCounters,
memoryMonitor,
workExecutor,
executorFactory,
windmillHarnessUpdateReportingPeriodMillis,
perWorkerMetricsUpdateReportingPeriodMillis);
/**
 * Returns a {@link Builder} pre-populated with defaults: counter publishing is enabled, and the
 * executor factory builds a single-threaded scheduled executor that uses the requested thread
 * name as its thread name format. Callers may override either default through the builder's
 * setters.
 */
public static Builder builder() {
  Function<String, ScheduledExecutorService> defaultExecutorFactory =
      threadName ->
          Executors.newSingleThreadScheduledExecutor(
              new ThreadFactoryBuilder().setNameFormat(threadName).build());

  return new AutoBuilder_StreamingWorkerStatusReporter_Builder()
      .setPublishCounters(true)
      .setExecutorFactory(defaultExecutorFactory);
}

/**
Expand Down Expand Up @@ -228,6 +185,22 @@ private static void shutdownExecutor(ScheduledExecutorService executor) {
}
}

/**
 * Computes how many WorkerMessages RPC intervals make up one PerWorkerMetrics reporting period,
 * so that PerWorkerMetrics reporting stays aligned with the WorkerMessages RPC schedule.
 *
 * <p>The result is {@code perWorkerMetricsUpdateReportingPeriodMillis} divided by {@code
 * windmillHarnessUpdateReportingPeriodMillis}, rounded up to the next whole number of intervals.
 * An RPC interval of {@code 0} yields {@code 0} rather than attempting a division by zero.
 */
private static long getPerWorkerMetricsUpdateFrequency(
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  return windmillHarnessUpdateReportingPeriodMillis == 0
      ? 0
      : LongMath.divide(
          perWorkerMetricsUpdateReportingPeriodMillis,
          windmillHarnessUpdateReportingPeriodMillis,
          RoundingMode.CEILING);
}

@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
reportHarnessStartup();
Expand Down Expand Up @@ -276,22 +249,6 @@ private void reportHarnessStartup() {
}
}

/**
 * Computes how many WorkerMessages RPC intervals make up one PerWorkerMetrics reporting period,
 * so that PerWorkerMetrics reporting stays aligned with the WorkerMessages RPC schedule.
 *
 * <p>The result is {@code perWorkerMetricsUpdateReportingPeriodMillis} divided by {@code
 * windmillHarnessUpdateReportingPeriodMillis}, rounded up to the next whole number of intervals.
 * An RPC interval of {@code 0} yields {@code 0} rather than attempting a division by zero.
 */
private static long getPerWorkerMetricsUpdateFrequency(
    long windmillHarnessUpdateReportingPeriodMillis,
    long perWorkerMetricsUpdateReportingPeriodMillis) {
  return windmillHarnessUpdateReportingPeriodMillis == 0
      ? 0
      : LongMath.divide(
          perWorkerMetricsUpdateReportingPeriodMillis,
          windmillHarnessUpdateReportingPeriodMillis,
          RoundingMode.CEILING);
}

/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
Expand Down Expand Up @@ -496,4 +453,33 @@ private void updateThreadMetrics() {
.maxOutstandingBundles()
.addValue((long) workExecutor.maximumElementsOutstanding());
}

/**
 * Builder for {@link StreamingWorkerStatusReporter}; the implementation is generated from the
 * {@code @AutoBuilder} annotation.
 *
 * <p>Instances obtained from {@link StreamingWorkerStatusReporter#builder()} already have {@code
 * publishCounters} and {@code executorFactory} set; the remaining setters have no default visible
 * in this file.
 *
 * <p>NOTE(review): the setter names are presumably matched by AutoBuilder to the constructor's
 * parameter names, so renaming a setter or a constructor parameter needs a matching change on the
 * other side — confirm against the full constructor signature.
 */
@AutoBuilder
public interface Builder {
// Set to true by StreamingWorkerStatusReporter.builder(); override to change it.
Builder setPublishCounters(boolean publishCounters);

Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient);

Builder setWindmillQuotaThrottleTime(Supplier<Long> windmillQuotaThrottleTime);

Builder setAllStageInfo(Supplier<Collection<StageInfo>> allStageInfo);

Builder setFailureTracker(FailureTracker failureTracker);

Builder setStreamingCounters(StreamingCounters streamingCounters);

Builder setMemoryMonitor(MemoryMonitor memoryMonitor);

Builder setWorkExecutor(BoundedQueueExecutor workExecutor);

// StreamingWorkerStatusReporter.builder() installs a factory that creates a single-threaded
// scheduled executor per thread name; override to supply a different one.
Builder setExecutorFactory(Function<String, ScheduledExecutorService> executorFactory);

// Interval, in milliseconds, between harness update reports (per the parameter name).
Builder setWindmillHarnessUpdateReportingPeriodMillis(
long windmillHarnessUpdateReportingPeriodMillis);

// Desired interval, in milliseconds, between PerWorkerMetrics reports (per the parameter name).
Builder setPerWorkerMetricsUpdateReportingPeriodMillis(
long perWorkerMetricsUpdateReportingPeriodMillis);

// Creates the reporter from the values set on this builder.
StreamingWorkerStatusReporter build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ protected synchronized void onNewStream() {
send(
StreamingGetWorkRequest.newBuilder()
.setRequest(
request.toBuilder()
request
.toBuilder()
.setMaxBytes(budgetAdjustment.bytes())
.setMaxItems(budgetAdjustment.items()))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class GrpcGetWorkStream
extends AbstractWindmillStream<StreamingGetWorkRequest, StreamingGetWorkResponseChunk>
implements GetWorkStream {

private static final Logger LOG = LoggerFactory.getLogger(GrpcGetWorkStream.class);

private final GetWorkRequest request;
private final WorkItemReceiver receiver;
private final ThrottleTimer getWorkThrottleTimer;
Expand Down

This file was deleted.

Loading

0 comments on commit ff287f2

Please sign in to comment.