From a2c6b3187a764b8506232698222e213095b87968 Mon Sep 17 00:00:00 2001 From: Rahul De Date: Sun, 3 Nov 2024 14:52:38 +0000 Subject: [PATCH] [#341] implement custom provider scaffolding --- .../common/ConnectorCredentialsProvider.java | 32 +++++++++++++++---- .../pubsub/kafka/common/ConnectorUtils.java | 1 + .../kafka/sink/CloudPubSubSinkConnector.java | 6 ++++ .../source/CloudPubSubSourceConnector.java | 6 ++++ .../pubsublite/kafka/sink/ConfigDefs.java | 8 ++++- 5 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java index 6fccf7fe..a5815d56 100644 --- a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java +++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Stream; public class ConnectorCredentialsProvider implements CredentialsProvider { private static final List GCP_SCOPE = @@ -38,17 +39,19 @@ private ConnectorCredentialsProvider(CredentialsProvider impl) { public static ConnectorCredentialsProvider fromConfig(Map config) { String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString(); String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString(); + String credentialsClass = config.get(ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG).toString(); + long setOptsCount = Stream.of(credentialsPath, credentialsJson, credentialsClass).filter(s -> !s.isEmpty()).count(); + + if (setOptsCount > 1) { + throw new IllegalArgumentException("More than one of the credentials config are set"); + } + if (!credentialsPath.isEmpty()) { - if (!credentialsJson.isEmpty()) { - throw new IllegalArgumentException( - "May not set both " - + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG - + " and " - + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); - } return ConnectorCredentialsProvider.fromFile(credentialsPath); } else if (!credentialsJson.isEmpty()) { return ConnectorCredentialsProvider.fromJson(credentialsJson); + } else if (!credentialsClass.isEmpty()) { + return ConnectorCredentialsProvider.fromClass(credentialsClass); } else { return ConnectorCredentialsProvider.fromDefault(); } @@ -68,6 +71,21 @@ public static ConnectorCredentialsProvider fromJson(String credentialsJson) { .createScoped(GCP_SCOPE)); } + public static ConnectorCredentialsProvider fromClass(String credentialsClass) { + try { + final Class klass = Class.forName(credentialsClass); + final Object obj = klass.getDeclaredConstructor().newInstance(); + + if (!obj instanceof CredentialsProvider) { + throw new IllegalArgumentException(String.format("Supplied class %s is not a CredentialsProvider", credentialsClass)); + } + + return new ConnectorCredentialsProvider(() -> ((CredentialsProvider) obj).getCredentials()); + } catch (Exception e) { + throw new RuntimeException("Error loading class: " + e); + } + } + public static ConnectorCredentialsProvider fromDefault() { return new ConnectorCredentialsProvider( () -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE)); diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java index 597ff5ef..e1330b8d 100644 --- a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java +++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java @@ -33,6 +33,7 @@ public class ConnectorUtils { public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey"; public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path"; public static final String GCP_CREDENTIALS_JSON_CONFIG = "gcp.credentials.json"; + public static final String GCP_CREDENTIALS_CLASS_CONFIG = "gcp.credentials.class"; public static final String KAFKA_MESSAGE_CPS_BODY_FIELD = "message"; public static final String KAFKA_TOPIC_ATTRIBUTE = "kafka.topic"; public static final String KAFKA_PARTITION_ATTRIBUTE = "kafka.partition"; diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java index d9736fe1..d9842e4d 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java @@ -246,6 +246,12 @@ public ConfigDef config() { "", Importance.HIGH, "GCP JSON credentials") + .define( + ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG, + Type.STRING, + "", + Importance.HIGH, + "Name of the class for custom credentials provider") .define( ORDERING_KEY_SOURCE, Type.STRING, diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java index e77831f7..7081c9ce 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java @@ -265,6 +265,12 @@ public ConfigDef config() { "", Importance.HIGH, "GCP JSON credentials") + .define( + ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG, + Type.STRING, + "", + Importance.HIGH, + "Name of the class for custom credentials provider") .define( USE_KAFKA_HEADERS, Type.BOOLEAN, diff --git a/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java b/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java index 12d59eb9..97543b1e 100644 --- a/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java +++ b/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java @@ -62,6 +62,12 @@ static ConfigDef config() { ConfigDef.Type.STRING, "", Importance.HIGH, - "GCP JSON credentials"); + "GCP JSON credentials") + .define( + ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG, + Type.STRING, + "", + Importance.HIGH, + "Name of the class for custom credentials provider"); } }