Use java.util.Optional instead of google Optional
m-trieu committed Sep 19, 2023
1 parent 18018c3 commit efbdc65
Showing 6 changed files with 77 additions and 95 deletions.
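
The substance of the change is mechanical: Guava's com.google.common.base.Optional gives way to java.util.Optional. A minimal sketch of the correspondences the hunks below lean on (illustrative only; the WorkItem class here is a hypothetical stand-in, not the Dataflow model class):

import java.util.Optional;

// A minimal sketch of the Guava-to-JDK Optional correspondences this commit
// relies on. WorkItem here is a hypothetical stand-in, not the Dataflow model
// class.
public class OptionalMigrationSketch {

  static class WorkItem {}

  public static void main(String[] args) {
    // Guava's Optional.absent() becomes java.util's Optional.empty().
    Optional<WorkItem> none = Optional.empty();

    // Guava's workItem.orNull() becomes orElse(null).
    WorkItem nullable = none.orElse(null);

    // of(), isPresent(), and get() carry over unchanged.
    Optional<WorkItem> some = Optional.of(new WorkItem());
    System.out.println(none.isPresent() + " " + (nullable == null) + " " + some.isPresent());
  }
}

The per-file hunks below apply exactly these swaps.
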
BatchDataflowWorker.java
@@ -60,22 +60,8 @@
})
public class BatchDataflowWorker implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BatchDataflowWorker.class);

/** A client to get and update work items. */
private final WorkUnitClient workUnitClient;

/**
* Pipeline options, initially provided via the constructor and partially provided via each
* work unit.
*/
private final DataflowWorkerHarnessOptions options;

/** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;

/** The IdGenerator used to generate globally unique ids. */
private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();

/**
* Function which converts map tasks to their network representation for execution.
*
@@ -90,64 +76,50 @@ public class BatchDataflowWorker implements Closeable {
new FixMultiOutputInfosOnParDoInstructions(idGenerator)
.andThen(new MapTaskToNetworkFunction(idGenerator));

/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();

/** Registry of known {@link SinkFactory SinkFactories}. */
private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();

/** A side input cache shared between all execution contexts. */
private final Cache<?, WeightedValue<?>> sideInputDataCache;

/**
* A side input cache shared between all execution contexts. This cache is meant to store values
* as weak references. This allows for insertion of logical keys with zero weight since they will
* only be scoped to the lifetime of the value being cached.
*/
private final Cache<?, ?> sideInputWeakReferenceCache;

private static final int DEFAULT_STATUS_PORT = 8081;

/** Status pages returning health of worker. */
private WorkerStatusPages statusPages;

/** Periodic sender of debug information to the debug capture service. */
private DebugCapture.Manager debugCaptureManager = null;

/**
* A weight in "bytes" for the overhead of a {@link Weighted} wrapper in the cache. It is just an
* approximation so it is OK for it to be fairly arbitrary as long as it is nonzero.
*/
private static final int OVERHEAD_WEIGHT = 8;

private static final long MEGABYTES = 1024 * 1024;

/**
* Limit the number of logical references. Weak references may never be cleared if the object is
* long lived, irrespective of whether the user is still interested in the key lookup.
*/
private static final int MAX_LOGICAL_REFERENCES = 1_000_000;

/** How many concurrent write operations to a cache should we allow. */
private static final int CACHE_CONCURRENCY_LEVEL = 4 * Runtime.getRuntime().availableProcessors();
/** A client to get and update work items. */
private final WorkUnitClient workUnitClient;
/**
* Pipeline options, initially provided via the constructor and partially provided via each
* work unit.
*/
private final DataflowWorkerHarnessOptions options;
/** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
/** Registry of known {@link SinkFactory SinkFactories}. */
private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();
/** A side input cache shared between all execution contexts. */
private final Cache<?, WeightedValue<?>> sideInputDataCache;
/**
* A side input cache shared between all execution contexts. This cache is meant to store values
* as weak references. This allows for insertion of logical keys with zero weight since they will
* only be scoped to the lifetime of the value being cached.
*/
private final Cache<?, ?> sideInputWeakReferenceCache;

private final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork;

private final MemoryMonitor memoryMonitor;
private final Thread memoryMonitorThread;

/**
* Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
* execution.
*
* <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
* as there is no compatible path forward for users.
*/
static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
return new BatchDataflowWorker(
workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
}
/** Status pages returning health of worker. */
private final WorkerStatusPages statusPages;
/** Periodic sender of debug information to the debug capture service. */
private DebugCapture.Manager debugCaptureManager = null;

protected BatchDataflowWorker(
WorkUnitClient workUnitClient,
@@ -188,6 +160,19 @@ protected BatchDataflowWorker(
ExecutionStateSampler.instance().start();
}

/**
* Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
* execution.
*
* <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
* as there is no compatible path forward for users.
*/
static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
return new BatchDataflowWorker(
workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
}

private static DebugCapture.Manager initializeAndStartDebugCaptureManager(
DataflowWorkerHarnessOptions options, Collection<Capturable> debugCapturePages) {
DebugCapture.Manager result = new DebugCapture.Manager(options, debugCapturePages);
@@ -215,7 +200,7 @@ private static Thread startMemoryMonitorThread(MemoryMonitor memoryMonitor) {
*/
public boolean getAndPerformWork() throws IOException {
while (true) {
Optional<WorkItem> work = workUnitClient.getWorkItem();
Optional<WorkItem> work = Optional.fromJavaUtil(workUnitClient.getWorkItem());
if (work.isPresent()) {
WorkItemStatusClient statusProvider = new WorkItemStatusClient(workUnitClient, work.get());
return doWork(work.get(), statusProvider);
@@ -243,7 +228,7 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr
} else if (workItem.getSourceOperationTask() != null) {
stageName = workItem.getSourceOperationTask().getStageName();
} else {
throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
throw new RuntimeException("Unknown kind of work item: " + workItem);
}

CounterSet counterSet = new CounterSet();
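
The getAndPerformWork change above keeps the worker on the vendored Guava type by converting at the boundary. A minimal sketch of that bridge, shown with plain Guava rather than Beam's vendored package:

import com.google.common.base.Optional;

// Demonstrates Guava's bridge from java.util.Optional, as used in
// getAndPerformWork above (Beam uses the vendored Guava package instead).
public class FromJavaUtilSketch {
  public static void main(String[] args) {
    java.util.Optional<String> fromClient = java.util.Optional.of("work-item-1");
    Optional<String> guava = Optional.fromJavaUtil(fromClient);
    System.out.println(guava.isPresent() + " " + guava.orNull());
  }
}
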
DataflowWorkUnitClient.java
@@ -39,13 +39,13 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkProgressUpdater;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
@@ -87,7 +87,7 @@ class DataflowWorkUnitClient implements WorkUnitClient {
}

/**
* Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#absent()} if no
* Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#empty()} if no
* work was found.
*
* <p>If work is returned, the calling thread should call reportWorkItemStatus after completing it
@@ -116,11 +116,11 @@ public Optional<WorkItem> getWorkItem() throws IOException {
if (!workItem.isPresent()) {
// Normal case, this means that the response contained no work, i.e. no work is available
// at this time.
return Optional.absent();
return Optional.empty();
}
if (workItem.isPresent() && workItem.get().getId() == null) {
logger.debug("Discarding invalid work item {}", workItem.orNull());
return Optional.absent();
if (workItem.get().getId() == null) {
logger.debug("Discarding invalid work item {}", workItem.get());
return Optional.empty();
}

WorkItem work = workItem.get();
@@ -148,7 +148,7 @@ public Optional<WorkItem> getWorkItem() throws IOException {

/**
* Gets a global streaming config {@link WorkItem} from the Dataflow service, or returns {@link
* Optional#absent()} if no work was found.
* Optional#empty()} if no work was found.
*/
@Override
public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException {
@@ -158,7 +158,7 @@ public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException

/**
* Gets a streaming config {@link WorkItem} for the given computation from the Dataflow service,
* or returns {@link Optional#absent()} if no work was found.
* or returns {@link Optional#empty()} if no work was found.
*/
@Override
public Optional<WorkItem> getStreamingConfigWorkItem(String computationId) throws IOException {
@@ -197,7 +197,7 @@ private Optional<WorkItem> getWorkItemInternal(
List<WorkItem> workItems = response.getWorkItems();
if (workItems == null || workItems.isEmpty()) {
// We didn't lease any work.
return Optional.absent();
return Optional.empty();
} else if (workItems.size() > 1) {
throw new IOException(
"This version of the SDK expects no more than one work item from the service: "
WorkUnitClient.java
@@ -21,7 +21,7 @@
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import java.io.IOException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import java.util.Optional;

/** Abstract base class describing a client for WorkItem work units. */
interface WorkUnitClient {
@@ -31,14 +31,14 @@ interface WorkUnitClient {
Optional<WorkItem> getWorkItem() throws IOException;

/**
* Returns a new global streaming config WorkItem, or returns {@link Optional#absent()} if no work
* Returns a new global streaming config WorkItem, or returns {@link Optional#empty()} if no work
* was found.
*/
Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException;

/**
* Returns a streaming config WorkItem for the given computation, or returns {@link
* Optional#absent()} if no work was found.
* Optional#empty()} if no work was found.
*/
Optional<WorkItem> getStreamingConfigWorkItem(String computationId) throws IOException;

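
To see the reworked contract from the caller's side, here is a hedged sketch; the class and its polling logic are invented for illustration, not Beam code (the real consumers are the workers above):

import com.google.api.services.dataflow.model.WorkItem;
import java.io.IOException;
import java.util.Optional;

// Hypothetical caller, assumed to live in the same package as the
// package-private WorkUnitClient interface above.
class SingleWorkItemPoller {

  private final WorkUnitClient client;

  SingleWorkItemPoller(WorkUnitClient client) {
    this.client = client;
  }

  // Leases one work item and reports whether it carried a usable id,
  // chaining on java.util.Optional instead of Guava-style orNull() checks.
  boolean pollOnce() throws IOException {
    Optional<WorkItem> work = client.getWorkItem();
    return work.filter(item -> item.getId() != null).isPresent();
  }
}
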
NameContext.java
@@ -31,7 +31,10 @@ public abstract class NameContext {
* systemName} and a {@code userName}.
*/
public static NameContext create(
String stageName, String originalName, String systemName, String userName) {
String stageName,
@Nullable String originalName,
String systemName,
@Nullable String userName) {
return new AutoValue_NameContext(stageName, originalName, systemName, userName);
}

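
One line of context for the NameContext hunk above: the factory now documents that originalName and userName may be null. A hedged sketch of a call that relies on this (the stage and system names are invented, and the import path is assumed):

import org.apache.beam.runners.dataflow.worker.counters.NameContext;

// Illustrative only: with originalName and userName marked @Nullable,
// a NameContext for a purely system-defined stage passes nullness
// analysis without suppressions.
class NameContextNullnessSketch {
  public static void main(String[] args) {
    NameContext context = NameContext.create("S01", null, "MapTask.out0", null);
    System.out.println(context);
  }
}
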
BatchDataflowWorkerTest.java
@@ -37,11 +37,11 @@
import com.google.api.services.dataflow.model.WorkItemStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Optional;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
@@ -61,16 +61,10 @@
@RunWith(JUnit4.class)
public class BatchDataflowWorkerTest {

private static class WorkerException extends Exception {}

@Rule public FastNanoClockAndSleeper clockAndSleeper = new FastNanoClockAndSleeper();

@Mock WorkUnitClient mockWorkUnitClient;

@Mock DataflowWorkProgressUpdater mockProgressUpdater;

@Mock DataflowWorkExecutor mockWorkExecutor;

DataflowWorkerHarnessOptions options;

@Before
@@ -98,7 +92,7 @@ public void testWhenNoWorkIsReturnedThatWeImmediatelyRetry() throws Exception {
workItem.setReportStatusInterval(TimeUtil.toCloudDuration(Duration.standardMinutes(1)));

when(mockWorkUnitClient.getWorkItem())
.thenReturn(Optional.<WorkItem>absent())
.thenReturn(Optional.empty())
.thenReturn(Optional.of(workItem));

assertTrue(worker.getAndPerformWork());
@@ -138,7 +132,7 @@ public void testWhenProcessingWorkUnitFailsWeReportStatus() throws Exception {

Throwable error = errorCaptor.getValue();
assertThat(error, notNullValue());
assertThat(error.getMessage(), equalTo("Unknown kind of work item: " + workItem.toString()));
assertThat(error.getMessage(), equalTo("Unknown kind of work item: " + workItem));
}

@Test
@@ -168,8 +162,9 @@ public void testStopProgressReportInCaseOfFailure() throws Exception {
@Test
public void testIsSplitResponseTooLarge() throws IOException {
SourceSplitResponse splitResponse = new SourceSplitResponse();
splitResponse.setShards(
ImmutableList.<SourceSplitShard>of(new SourceSplitShard(), new SourceSplitShard()));
splitResponse.setShards(ImmutableList.of(new SourceSplitShard(), new SourceSplitShard()));
assertThat(DataflowApiUtils.computeSerializedSizeBytes(splitResponse), greaterThan(0L));
}

private static class WorkerException extends Exception {}
}