From 3b3ca195f7f7f75f6c535cd87838fd80bb7f5f27 Mon Sep 17 00:00:00 2001 From: Florent Biville <445792+fbiville@users.noreply.github.com> Date: Fri, 9 Aug 2024 16:39:27 +0200 Subject: [PATCH] fix: serialization issues with Cypher actions (#1778) --- .../v2/neo4j/actions/function/CypherActionFn.java | 6 +++--- .../v2/neo4j/actions/preload/PreloadCypherAction.java | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/function/CypherActionFn.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/function/CypherActionFn.java index 9ca77feb30..547b0ab876 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/function/CypherActionFn.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/function/CypherActionFn.java @@ -18,9 +18,9 @@ import com.google.cloud.teleport.v2.neo4j.database.Neo4jConnection; import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext; import com.google.cloud.teleport.v2.neo4j.telemetry.Neo4jTelemetry; +import com.google.cloud.teleport.v2.neo4j.utils.SerializableSupplier; import com.google.common.annotations.VisibleForTesting; import java.util.Map; -import java.util.function.Supplier; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.Row; import org.apache.commons.lang3.StringUtils; @@ -34,7 +34,7 @@ public class CypherActionFn extends DoFn { private static final Logger LOG = LoggerFactory.getLogger(CypherActionFn.class); private final String cypher; - private final Supplier connectionProvider; + private final SerializableSupplier connectionProvider; private Neo4jConnection directConnect; @@ -44,7 +44,7 @@ public CypherActionFn(ActionContext context) { } @VisibleForTesting - CypherActionFn(ActionContext context, Supplier connectionProvider) { + CypherActionFn(ActionContext context, SerializableSupplier connectionProvider) { String cypher = context.action.options.get("cypher"); if (StringUtils.isEmpty(cypher)) { throw new RuntimeException("Options 'cypher' not provided for cypher action transform."); diff --git a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadCypherAction.java b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadCypherAction.java index a254a3dec8..83e67c9cc8 100644 --- a/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadCypherAction.java +++ b/v2/googlecloud-to-neo4j/src/main/java/com/google/cloud/teleport/v2/neo4j/actions/preload/PreloadCypherAction.java @@ -23,7 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; +import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.commons.lang3.StringUtils; import org.neo4j.driver.TransactionConfig; import org.slf4j.Logger; @@ -33,7 +33,8 @@ public class PreloadCypherAction implements PreloadAction { private static final Logger LOG = LoggerFactory.getLogger(PreloadCypherAction.class); - private final BiFunction connectionProvider; + private final SerializableBiFunction + connectionProvider; private String cypher; private ActionContext context; @@ -43,7 +44,8 @@ public PreloadCypherAction() { } @VisibleForTesting - PreloadCypherAction(BiFunction connectionProvider) { + PreloadCypherAction( + SerializableBiFunction connectionProvider) { this.connectionProvider = connectionProvider; }