Skip to content

Commit

Permalink
fixed the display issue for impersonateServiceAccount with Java (apac…
Browse files Browse the repository at this point in the history
…he#30283)

* fixed the display issue for impersonateServiceAccount when using Beam Java SDK

* Added the comment to improve how to handle impersonateServiceAccount
  • Loading branch information
liferoad authored Feb 14, 2024
1 parent e4d9098 commit bfe6168
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ public static void main(String[] args) throws Exception {
"%s cannot be main() class with beam_fn_api enabled",
StreamingDataflowWorker.class.getSimpleName());

LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

// Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ public static <T extends DataflowWorkerHarnessOptions> T createFromSystemPropert
options.setWorkerPool(System.getProperty("worker_pool"));
}

// Remove impersonate information from workers
// More details:
// https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking
if (options.getImpersonateServiceAccount() != null) {
LOG.info(
"Remove the impersonateServiceAccount pipeline option ({}) when starting the Worker harness.",
options.getImpersonateServiceAccount());
options.setImpersonateServiceAccount(null);
}

return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ public List<String> create(PipelineOptions options) {
+ " either a single service account as the impersonator, or a"
+ " comma-separated list of service accounts to create an"
+ " impersonation delegation chain.")
@JsonIgnore
@Nullable
String getImpersonateServiceAccount();

Expand Down
1 change: 1 addition & 0 deletions sdks/java/harness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
provided project(path: ":sdks:java:core", configuration: "shadow")
provided project(path: ":sdks:java:transform-service:launcher")
provided library.java.avro
provided library.java.jackson_databind
provided library.java.joda_time
provided library.java.slf4j_api
provided library.java.vendored_grpc_1_60_1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package org.apache.beam.fn.harness;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -107,6 +111,29 @@ private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String des
return apiServiceDescriptorBuilder.build();
}

public static String removeNestedKey(String jsonString, String keyToRemove) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(jsonString);

removeKeyRecursively(rootNode, keyToRemove);

return mapper.writeValueAsString(rootNode);
}

private static void removeKeyRecursively(JsonNode node, String keyToRemove) {
if (node.isObject()) {
Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
while (iterator.hasNext()) {
Map.Entry<String, JsonNode> field = iterator.next();
if (field.getKey().equals(keyToRemove)) {
iterator.remove(); // Safe removal using Iterator
} else {
removeKeyRecursively(field.getValue(), keyToRemove);
}
}
}
}

public static void main(String[] args) throws Exception {
main(System::getenv);
}
Expand Down Expand Up @@ -144,6 +171,9 @@ public static void main(Function<String, String> environmentVarGetter) throws Ex
}

System.out.format("Pipeline options %s%n", pipelineOptionsJson);
// TODO: https://github.com/apache/beam/issues/30301
pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount");

PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);

Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
Expand Down

0 comments on commit bfe6168

Please sign in to comment.