Skip to content

Commit

Permalink
secret manager integration added (#1806)
Browse files Browse the repository at this point in the history
* secret manager integration added

* added tests

* added tests

---------

Co-authored-by: Aditya Bharadwaj <[email protected]>
  • Loading branch information
bharadwaj-aditya and Aditya Bharadwaj authored Aug 22, 2024
1 parent 9dabee7 commit 8cdf602
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ private static PTransform<PBegin, PCollection<SourceRow>> getReadWithUniformPart
private static DataSourceConfiguration getDataSourceConfiguration(JdbcIOWrapperConfig config) {
DataSourceConfiguration dataSourceConfig =
DataSourceConfiguration.create(new JdbcDataSource(config));
LOG.info("Final DatasourceConfiguration: {}", dataSourceConfig);
return dataSourceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
public class ShardFileReader {

private static final Logger LOG = LoggerFactory.getLogger(ShardFileReader.class);

private static final Pattern partialPattern = Pattern.compile("projects/.*/secrets/.*");
private static final Pattern fullPattern = Pattern.compile("projects/.*/secrets/.*/versions/.*");
private static final Pattern partialWithSlash = Pattern.compile("projects/.*/secrets/.*/");

private ISecretManagerAccessor secretManagerAccessor;

public ShardFileReader(ISecretManagerAccessor secretManagerAccessor) {
Expand All @@ -59,69 +64,23 @@ public List<Shard> getOrderedShardDetails(String sourceShardsFilePath) {
.setFieldNamingPolicy(FieldNamingPolicy.IDENTITY)
.create()
.fromJson(result, listOfShardObject);
Pattern partialPattern = Pattern.compile("projects/.*/secrets/.*");
Pattern fullPattern = Pattern.compile("projects/.*/secrets/.*/versions/.*");
Pattern partialWithSlash = Pattern.compile("projects/.*/secrets/.*/");

for (Shard shard : shardList) {
LOG.info(" The shard is: {} ", shard);
String secretManagerUri = shard.getSecretManagerUri();
if (secretManagerUri != null && !secretManagerUri.isEmpty()) {
LOG.info(
"Secret Manager will be used to get password for shard {} having secret {}",
shard.getLogicalShardId(),
secretManagerUri);
if (partialPattern.matcher(secretManagerUri).matches()) {
LOG.info(
"The matched secret for shard {} is : {}",
shard.getLogicalShardId(),
secretManagerUri);
if (fullPattern.matcher(secretManagerUri).matches()) {
LOG.info(
"The secret for shard {} is : {}", shard.getLogicalShardId(), secretManagerUri);
shard.setPassword(secretManagerAccessor.getSecret(secretManagerUri));
} else {
// partial match hence get the latest version
String versionToAppend = "versions/latest";
if (partialWithSlash.matcher(secretManagerUri).matches()) {
secretManagerUri += versionToAppend;
} else {
secretManagerUri += "/" + versionToAppend;
}

LOG.info(
"The generated secret for shard {} is : {}",
shard.getLogicalShardId(),
secretManagerUri);
shard.setPassword(secretManagerAccessor.getSecret(secretManagerUri));
}
} else {
LOG.error(
"The secretManagerUri field with value {} for shard {} , specified in file {} does"
+ " not adhere to expected pattern projects/.*/secrets/.*/versions/.*",
secretManagerUri,
String password =
resolvePassword(
sourceShardsFilePath,
shard.getSecretManagerUri(),
shard.getLogicalShardId(),
sourceShardsFilePath);
throw new RuntimeException(
"The secretManagerUri field with value "
+ secretManagerUri
+ " for shard "
+ shard.getLogicalShardId()
+ ", specified in file "
+ sourceShardsFilePath
+ " does not adhere to expected pattern"
+ " projects/.*/secrets/.*/versions/.*");
}
} else {
String password = shard.getPassword();
if (password == null || password.isEmpty()) {
throw new RuntimeException(
"Neither password nor secretManagerUri was found in the shard file "
+ sourceShardsFilePath
+ " for shard "
+ shard.getLogicalShardId());
}
shard.getPassword());
if (password == null || password.isEmpty()) {
throw new RuntimeException(
"Neither password nor secretManagerUri was found in the shard file "
+ sourceShardsFilePath
+ " for shard "
+ shard.getLogicalShardId());
}
shard.setPassword(password);
}

Collections.sort(
Expand All @@ -146,6 +105,55 @@ public int compare(Shard s1, Shard s2) {
}
}

private String resolvePassword(
String sourceShardsFilePath,
String secretManagerUri,
String logicalShardId,
String password) {
if (secretManagerUri != null && !secretManagerUri.isEmpty()) {
LOG.info(
"Secret Manager will be used to get password for shard {} having secret {}",
logicalShardId,
secretManagerUri);
if (partialPattern.matcher(secretManagerUri).matches()) {
LOG.info("The matched secret for shard {} is : {}", logicalShardId, secretManagerUri);
if (fullPattern.matcher(secretManagerUri).matches()) {
LOG.info("The secret for shard {} is : {}", logicalShardId, secretManagerUri);
return secretManagerAccessor.getSecret(secretManagerUri);
} else {
// partial match hence get the latest version
String versionToAppend = "versions/latest";
if (partialWithSlash.matcher(secretManagerUri).matches()) {
secretManagerUri += versionToAppend;
} else {
secretManagerUri += "/" + versionToAppend;
}

LOG.info("The generated secret for shard {} is : {}", logicalShardId, secretManagerUri);
return secretManagerAccessor.getSecret(secretManagerUri);
}
} else {
LOG.error(
"The secretManagerUri field with value {} for shard {} , specified in file {} does"
+ " not adhere to expected pattern projects/.*/secrets/.*/versions/.*",
secretManagerUri,
logicalShardId,
sourceShardsFilePath);
throw new RuntimeException(
"The secretManagerUri field with value "
+ secretManagerUri
+ " for shard "
+ logicalShardId
+ ", specified in file "
+ sourceShardsFilePath
+ " does not adhere to expected pattern"
+ " projects/.*/secrets/.*/versions/.*");
}
}
LOG.info("using plaintext password for shard: {}", logicalShardId);
return password;
}

/**
* Read the sharded migration config and return a list of physical shards.
*
Expand All @@ -171,7 +179,6 @@ public List<Shard> readForwardMigrationShardingConfig(String sourceShardsFilePat
e);
}

// TODO - add secret manager integration
// TODO - create a structure for the shard config and map directly to the object
Type shardConfiguration = new TypeToken<Map>() {}.getType();
Map shardConfigMap =
Expand All @@ -189,13 +196,34 @@ public List<Shard> readForwardMigrationShardingConfig(String sourceShardsFilePat
for (Map dataShard : dataShards) {
List<Map> databases = (List) (dataShard.getOrDefault("databases", new ArrayList<>()));

String host = (String) (dataShard.get("host"));
if (databases.isEmpty()) {
LOG.warn("no databases found for host: {}", host);
throw new RuntimeException("no databases found for host: " + String.valueOf(host));
}

String password =
resolvePassword(
sourceShardsFilePath,
(String) dataShard.get("secretManagerUri"),
host,
(String) dataShard.get("password"));
if (password == null || password.isEmpty()) {
LOG.warn("could not fetch password for host: {}", host);
throw new RuntimeException(
"Neither password nor secretManagerUri was found in the shard file "
+ sourceShardsFilePath
+ " for host "
+ host);
}

Shard shard =
new Shard(
"",
(String) (dataShard.get("host")),
host,
dataShard.getOrDefault("port", 0).toString(),
(String) (dataShard.get("user")),
(String) (dataShard.get("password")),
password,
"",
(String) (dataShard.get("secretManagerUri")));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,41 @@ public void readBulkMigrationShardFile() {

assertEquals(shards, expectedShards);
}

@Test
public void readBulkMigrationShardFileWithSecrets() {
when(secretManagerAccessorMockImpl.getSecret("projects/123/secrets/secretA/versions/latest"))
.thenReturn("secretA");
when(secretManagerAccessorMockImpl.getSecret("projects/123/secrets/secretB/versions/latest"))
.thenReturn("secretB");
ShardFileReader shardFileReader = new ShardFileReader(secretManagerAccessorMockImpl);
List<Shard> shards =
shardFileReader.readForwardMigrationShardingConfig(
"src/test/resources/bulk-migration-shards-secret.json");
Shard shard1 =
new Shard(
"",
"1.1.1.1",
"3306",
"test1",
"secretA",
"",
"projects/123/secrets/secretA/versions/latest");
shard1.getDbNameToLogicalShardIdMap().put("person1", "1-1-1-1-person");
shard1.getDbNameToLogicalShardIdMap().put("person2", "1-1-1-1-person2");
Shard shard2 =
new Shard(
"",
"1.1.1.2",
"3306",
"test1",
"secretB",
"",
"projects/123/secrets/secretB/versions/latest");
shard2.getDbNameToLogicalShardIdMap().put("person1", "1-1-1-2-person");
shard2.getDbNameToLogicalShardIdMap().put("person20", "1-1-1-2-person2");
List<Shard> expectedShards = new ArrayList<>(Arrays.asList(shard1, shard2));

assertEquals(shards, expectedShards);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"configType": "dataflow",
"shardConfigurationBulk": {
"schemaSource": {
"dataShardId": "",
"host": "",
"user": "",
"password": "",
"port": "",
"dbName": ""
},
"dataShards": [
{
"dataShardId": "1-1-1-1",
"host": "1.1.1.1",
"user": "test1",
"secretManagerUri": "projects/123/secrets/secretA/versions/latest",
"port": "3306",
"dbName": "",
"databases": [
{
"dbName": "person1",
"databaseId": "1-1-1-1-person",
"refDataShardId": "1-1-1-1"
},
{
"dbName": "person2",
"databaseId": "1-1-1-1-person2",
"refDataShardId": "1-1-1-1"
}
]
},
{
"dataShardId": "2-2-2-2",
"host": "1.1.1.2",
"user": "test1",
"secretManagerUri": "projects/123/secrets/secretB/versions/latest",
"port": "3306",
"dbName": "",
"databases": [
{
"dbName": "person1",
"databaseId": "1-1-1-2-person",
"refDataShardId": "1-1-1-2"
},
{
"dbName": "person20",
"databaseId": "1-1-1-2-person2",
"refDataShardId": "1-1-1-2"
}
]
}
]
}
}

0 comments on commit 8cdf602

Please sign in to comment.