Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#341] implement custom provider #356

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class ConnectorCredentialsProvider implements CredentialsProvider {
private static final List<String> GCP_SCOPE =
Expand All @@ -38,17 +39,19 @@ private ConnectorCredentialsProvider(CredentialsProvider impl) {
public static ConnectorCredentialsProvider fromConfig(Map<String, Object> config) {
String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString();
String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString();
String credentialsClass = config.get(ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG).toString();
long setOptsCount = Stream.of(credentialsPath, credentialsJson, credentialsClass).filter(s -> !s.isEmpty()).count();

if (setOptsCount > 1) {
throw new IllegalArgumentException("More than one of the credentials config are set");
}

if (!credentialsPath.isEmpty()) {
if (!credentialsJson.isEmpty()) {
throw new IllegalArgumentException(
"May not set both "
+ ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG
+ " and "
+ ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
}
return ConnectorCredentialsProvider.fromFile(credentialsPath);
} else if (!credentialsJson.isEmpty()) {
return ConnectorCredentialsProvider.fromJson(credentialsJson);
} else if (!credentialsClass.isEmpty()) {
return ConnectorCredentialsProvider.fromClass(credentialsClass);
} else {
return ConnectorCredentialsProvider.fromDefault();
}
Expand All @@ -68,6 +71,21 @@ public static ConnectorCredentialsProvider fromJson(String credentialsJson) {
.createScoped(GCP_SCOPE));
}

public static ConnectorCredentialsProvider fromClass(String credentialsClass) {
try {
final Class<?> klass = Class.forName(credentialsClass);
final Object obj = klass.getDeclaredConstructor().newInstance();

if (!obj instanceof CredentialsProvider) {
throw new IllegalArgumentException(String.format("Supplied class %s is not a CredentialsProvider", credentialsClass));
}

return new ConnectorCredentialsProvider(() -> ((CredentialsProvider) obj).getCredentials());
} catch (Exception e) {
throw new RuntimeException("Error loading class: " + e);
}
}

public static ConnectorCredentialsProvider fromDefault() {
return new ConnectorCredentialsProvider(
() -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ConnectorUtils {
public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey";
public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path";
public static final String GCP_CREDENTIALS_JSON_CONFIG = "gcp.credentials.json";
public static final String GCP_CREDENTIALS_CLASS_CONFIG = "gcp.credentials.class";
public static final String KAFKA_MESSAGE_CPS_BODY_FIELD = "message";
public static final String KAFKA_TOPIC_ATTRIBUTE = "kafka.topic";
public static final String KAFKA_PARTITION_ATTRIBUTE = "kafka.partition";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ public ConfigDef config() {
"",
Importance.HIGH,
"GCP JSON credentials")
.define(
ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG,
Type.STRING,
"",
Importance.HIGH,
"Name of the class for custom credentials provider")
.define(
ORDERING_KEY_SOURCE,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ public ConfigDef config() {
"",
Importance.HIGH,
"GCP JSON credentials")
.define(
ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG,
Type.STRING,
"",
Importance.HIGH,
"Name of the class for custom credentials provider")
.define(
USE_KAFKA_HEADERS,
Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ static ConfigDef config() {
ConfigDef.Type.STRING,
"",
Importance.HIGH,
"GCP JSON credentials");
"GCP JSON credentials")
.define(
ConnectorUtils.GCP_CREDENTIALS_CLASS_CONFIG,
Type.STRING,
"",
Importance.HIGH,
"Name of the class for custom credentials provider");
}
}