break nested classes out of StreamingDataflowWorker #28537

Merged
merged 1 commit on Sep 28, 2023
BatchDataflowWorker.java
@@ -60,22 +60,8 @@
})
public class BatchDataflowWorker implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BatchDataflowWorker.class);

/** A client to get and update work items. */
private final WorkUnitClient workUnitClient;

/**
* Pipeline options, initially provided via the constructor and partially provided via each
* work unit.
*/
private final DataflowWorkerHarnessOptions options;

/** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;

/** The idGenerator used to generate globally unique ids. */
private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();

/**
* Function which converts map tasks to their network representation for execution.
*
@@ -90,64 +76,50 @@ public class BatchDataflowWorker implements Closeable {
new FixMultiOutputInfosOnParDoInstructions(idGenerator)
.andThen(new MapTaskToNetworkFunction(idGenerator));

/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();

/** Registry of known {@link SinkFactory SinkFactories}. */
private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();

/** A side input cache shared between all execution contexts. */
private final Cache<?, WeightedValue<?>> sideInputDataCache;

/**
* A side input cache shared between all execution contexts. This cache is meant to store values
* as weak references. This allows for insertion of logical keys with zero weight since they will
* only be scoped to the lifetime of the value being cached.
*/
private final Cache<?, ?> sideInputWeakReferenceCache;

private static final int DEFAULT_STATUS_PORT = 8081;

/** Status pages returning health of worker. */
private WorkerStatusPages statusPages;

/** Periodic sender of debug information to the debug capture service. */
private DebugCapture.Manager debugCaptureManager = null;

/**
* A weight in "bytes" for the overhead of a {@link Weighted} wrapper in the cache. It is just an
* approximation so it is OK for it to be fairly arbitrary as long as it is nonzero.
*/
private static final int OVERHEAD_WEIGHT = 8;

private static final long MEGABYTES = 1024 * 1024;

/**
* Limit the number of logical references. Weak references may never be cleared if the object is
* long lived, irrespective of whether the user is still interested in the key lookup.
*/
private static final int MAX_LOGICAL_REFERENCES = 1_000_000;

/** How many concurrent write operations to a cache should we allow. */
private static final int CACHE_CONCURRENCY_LEVEL = 4 * Runtime.getRuntime().availableProcessors();
/** A client to get and update work items. */
private final WorkUnitClient workUnitClient;
/**
* Pipeline options, initially provided via the constructor and partially provided via each
* work unit.
*/
private final DataflowWorkerHarnessOptions options;
/** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
/** Registry of known {@link SinkFactory SinkFactories}. */
private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();
/** A side input cache shared between all execution contexts. */
private final Cache<?, WeightedValue<?>> sideInputDataCache;
/**
* A side input cache shared between all execution contexts. This cache is meant to store values
* as weak references. This allows for insertion of logical keys with zero weight since they will
* only be scoped to the lifetime of the value being cached.
*/
private final Cache<?, ?> sideInputWeakReferenceCache;

private final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork;

private final MemoryMonitor memoryMonitor;
private final Thread memoryMonitorThread;

/**
* Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
* execution.
*
* <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
* as there is not a compatible path forward for users.
*/
static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
return new BatchDataflowWorker(
workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
}
/** Status pages returning health of worker. */
private final WorkerStatusPages statusPages;
/** Periodic sender of debug information to the debug capture service. */
private DebugCapture.Manager debugCaptureManager = null;

protected BatchDataflowWorker(
WorkUnitClient workUnitClient,
@@ -188,6 +160,19 @@ protected BatchDataflowWorker(
ExecutionStateSampler.instance().start();
}

/**
* Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
* execution.
*
* <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
* as there is not a compatible path forward for users.
*/
static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
return new BatchDataflowWorker(
workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
}

private static DebugCapture.Manager initializeAndStartDebugCaptureManager(
DataflowWorkerHarnessOptions options, Collection<Capturable> debugCapturePages) {
DebugCapture.Manager result = new DebugCapture.Manager(options, debugCapturePages);
@@ -215,7 +200,7 @@ private static Thread startMemoryMonitorThread(MemoryMonitor memoryMonitor) {
*/
public boolean getAndPerformWork() throws IOException {
while (true) {
Optional<WorkItem> work = workUnitClient.getWorkItem();
Optional<WorkItem> work = Optional.fromJavaUtil(workUnitClient.getWorkItem());
if (work.isPresent()) {
WorkItemStatusClient statusProvider = new WorkItemStatusClient(workUnitClient, work.get());
return doWork(work.get(), statusProvider);
@@ -243,7 +228,7 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr
} else if (workItem.getSourceOperationTask() != null) {
stageName = workItem.getSourceOperationTask().getStageName();
} else {
throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
throw new RuntimeException("Unknown kind of work item: " + workItem);
}

CounterSet counterSet = new CounterSet();
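The cache constants above (OVERHEAD_WEIGHT, MAX_LOGICAL_REFERENCES, CACHE_CONCURRENCY_LEVEL) describe how the two side input caches are tuned: the data cache is bounded by a byte budget with a per-entry wrapper overhead, while the weak-reference cache is bounded by a count of logical keys. A minimal sketch of how such caches can be assembled with Guava's CacheBuilder follows; the class name, key/value types, and the 100 MB budget are illustrative assumptions, not the constructor code from this diff, and plain Guava is shown here where Beam uses a vendored copy.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

public class SideInputCachesSketch {
  private static final int OVERHEAD_WEIGHT = 8;
  private static final long MEGABYTES = 1024 * 1024;
  private static final int MAX_LOGICAL_REFERENCES = 1_000_000;
  private static final int CACHE_CONCURRENCY_LEVEL =
      4 * Runtime.getRuntime().availableProcessors();

  public static void main(String[] args) {
    // Weighted cache: each entry is charged its payload size plus the
    // (arbitrary but nonzero) wrapper overhead, against a byte budget.
    Cache<String, byte[]> dataCache =
        CacheBuilder.newBuilder()
            .maximumWeight(100 * MEGABYTES) // illustrative 100 MB budget
            .weigher((Weigher<String, byte[]>) (key, value) -> value.length + OVERHEAD_WEIGHT)
            .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
            .build();

    // Weak-reference cache: values may be collected at any time, so the
    // number of logical keys is capped instead of their weight.
    Cache<String, Object> weakReferenceCache =
        CacheBuilder.newBuilder()
            .maximumSize(MAX_LOGICAL_REFERENCES)
            .weakValues()
            .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
            .build();

    dataCache.put("side-input-1", new byte[64]);
    weakReferenceCache.put("side-input-1", new Object());
    System.out.println(dataCache.size() + " / " + weakReferenceCache.size());
  }
}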
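In getAndPerformWork above, the call site now bridges the two Optional types: WorkUnitClient.getWorkItem() returns java.util.Optional after this change, while BatchDataflowWorker keeps using Guava's Optional internally and converts with Optional.fromJavaUtil. A minimal self-contained sketch of that bridge, with a stand-in getWorkItem method (plain Guava shown here; Beam uses the vendored copy under org.apache.beam.vendor.guava.v32_1_2_jre):

import com.google.common.base.Optional;

public class OptionalBridgeSketch {
  // Stand-in for WorkUnitClient.getWorkItem(), which now returns java.util.Optional.
  static java.util.Optional<String> getWorkItem() {
    return java.util.Optional.of("work-item-1");
  }

  public static void main(String[] args) {
    // Bridge into Guava's Optional at the call site, as getAndPerformWork does.
    Optional<String> work = Optional.fromJavaUtil(getWorkItem());
    if (work.isPresent()) {
      System.out.println("Processing " + work.get());
    }
    // The reverse bridge exists for the other direction.
    java.util.Optional<String> back = work.toJavaUtil();
    System.out.println(back.orElse("no work"));
  }
}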
DataflowWorkUnitClient.java
@@ -39,13 +39,13 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkProgressUpdater;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
@@ -87,7 +87,7 @@ class DataflowWorkUnitClient implements WorkUnitClient {
}

/**
* Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#absent()} if no
* Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#empty()} if no
* work was found.
*
* <p>If work is returned, the calling thread should call reportWorkItemStatus after completing it
@@ -116,11 +116,11 @@ public Optional<WorkItem> getWorkItem() throws IOException {
if (!workItem.isPresent()) {
// Normal case, this means that the response contained no work, i.e. no work is available
// at this time.
return Optional.absent();
return Optional.empty();
}
if (workItem.isPresent() && workItem.get().getId() == null) {
logger.debug("Discarding invalid work item {}", workItem.orNull());
return Optional.absent();
if (workItem.get().getId() == null) {
logger.debug("Discarding invalid work item {}", workItem.get());
return Optional.empty();
}

WorkItem work = workItem.get();
@@ -148,7 +148,7 @@ public Optional<WorkItem> getWorkItem() throws IOException {

/**
* Gets a global streaming config {@link WorkItem} from the Dataflow service, or returns {@link
* Optional#absent()} if no work was found.
* Optional#empty()} if no work was found.
*/
@Override
public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException {
@@ -158,7 +158,7 @@ public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException

/**
* Gets a streaming config {@link WorkItem} for the given computation from the Dataflow service,
* or returns {@link Optional#absent()} if no work was found.
* or returns {@link Optional#empty()} if no work was found.
*/
@Override
public Optional<WorkItem> getStreamingConfigWorkItem(String computationId) throws IOException {
@@ -197,7 +197,7 @@ private Optional<WorkItem> getWorkItemInternal(
List<WorkItem> workItems = response.getWorkItems();
if (workItems == null || workItems.isEmpty()) {
// We didn't lease any work.
return Optional.absent();
return Optional.empty();
} else if (workItems.size() > 1) {
throw new IOException(
"This version of the SDK expects no more than one work item from the service: "
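The DataflowWorkUnitClient changes follow the standard Guava-to-java.util Optional migration: absent() becomes empty(), fromNullable() becomes ofNullable(), and orNull() becomes orElse(null) or a plain get() once presence has been checked. A minimal sketch of the same idioms, using a hypothetical validateWorkItemId helper that mirrors the invalid-id check in getWorkItem:

import java.util.Optional;

public class OptionalMigrationSketch {
  // Hypothetical helper mirroring getWorkItem's invalid-id check: items
  // without an id are discarded by returning an empty Optional.
  static Optional<String> validateWorkItemId(String id) {
    Optional<String> workItemId = Optional.ofNullable(id); // was Optional.fromNullable(id)
    if (!workItemId.isPresent()) {
      return Optional.empty(); // was Optional.absent()
    }
    return workItemId;
  }

  public static void main(String[] args) {
    System.out.println(validateWorkItemId("work-item-1").orElse("discarded")); // was or(...)
    System.out.println(validateWorkItemId(null).orElse("discarded"));
  }
}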