From fd25f7e6966f173baddb0cb71350cc5f31a69855 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 20 Sep 2024 13:16:22 -0400 Subject: [PATCH 1/3] Invoke teardown when DoFn throws in portable runners --- .../beam_PostCommit_Java_PVR_Flink_Batch.json | 2 +- ...beam_PostCommit_Java_PVR_Flink_Streaming.json | 2 +- .../beam_PostCommit_Java_PVR_Samza.json | 2 +- ...eam_PostCommit_Java_PVR_Spark3_Streaming.json | 2 +- .../beam_PostCommit_Java_PVR_Spark_Batch.json | 2 +- runners/flink/job-server/flink_job_server.gradle | 1 - runners/google-cloud-dataflow-java/build.gradle | 2 +- runners/samza/job-server/build.gradle | 3 ++- runners/spark/job-server/spark_job_server.gradle | 2 -- .../fn/harness/control/ProcessBundleHandler.java | 16 +++++++++++++++- 10 files changed, 23 insertions(+), 11 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json index b970762c83970..e3d6056a5de96 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test" + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json index b60f5c4cc3c80..e3d6056a5de96 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json index b60f5c4cc3c80..e3d6056a5de96 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json index b60f5c4cc3c80..e3d6056a5de96 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark3_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json index b60f5c4cc3c80..e3d6056a5de96 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 56a58df4fb093..1c610477a4442 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -171,7 +171,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index df2270d3b653f..4906d9cf9cb83 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -185,7 +185,7 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesGaugeMetrics', 'org.apache.beam.sdk.testing.UsesMultimapState', 'org.apache.beam.sdk.testing.UsesTestStream', - 'org.apache.beam.sdk.testing.UsesParDoLifecycle', + 'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner 'org.apache.beam.sdk.testing.UsesMetricsPusher', 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ] diff --git a/runners/samza/job-server/build.gradle b/runners/samza/job-server/build.gradle index f972f376e5c8c..6fc8db98a4f9c 100644 --- a/runners/samza/job-server/build.gradle +++ b/runners/samza/job-server/build.gradle @@ -90,7 +90,6 @@ def portableValidatesRunnerTask(String name, boolean docker) { excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' @@ -127,6 +126,8 @@ def portableValidatesRunnerTask(String name, boolean docker) { excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalid' // TODO(https://github.com/apache/beam/issues/21144) excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalidZeroAllowed' + // TODO(https://github.com/apache/beam/issues/32520) + excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionIn*Stateful' // TODO(https://github.com/apache/beam/issues/21145) excludeTestsMatching 'org.apache.beam.sdk.transforms.DeduplicateTest.testEventTime' // TODO(https://github.com/apache/beam/issues/21146) diff --git a/runners/spark/job-server/spark_job_server.gradle b/runners/spark/job-server/spark_job_server.gradle index 6d2d4b2bafbf6..5ed5f4277bf4b 100644 --- a/runners/spark/job-server/spark_job_server.gradle +++ b/runners/spark/job-server/spark_job_server.gradle @@ -118,7 +118,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker, excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' @@ -187,7 +186,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker, excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery' excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle' - excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' excludeCategories 'org.apache.beam.sdk.testing.UsesMapState' excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState' excludeCategories 'org.apache.beam.sdk.testing.UsesSetState' diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 0d520dcf7f5c3..809746a157f6c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -596,7 +596,7 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor); return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response); } catch (Exception e) { - // Make sure we clean-up from the active set of bundle processors. + // Make sure we clean up from the active set of bundle processors. bundleProcessorCache.discard(bundleProcessor); throw e; } @@ -1168,6 +1168,19 @@ void discard() { if (this.bundleCache != null) { this.bundleCache.clear(); } + // setupFunction called in createBundleProcessor when BundleProcessorCache.get returns null. + // call teardownFunction here as the BundleProcessor is already removed from cache and isn't + // going to be re-used. + for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) { + try { + teardownFunction.run(); + } catch (Throwable e) { + LOG.error( + "Exceptions are thrown from DoFn.teardown method. Note that it will not fail the" + + " pipeline execution,", + e); + } + } getMetricsEnvironmentStateForBundle().discard(); for (BeamFnDataOutboundAggregator aggregator : getOutboundAggregators().values()) { aggregator.discard(); @@ -1175,6 +1188,7 @@ void discard() { } } + // this is called in cachedBundleProcessors removal listener void shutdown() { for (ThrowingRunnable tearDownFunction : getTearDownFunctions()) { LOG.debug("Tearing down function {}", tearDownFunction); From 1fdd4b8279e19194d82fe883c004b48e0c755b5b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 23 Sep 2024 14:41:11 -0400 Subject: [PATCH 2/3] update CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 364b1a5fbdef5..5f7706b077f61 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -79,6 +79,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). From 413ffe99d43b54c1d7f0a784d0636dcc918fea85 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 26 Sep 2024 11:20:32 -0400 Subject: [PATCH 3/3] adjusted comment and logging --- .../fn/harness/control/ProcessBundleHandler.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 809746a157f6c..c91d5ba71b89e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -597,6 +597,10 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response); } catch (Exception e) { // Make sure we clean up from the active set of bundle processors. + LOG.debug( + "Discard bundleProcessor for {} after exception: {}", + request.getProcessBundle().getProcessBundleDescriptorId(), + e.getMessage()); bundleProcessorCache.discard(bundleProcessor); throw e; } @@ -1168,16 +1172,15 @@ void discard() { if (this.bundleCache != null) { this.bundleCache.clear(); } - // setupFunction called in createBundleProcessor when BundleProcessorCache.get returns null. - // call teardownFunction here as the BundleProcessor is already removed from cache and isn't - // going to be re-used. + // setupFunctions are invoked in createBundleProcessor. Invoke teardownFunction here as the + // BundleProcessor is already removed from cache and won't be re-used. for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) { try { teardownFunction.run(); } catch (Throwable e) { - LOG.error( - "Exceptions are thrown from DoFn.teardown method. Note that it will not fail the" - + " pipeline execution,", + LOG.warn( + "Exceptions are thrown from DoFn.teardown method when trying to discard " + + "ProcessBundleHandler", e); } }