Skip to content

Commit

Permalink
fix: serialization issues with Cypher actions (#1778)
Browse files Browse the repository at this point in the history
  • Loading branch information
fbiville authored Aug 9, 2024
1 parent 2b02fb0 commit 3b3ca19
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import com.google.cloud.teleport.v2.neo4j.database.Neo4jConnection;
import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
import com.google.cloud.teleport.v2.neo4j.telemetry.Neo4jTelemetry;
import com.google.cloud.teleport.v2.neo4j.utils.SerializableSupplier;
import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -34,7 +34,7 @@ public class CypherActionFn extends DoFn<Integer, Row> {
private static final Logger LOG = LoggerFactory.getLogger(CypherActionFn.class);

private final String cypher;
private final Supplier<Neo4jConnection> connectionProvider;
private final SerializableSupplier<Neo4jConnection> connectionProvider;

private Neo4jConnection directConnect;

Expand All @@ -44,7 +44,7 @@ public CypherActionFn(ActionContext context) {
}

@VisibleForTesting
CypherActionFn(ActionContext context, Supplier<Neo4jConnection> connectionProvider) {
CypherActionFn(ActionContext context, SerializableSupplier<Neo4jConnection> connectionProvider) {
String cypher = context.action.options.get("cypher");
if (StringUtils.isEmpty(cypher)) {
throw new RuntimeException("Options 'cypher' not provided for cypher action transform.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.driver.TransactionConfig;
import org.slf4j.Logger;
Expand All @@ -33,7 +33,8 @@
public class PreloadCypherAction implements PreloadAction {

private static final Logger LOG = LoggerFactory.getLogger(PreloadCypherAction.class);
private final BiFunction<ConnectionParams, String, Neo4jConnection> connectionProvider;
private final SerializableBiFunction<ConnectionParams, String, Neo4jConnection>
connectionProvider;

private String cypher;
private ActionContext context;
Expand All @@ -43,7 +44,8 @@ public PreloadCypherAction() {
}

@VisibleForTesting
PreloadCypherAction(BiFunction<ConnectionParams, String, Neo4jConnection> connectionProvider) {
PreloadCypherAction(
SerializableBiFunction<ConnectionParams, String, Neo4jConnection> connectionProvider) {
this.connectionProvider = connectionProvider;
}

Expand Down

0 comments on commit 3b3ca19

Please sign in to comment.