From bfe6168505ca503a15de7e1e9de0741f48a8466d Mon Sep 17 00:00:00 2001 From: liferoad Date: Wed, 14 Feb 2024 18:01:02 -0500 Subject: [PATCH] fixed the display issue for impersonateServiceAccount with Java (#30283) * fixed the display issue for impersonateServiceAccount when using Beam Java SDK * Added the comment to improve how to handle impersonateServiceAccount --- .../worker/StreamingDataflowWorker.java | 1 + .../worker/WorkerPipelineOptionsFactory.java | 10 +++++++ .../extensions/gcp/options/GcpOptions.java | 1 - sdks/java/harness/build.gradle | 1 + .../org/apache/beam/fn/harness/FnHarness.java | 30 +++++++++++++++++++ 5 files changed, 42 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 4d2ef6a03cfe5..06450e60fc053 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -489,6 +489,7 @@ public static void main(String[] args) throws Exception { "%s cannot be main() class with beam_fn_api enabled", StreamingDataflowWorker.class.getSimpleName()); + LOG.debug("Creating StreamingDataflowWorker from options: {}", options); StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options); // Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java index a3ec8933c3314..c9df5d96eb6ce 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java @@ -80,6 +80,16 @@ public static T createFromSystemPropert options.setWorkerPool(System.getProperty("worker_pool")); } + // Remove impersonate information from workers + // More details: + // https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking + if (options.getImpersonateServiceAccount() != null) { + LOG.info( + "Remove the impersonateServiceAccount pipeline option ({}) when starting the Worker harness.", + options.getImpersonateServiceAccount()); + options.setImpersonateServiceAccount(null); + } + return options; } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index d4cff72c43f38..3c65f0fa748c7 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -220,7 +220,6 @@ public List create(PipelineOptions options) { + " either a single service account as the impersonator, or a" + " comma-separated list of service accounts to create an" + " impersonation delegation chain.") - @JsonIgnore @Nullable String getImpersonateServiceAccount(); diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index 3c50f3c8edf25..e1f3d660bcabd 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -31,6 +31,7 @@ dependencies { provided project(path: ":sdks:java:core", configuration: "shadow") provided project(path: ":sdks:java:transform-service:launcher") provided library.java.avro + provided library.java.jackson_databind provided library.java.joda_time provided library.java.slf4j_api provided library.java.vendored_grpc_1_60_1 diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 6d13b3704e167..e22dd6d5b2add 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -17,12 +17,16 @@ */ package org.apache.beam.fn.harness; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.EnumMap; +import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -107,6 +111,29 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String des return apiServiceDescriptorBuilder.build(); } + public static String removeNestedKey(String jsonString, String keyToRemove) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(jsonString); + + removeKeyRecursively(rootNode, keyToRemove); + + return mapper.writeValueAsString(rootNode); + } + + private static void removeKeyRecursively(JsonNode node, String keyToRemove) { + if (node.isObject()) { + Iterator> iterator = node.fields(); + while (iterator.hasNext()) { + Map.Entry field = iterator.next(); + if (field.getKey().equals(keyToRemove)) { + iterator.remove(); // Safe removal using Iterator + } else { + removeKeyRecursively(field.getValue(), keyToRemove); + } + } + } + } + public static void main(String[] args) throws Exception { main(System::getenv); } @@ -144,6 +171,9 @@ public static void main(Function environmentVarGetter) throws Ex } System.out.format("Pipeline options %s%n", pipelineOptionsJson); + // TODO: https://github.com/apache/beam/issues/30301 + pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount"); + PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson); Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =