From efbdc65176331004c5db8306d2b6e8f6d4bbf445 Mon Sep 17 00:00:00 2001
From: Martin Trieu
Date: Tue, 19 Sep 2023 12:05:23 -0700
Subject: [PATCH] Use java.util.Optional instead of google Optional

---
 .../dataflow/worker/BatchDataflowWorker.java  | 95 ++++++++-----------
 .../worker/DataflowWorkUnitClient.java        | 18 ++--
 .../dataflow/worker/WorkUnitClient.java       |  6 +-
 .../dataflow/worker/counters/NameContext.java |  5 +-
 .../worker/BatchDataflowWorkerTest.java       | 17 ++--
 .../worker/DataflowWorkUnitClientTest.java    | 31 +++---
 6 files changed, 77 insertions(+), 95 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 9144729faca21..7407c97619b4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -60,22 +60,8 @@
 })
 public class BatchDataflowWorker implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BatchDataflowWorker.class);
-
-  /** A client to get and update work items. */
-  private final WorkUnitClient workUnitClient;
-
-  /**
-   * Pipeline options, initially provided via the constructor and partially provided via each work
-   * work unit.
-   */
-  private final DataflowWorkerHarnessOptions options;
-
-  /** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
-  private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
-
   /** The idGenerator to generate unique id globally. */
   private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();
-
   /**
    * Function which converts map tasks to their network representation for execution.
    *
@@ -90,30 +76,7 @@ public class BatchDataflowWorker implements Closeable {
       new FixMultiOutputInfosOnParDoInstructions(idGenerator)
          .andThen(new MapTaskToNetworkFunction(idGenerator));
-
-  /** Registry of known {@link ReaderFactory ReaderFactories}. */
-  private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
-
-  /** Registry of known {@link SinkFactory SinkFactories}. */
-  private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();
-
-  /** A side input cache shared between all execution contexts. */
-  private final Cache<?, WeightedValue<?>> sideInputDataCache;
-
-  /**
-   * A side input cache shared between all execution contexts. This cache is meant to store values
-   * as weak references. This allows for insertion of logical keys with zero weight since they will
-   * only be scoped to the lifetime of the value being cached.
-   */
-  private final Cache<?, ?> sideInputWeakReferenceCache;
-
   private static final int DEFAULT_STATUS_PORT = 8081;
-
-  /** Status pages returning health of worker. */
-  private WorkerStatusPages statusPages;
-
-  /** Periodic sender of debug information to the debug capture service. */
-  private DebugCapture.Manager debugCaptureManager = null;
-
   /**
    * A weight in "bytes" for the overhead of a {@link Weighted} wrapper in the cache. It is just an
    * approximation so it is OK for it to be fairly arbitrary as long as it is nonzero.
@@ -121,33 +84,42 @@ public class BatchDataflowWorker implements Closeable {
   private static final int OVERHEAD_WEIGHT = 8;
 
   private static final long MEGABYTES = 1024 * 1024;
-
   /**
    * Limit the number of logical references. Weak references may never be cleared if the object is
    * long lived irrespective if the user actually is interested in the key lookup anymore.
    */
   private static final int MAX_LOGICAL_REFERENCES = 1_000_000;
-
   /** How many concurrent write operations to a cache should we allow. */
   private static final int CACHE_CONCURRENCY_LEVEL = 4 * Runtime.getRuntime().availableProcessors();
 
+  /** A client to get and update work items. */
+  private final WorkUnitClient workUnitClient;
+  /**
+   * Pipeline options, initially provided via the constructor and partially provided via each work
+   * work unit.
+   */
+  private final DataflowWorkerHarnessOptions options;
+  /** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
+  private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
+  /** Registry of known {@link ReaderFactory ReaderFactories}. */
+  private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
+  /** Registry of known {@link SinkFactory SinkFactories}. */
+  private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();
+  /** A side input cache shared between all execution contexts. */
+  private final Cache<?, WeightedValue<?>> sideInputDataCache;
+  /**
+   * A side input cache shared between all execution contexts. This cache is meant to store values
+   * as weak references. This allows for insertion of logical keys with zero weight since they will
+   * only be scoped to the lifetime of the value being cached.
+   */
+  private final Cache<?, ?> sideInputWeakReferenceCache;
   private final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork;
-
   private final MemoryMonitor memoryMonitor;
   private final Thread memoryMonitorThread;
-
-  /**
-   * Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
-   * execution.
-   *
-   * <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
-   * as there is not a compatible path forward for users.
-   */
-  static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
-      WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
-    return new BatchDataflowWorker(
-        workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
-  }
+  /** Status pages returning health of worker. */
+  private final WorkerStatusPages statusPages;
+  /** Periodic sender of debug information to the debug capture service. */
+  private DebugCapture.Manager debugCaptureManager = null;
 
   protected BatchDataflowWorker(
       WorkUnitClient workUnitClient,
@@ -188,6 +160,19 @@ protected BatchDataflowWorker(
     ExecutionStateSampler.instance().start();
   }
 
+  /**
+   * Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
+   * execution.
+   *
+   * <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
+   * as there is not a compatible path forward for users.
+   */
+  static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
+      WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
+    return new BatchDataflowWorker(
+        workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
+  }
+
   private static DebugCapture.Manager initializeAndStartDebugCaptureManager(
       DataflowWorkerHarnessOptions options, Collection<DebugCapture.Capturable> debugCapturePages) {
     DebugCapture.Manager result = new DebugCapture.Manager(options, debugCapturePages);
@@ -215,7 +200,7 @@ private static Thread startMemoryMonitorThread(MemoryMonitor memoryMonitor) {
    */
   public boolean getAndPerformWork() throws IOException {
     while (true) {
-      Optional<WorkItem> work = workUnitClient.getWorkItem();
+      Optional<WorkItem> work = Optional.fromJavaUtil(workUnitClient.getWorkItem());
       if (work.isPresent()) {
         WorkItemStatusClient statusProvider = new WorkItemStatusClient(workUnitClient, work.get());
         return doWork(work.get(), statusProvider);
@@ -243,7 +228,7 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr
     } else if (workItem.getSourceOperationTask() != null) {
       stageName = workItem.getSourceOperationTask().getStageName();
     } else {
-      throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
+      throw new RuntimeException("Unknown kind of work item: " + workItem);
     }
 
     CounterSet counterSet = new CounterSet();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index bf809cfd0121e..ffa377fd3f828 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -39,13 +39,13 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkProgressUpdater;
 import org.apache.beam.sdk.extensions.gcp.util.Transport;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.DateTime;
@@ -87,7 +87,7 @@ class DataflowWorkUnitClient implements WorkUnitClient {
   }
 
   /**
-   * Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#absent()} if no
+   * Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#empty()} if no
    * work was found.
    *
   * <p>If work is returned, the calling thread should call reportWorkItemStatus after completing it
@@ -116,11 +116,11 @@ public Optional<WorkItem> getWorkItem() throws IOException {
     if (!workItem.isPresent()) {
       // Normal case, this means that the response contained no work, i.e. no work is available
       // at this time.
-      return Optional.absent();
+      return Optional.empty();
     }
-    if (workItem.isPresent() && workItem.get().getId() == null) {
-      logger.debug("Discarding invalid work item {}", workItem.orNull());
-      return Optional.absent();
+    if (workItem.get().getId() == null) {
+      logger.debug("Discarding invalid work item {}", workItem.get());
+      return Optional.empty();
     }
 
     WorkItem work = workItem.get();
@@ -148,7 +148,7 @@
 
   /**
    * Gets a global streaming config {@link WorkItem} from the Dataflow service, or returns {@link
-   * Optional#absent()} if no work was found.
+   * Optional#empty()} if no work was found.
    */
   @Override
   public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException {
@@ -158,7 +158,7 @@ public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException
 
   /**
    * Gets a streaming config {@link WorkItem} for the given computation from the Dataflow service,
-   * or returns {@link Optional#absent()} if no work was found.
+   * or returns {@link Optional#empty()} if no work was found.
    */
   @Override
   public Optional<WorkItem> getStreamingConfigWorkItem(String computationId) throws IOException {
@@ -197,7 +197,7 @@ private Optional<WorkItem> getWorkItemInternal(
     List<WorkItem> workItems = response.getWorkItems();
     if (workItems == null || workItems.isEmpty()) {
       // We didn't lease any work.
-      return Optional.absent();
+      return Optional.empty();
     } else if (workItems.size() > 1) {
       throw new IOException(
           "This version of the SDK expects no more than one work item from the service: "
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
index c3c11716e4c71..82fbcd82c131a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
@@ -21,7 +21,7 @@
 import com.google.api.services.dataflow.model.WorkItemServiceState;
 import com.google.api.services.dataflow.model.WorkItemStatus;
 import java.io.IOException;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
+import java.util.Optional;
 
 /** Abstract base class describing a client for WorkItem work units. */
 interface WorkUnitClient {
@@ -31,14 +31,14 @@ interface WorkUnitClient {
   Optional<WorkItem> getWorkItem() throws IOException;
 
   /**
-   * Returns a new global streaming config WorkItem, or returns {@link Optional#absent()} if no work
+   * Returns a new global streaming config WorkItem, or returns {@link Optional#empty()} if no work
    * was found.
    */
   Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException;
 
   /**
    * Returns a streaming config WorkItem for the given computation, or returns {@link
-   * Optional#absent()} if no work was found.
+   * Optional#empty()} if no work was found.
    */
   Optional<WorkItem> getStreamingConfigWorkItem(String computationId) throws IOException;
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
index 6188386a4e67a..d8c3790de6cbf 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java
@@ -31,7 +31,10 @@ public abstract class NameContext {
    * systemName} and a {@code userName}.
    */
   public static NameContext create(
-      String stageName, String originalName, String systemName, String userName) {
+      String stageName,
+      @Nullable String originalName,
+      String systemName,
+      @Nullable String userName) {
     return new AutoValue_NameContext(stageName, originalName, systemName, userName);
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
index 8d5660548e75b..b4f544129db67 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorkerTest.java
@@ -37,11 +37,11 @@
 import com.google.api.services.dataflow.model.WorkItemStatus;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Optional;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
@@ -61,16 +61,10 @@
 @RunWith(JUnit4.class)
 public class BatchDataflowWorkerTest {
-  private static class WorkerException extends Exception {}
-
   @Rule public FastNanoClockAndSleeper clockAndSleeper = new FastNanoClockAndSleeper();
-
   @Mock WorkUnitClient mockWorkUnitClient;
-
   @Mock DataflowWorkProgressUpdater mockProgressUpdater;
-
   @Mock DataflowWorkExecutor mockWorkExecutor;
-
   DataflowWorkerHarnessOptions options;
 
   @Before
@@ -98,7 +92,7 @@ public void testWhenNoWorkIsReturnedThatWeImmediatelyRetry() throws Exception {
     workItem.setReportStatusInterval(TimeUtil.toCloudDuration(Duration.standardMinutes(1)));
 
     when(mockWorkUnitClient.getWorkItem())
-        .thenReturn(Optional.absent())
+        .thenReturn(Optional.empty())
         .thenReturn(Optional.of(workItem));
 
     assertTrue(worker.getAndPerformWork());
@@ -138,7 +132,7 @@ public void testWhenProcessingWorkUnitFailsWeReportStatus() throws Exception {
 
     Throwable error = errorCaptor.getValue();
     assertThat(error, notNullValue());
-    assertThat(error.getMessage(), equalTo("Unknown kind of work item: " + workItem.toString()));
+    assertThat(error.getMessage(), equalTo("Unknown kind of work item: " + workItem));
   }
 
   @Test
@@ -168,8 +162,9 @@ public void testStopProgressReportInCaseOfFailure() throws Exception {
 
   @Test
   public void testIsSplitResponseTooLarge() throws IOException {
     SourceSplitResponse splitResponse = new SourceSplitResponse();
-    splitResponse.setShards(
-        ImmutableList.of(new SourceSplitShard(), new SourceSplitShard()));
+    splitResponse.setShards(ImmutableList.of(new SourceSplitShard(), new SourceSplitShard()));
 
     assertThat(DataflowApiUtils.computeSerializedSizeBytes(splitResponse), greaterThan(0L));
   }
+
+  private static class WorkerException extends Exception {}
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index e8b5ce8d0df2d..3c63f3cc19d29 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -34,6 +34,7 @@
 import com.google.api.services.dataflow.model.SeqMapTask;
 import com.google.api.services.dataflow.model.WorkItem;
 import java.io.IOException;
+import java.util.Optional;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
 import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
@@ -42,7 +43,6 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
 import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.junit.Before;
@@ -61,6 +61,9 @@
 @RunWith(JUnit4.class)
 public class DataflowWorkUnitClientTest {
   private static final Logger LOG = LoggerFactory.getLogger(DataflowWorkUnitClientTest.class);
+  private static final String PROJECT_ID = "TEST_PROJECT_ID";
+  private static final String JOB_ID = "TEST_JOB_ID";
+  private static final String WORKER_ID = "TEST_WORKER_ID";
   @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
   @Rule public TestRule restoreLogging = new RestoreDataflowLoggingMDC();
   @Rule public ExpectedException expectedException = ExpectedException.none();
@@ -69,10 +72,6 @@ public class DataflowWorkUnitClientTest {
   @Mock private MockLowLevelHttpRequest request;
   private DataflowWorkerHarnessOptions pipelineOptions;
 
-  private static final String PROJECT_ID = "TEST_PROJECT_ID";
-  private static final String JOB_ID = "TEST_JOB_ID";
-  private static final String WORKER_ID = "TEST_WORKER_ID";
-
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
@@ -104,10 +103,10 @@ public void testCloudServiceCall() throws Exception {
             .fromString(request.getContentAsString(), LeaseWorkItemRequest.class);
     assertEquals(WORKER_ID, actualRequest.getWorkerId());
     assertEquals(
-        ImmutableList.<String>of(WORKER_ID, "remote_source", "custom_source"),
+        ImmutableList.of(WORKER_ID, "remote_source", "custom_source"),
         actualRequest.getWorkerCapabilities());
     assertEquals(
-        ImmutableList.<String>of("map_task", "seq_map_task", "remote_source_task"),
+        ImmutableList.of("map_task", "seq_map_task", "remote_source_task"),
         actualRequest.getWorkItemTypes());
     assertEquals("1234", DataflowWorkerLoggingMDC.getWorkId());
   }
 
@@ -151,17 +150,17 @@ public void testCloudServiceCallNoWorkPresent() throws Exception {
 
     WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
 
-    assertEquals(Optional.absent(), client.getWorkItem());
+    assertEquals(Optional.empty(), client.getWorkItem());
 
     LeaseWorkItemRequest actualRequest =
         Transport.getJsonFactory()
            .fromString(request.getContentAsString(), LeaseWorkItemRequest.class);
     assertEquals(WORKER_ID, actualRequest.getWorkerId());
     assertEquals(
-        ImmutableList.<String>of(WORKER_ID, "remote_source", "custom_source"),
+        ImmutableList.of(WORKER_ID, "remote_source", "custom_source"),
         actualRequest.getWorkerCapabilities());
     assertEquals(
-        ImmutableList.<String>of("map_task", "seq_map_task", "remote_source_task"),
+        ImmutableList.of("map_task", "seq_map_task", "remote_source_task"),
         actualRequest.getWorkItemTypes());
   }
 
@@ -175,17 +174,17 @@ public void testCloudServiceCallNoWorkId() throws Exception {
 
     WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
 
-    assertEquals(Optional.absent(), client.getWorkItem());
+    assertEquals(Optional.empty(), client.getWorkItem());
 
     LeaseWorkItemRequest actualRequest =
         Transport.getJsonFactory()
            .fromString(request.getContentAsString(), LeaseWorkItemRequest.class);
     assertEquals(WORKER_ID, actualRequest.getWorkerId());
     assertEquals(
-        ImmutableList.<String>of(WORKER_ID, "remote_source", "custom_source"),
+        ImmutableList.of(WORKER_ID, "remote_source", "custom_source"),
         actualRequest.getWorkerCapabilities());
     assertEquals(
-        ImmutableList.<String>of("map_task", "seq_map_task", "remote_source_task"),
+        ImmutableList.of("map_task", "seq_map_task", "remote_source_task"),
         actualRequest.getWorkItemTypes());
   }
 
@@ -195,17 +194,17 @@ public void testCloudServiceCallNoWorkItem() throws Exception {
 
     WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
 
-    assertEquals(Optional.absent(), client.getWorkItem());
+    assertEquals(Optional.empty(), client.getWorkItem());
 
     LeaseWorkItemRequest actualRequest =
         Transport.getJsonFactory()
            .fromString(request.getContentAsString(), LeaseWorkItemRequest.class);
     assertEquals(WORKER_ID, actualRequest.getWorkerId());
     assertEquals(
-        ImmutableList.<String>of(WORKER_ID, "remote_source", "custom_source"),
+        ImmutableList.of(WORKER_ID, "remote_source", "custom_source"),
         actualRequest.getWorkerCapabilities());
     assertEquals(
-        ImmutableList.<String>of("map_task", "seq_map_task", "remote_source_task"),
+        ImmutableList.of("map_task", "seq_map_task", "remote_source_task"),
         actualRequest.getWorkItemTypes());
   }
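
Reviewer note (not part of the patch): the change is mostly a mechanical swap of the vendored Guava Optional for java.util.Optional. Below is a minimal, self-contained sketch of the API mapping the patch relies on; the class name is invented for this note. The bridge methods fromJavaUtil/toJavaUtil exist on Guava's Optional since 21.0, which is how getAndPerformWork can keep the vendored type internally while the migrated WorkUnitClient returns java.util.Optional.

    // OptionalMigrationSketch.java -- illustrative only; class name is made up.
    import java.util.Optional;

    public class OptionalMigrationSketch {
      public static void main(String[] args) {
        // Guava Optional.absent() becomes java.util Optional.empty().
        Optional<String> noWork = Optional.empty();

        // Guava Optional.fromNullable(x) becomes Optional.ofNullable(x).
        Optional<String> workId = Optional.ofNullable("1234");

        // Guava workItem.orNull() becomes workItem.orElse(null).
        String idOrNull = workId.orElse(null);

        // of()/isPresent()/get() match in both APIs, so most call sites in the
        // patch only needed the import swapped.
        if (workId.isPresent() && !noWork.isPresent()) {
          System.out.println("leased work item " + workId.get() + " / " + idOrNull);
        }

        // A caller still on the vendored Guava Optional bridges explicitly
        // (Guava 21+), as BatchDataflowWorker.getAndPerformWork does:
        //   com.google.common.base.Optional<String> guava =
        //       com.google.common.base.Optional.fromJavaUtil(workId);
        //   Optional<String> backAgain = guava.toJavaUtil();
      }
    }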