Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate direct path #31902

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
15675d0
add getdata clients and shareable logic for GetData w/ throttling and…
m-trieu Jul 4, 2024
8c1ca87
add HeartbeatSender interface
m-trieu Jul 5, 2024
9f42751
address PR comments
m-trieu Jul 11, 2024
e623fc7
move side input state API into Work instance w/ keyed state fetching.…
m-trieu Jul 12, 2024
0735a0c
don't let FanOutWorkRefreshClient exceptions crash the user worker
m-trieu Jul 12, 2024
998c07a
address PR comments
m-trieu Jul 25, 2024
659d5aa
address PR comments
m-trieu Jul 25, 2024
554b73e
add WorkProvider interface and have StreamingEngine and Appliance impls
m-trieu Jul 13, 2024
a7a9efa
change StreamingEngineClient to FanOutStreamingEngineWorkProvider and…
m-trieu Jul 13, 2024
52c1fec
add WorkProvider interface and have StreamingEngine and Appliance impls
m-trieu Jul 13, 2024
83655fa
change StreamingEngineClient to FanOutStreamingEngineWorkProvider and…
m-trieu Jul 13, 2024
e27fde2
add WorkProvider interface and have StreamingEngine and Appliance impls
m-trieu Jul 13, 2024
9bf73c4
change StreamingEngineClient to FanOutStreamingEngineWorkProvider and…
m-trieu Jul 13, 2024
9beeec9
add stream management and correct stream shutdown mechanisms for dire…
m-trieu Jul 16, 2024
0d60a18
integrate direct path components
m-trieu Aug 6, 2024
361ec3c
address PR comments
m-trieu Aug 24, 2024
fd6549e
simplify budgeting logic for GrpcDirectGetWorkStream
m-trieu Aug 27, 2024
36015f8
fix stuckness on range movements
m-trieu Sep 11, 2024
14025db
address PR comments, fix broken tests
m-trieu Sep 12, 2024
2144cd9
address PR comments
m-trieu Sep 16, 2024
92414f7
remove thread polling for new metadata; propagate metadata version to…
m-trieu Sep 25, 2024
b09ba8f
deflake FanOutStreamingEngineWorkerHarnessTest.testOnNewWorkerMetadat…
m-trieu Sep 27, 2024
3db3b7f
address PR comments
m-trieu Sep 30, 2024
06836da
address PR comments
m-trieu Oct 8, 2024
6df1adf
remove MAX_WAIT_SECONDS and just use the deadline passed in, it's alre…
m-trieu Oct 10, 2024
4f5b381
address PR comments, remove unused IPv6 WindmillServiceAddress, renam…
m-trieu Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Duration;
Expand Down Expand Up @@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillServiceStreamMaxBackoffMillis(int value);

@Description(
"If true, Dataflow streaming pipeline will be running in direct path mode."
+ " VMs must have IPv6 enabled for this to work.")
@Default.Boolean(false)
@Description("Enables direct path mode for streaming engine.")
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();

void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);
Expand Down Expand Up @@ -300,4 +299,12 @@ public Integer create(PipelineOptions options) {
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
}
}

/**
* Default-value factory for {@code isWindmillServiceDirectPathEnabled}: the option defaults to
* {@code false} unless the {@code enable_windmill_service_direct_path} experiment is set.
*/
class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory<Boolean> {
@Override
public Boolean create(PipelineOptions options) {
// The default is driven solely by the presence of the experiment flag on the given options.
return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path");
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public WorkItemCancelledException(long sharding_key) {
super("Work item cancelled for key " + sharding_key);
}

public WorkItemCancelledException(Throwable e) {
super(e);
public WorkItemCancelledException(String message, Throwable cause) {
super(message, cause);
}

/** Returns whether an exception was caused by a {@link WorkItemCancelledException}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,20 @@ synchronized Optional<ExecutableWork> completeWorkAndGetNextWorkForKey(
@Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
LOG.warn(
"Unable to complete inactive work for key={} and token={}. Work queue for key does not exist.",
shardedKey,
workId);
return Optional.empty();
}

removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
return getNextWork(workQueue, shardedKey);
}

private synchronized void removeCompletedWorkFromQueue(
Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
ExecutableWork completedWork = workQueue.peek();
@Nullable ExecutableWork completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
Expand Down Expand Up @@ -337,8 +339,18 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
writer.println(
"<table border=\"1\" "
+ "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
// Columns.
writer.println(
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th><th>Processing Thread</th></tr>");
"<tr>"
+ "<th>Key</th>"
+ "<th>Token</th>"
+ "<th>Queued</th>"
+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Processing Thread</th>"
+ "<th>Backend</th>"
+ "</tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
Expand Down Expand Up @@ -366,6 +378,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now));
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.getProcessingThreadName());
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.backendWorkerToken());
activeWorkStatus.append("</td></tr>\n");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
/**
* Represents the state of an attempt to process a {@link WorkItem} by executing user code.
*
* @implNote Not thread safe, should not be executed or accessed by more than 1 thread at a time.
* @implNote Not thread safe, should not be modified by more than 1 thread at a time.
*/
@NotThreadSafe
@Internal
Expand All @@ -70,7 +70,7 @@ public final class Work implements RefreshableWork {
private final Map<LatencyAttribution.State, Duration> totalDurationPerState;
private final WorkId id;
private final String latencyTrackingId;
private TimedState currentState;
private volatile TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";

Expand Down Expand Up @@ -111,7 +111,18 @@ public static ProcessingContext createProcessingContext(
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
return ProcessingContext.create(computationId, getDataClient, workCommitter, heartbeatSender);
return ProcessingContext.create(
computationId, getDataClient, workCommitter, heartbeatSender, /* backendWorkerToken= */ "");
}

/**
* Creates a {@link ProcessingContext} that also carries a {@code backendWorkerToken}.
*
* <p>All five arguments are forwarded unchanged to {@code ProcessingContext.create}. The
* four-argument overload of this method passes an empty string for {@code backendWorkerToken}.
*
* @param computationId passed through as the context's computation id.
* @param getDataClient passed through as the context's {@link GetDataClient}.
* @param workCommitter passed through as the context's commit consumer.
* @param heartbeatSender passed through as the context's {@link HeartbeatSender}.
* @param backendWorkerToken passed through as the context's backend worker token; presumably
*     identifies the backend worker that supplied the work (TODO confirm against callers).
*/
public static ProcessingContext createProcessingContext(
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender,
String backendWorkerToken) {
return ProcessingContext.create(
computationId, getDataClient, workCommitter, heartbeatSender, backendWorkerToken);
}

private static LatencyAttribution.Builder createLatencyAttributionWithActiveLatencyBreakdown(
Expand Down Expand Up @@ -168,6 +179,10 @@ public GlobalData fetchSideInput(GlobalDataRequest request) {
return processingContext.getDataClient().getSideInputData(request);
}

/**
* Returns the backend worker token held by this work's {@code processingContext}. This is an
* empty string when the context was created through the four-argument {@code
* createProcessingContext} overload.
*/
public String backendWorkerToken() {
return processingContext.backendWorkerToken();
}

public Watermarks watermarks() {
return watermarks;
}
Expand Down Expand Up @@ -351,9 +366,10 @@ private static ProcessingContext create(
String computationId,
GetDataClient getDataClient,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
HeartbeatSender heartbeatSender,
String backendWorkerToken) {
return new AutoValue_Work_ProcessingContext(
computationId, getDataClient, heartbeatSender, workCommitter);
computationId, getDataClient, heartbeatSender, workCommitter, backendWorkerToken);
}

/** Computation that the {@link Work} belongs to. */
Expand All @@ -370,6 +386,8 @@ private static ProcessingContext create(
*/
public abstract Consumer<Commit> workCommitter();

public abstract String backendWorkerToken();

private Optional<KeyedGetDataResponse> fetchKeyedState(KeyedGetDataRequest request) {
return Optional.ofNullable(getDataClient().getStateData(computationId(), request));
}
Expand Down
Loading
Loading