Skip to content

Commit

Permalink
Merge pull request #1167 from data-integrations/feat/CDAP-19804
Browse files Browse the repository at this point in the history
stop auto-populating reference name for BQ, GCS sink, and spanner sink
  • Loading branch information
mrahanjam authored Oct 21, 2022
2 parents 4cf8516 + f80409e commit 097fd84
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine;
import io.cdap.cdap.etl.api.validation.ValidationException;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.gcp.bigquery.sink.BigQueryMultiSink;
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
Expand Down Expand Up @@ -364,8 +362,6 @@ public ConnectorSpec generateSpec(ConnectorContext context,
if (definition.getType() != TableDefinition.Type.TABLE) {
properties.put(BigQuerySourceConfig.NAME_ENABLE_QUERYING_VIEWS, "true");
}
properties.put(Constants.Reference.REFERENCE_NAME,
ReferenceNames.cleanseReferenceName(datasetName + "." + tableName));
}
return specBuilder
.addRelatedPlugin(new PluginSpec(BigQuerySource.NAME, BatchSource.PLUGIN_TYPE, properties))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ protected void setConnectorSpec(ConnectorSpecRequest request, ConnectorSpec.Buil
GCSPath gcsPath = GCSPath.from(path);
String referenceName = ReferenceNames.cleanseReferenceName(gcsPath.getBucket() + "." + gcsPath.getName());
sourceProperties.put(Constants.Reference.REFERENCE_NAME, referenceName);
sinkProperties.put(Constants.Reference.REFERENCE_NAME, referenceName);
}
builder.addRelatedPlugin(new PluginSpec(GCSSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties));
builder.addRelatedPlugin(new PluginSpec(GCSBatchSink.NAME, BatchSink.PLUGIN_TYPE, sinkProperties));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,29 +155,35 @@ public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest
throws IOException {
SpannerPath path = new SpannerPath(connectorSpecRequest.getPath());
ConnectorSpec.Builder specBuilder = ConnectorSpec.builder();
Map<String, String> properties = new HashMap<>();
properties.put(ConfigUtil.NAME_USE_CONNECTION, "true");
properties.put(ConfigUtil.NAME_CONNECTION, connectorSpecRequest.getConnectionWithMacro());
Map<String, String> sourceProperties = new HashMap<>();
Map<String, String> sinkProperties = new HashMap<>();
sourceProperties.put(ConfigUtil.NAME_USE_CONNECTION, "true");
sinkProperties.put(ConfigUtil.NAME_USE_CONNECTION, "true");
sourceProperties.put(ConfigUtil.NAME_CONNECTION, connectorSpecRequest.getConnectionWithMacro());
sinkProperties.put(ConfigUtil.NAME_CONNECTION, connectorSpecRequest.getConnectionWithMacro());

String instanceName = path.getInstance();
if (instanceName != null) {
properties.put(SpannerSourceConfig.NAME_INSTANCE, instanceName);
sourceProperties.put(SpannerSourceConfig.NAME_INSTANCE, instanceName);
sinkProperties.put(SpannerSourceConfig.NAME_INSTANCE, instanceName);
}
String databaseName = path.getDatabase();
if (databaseName != null) {
properties.put(SpannerSourceConfig.NAME_DATABASE, databaseName);
sourceProperties.put(SpannerSourceConfig.NAME_DATABASE, databaseName);
sinkProperties.put(SpannerSourceConfig.NAME_DATABASE, databaseName);
}
String tableName = path.getTable();
if (tableName != null) {
properties.put(SpannerSourceConfig.NAME_TABLE, tableName);
properties.put(Constants.Reference.REFERENCE_NAME,
sourceProperties.put(SpannerSourceConfig.NAME_TABLE, tableName);
sinkProperties.put(SpannerSourceConfig.NAME_TABLE, tableName);
sourceProperties.put(Constants.Reference.REFERENCE_NAME,
ReferenceNames.cleanseReferenceName(instanceName + "." + databaseName + "." + tableName));
Schema schema = getTableSchema(instanceName, databaseName, tableName, context.getFailureCollector());
specBuilder.setSchema(schema);
}
return specBuilder
.addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, properties))
.addRelatedPlugin(new PluginSpec(SpannerSink.NAME, BatchSink.PLUGIN_TYPE, properties))
.addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties))
.addRelatedPlugin(new PluginSpec(SpannerSink.NAME, BatchSink.PLUGIN_TYPE, sinkProperties))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class SpannerSourceConfig extends PluginConfig {
public static final String NAME_SCHEMA = "schema";

@Name(Constants.Reference.REFERENCE_NAME)
@Nullable
@Description("This will be used to uniquely identify this source for lineage, annotating metadata, etc.")
public String referenceName;

Expand Down Expand Up @@ -163,9 +162,7 @@ public boolean autoServiceAccountUnavailable() {
}

public void validate(FailureCollector collector) {
if (!Strings.isNullOrEmpty(referenceName)) {
IdUtils.validateReferenceName(referenceName, collector);
}
IdUtils.validateReferenceName(referenceName, collector);
ConfigUtil.validateConnection(this, useConnection, connection, collector);
Schema schema = getSchema(collector);
if (!containsMacro(NAME_SCHEMA) && schema != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ private void testGenerateSpec(BigQueryConnector connector) throws IOException {
pluginProperties.put("connection", "${conn(connection-id)}");
pluginProperties.put("dataset", dataset);
pluginProperties.put("table", table);
pluginProperties.put("referenceName", dataset + "." + table);
expectedRelatedPlugins.add(new PluginSpec(BigQueryMultiSink.NAME, BatchSink.PLUGIN_TYPE, pluginProperties));
expectedRelatedPlugins.add(new PluginSpec(BigQuerySink.NAME, BatchSink.PLUGIN_TYPE, pluginProperties));
expectedRelatedPlugins.add(new PluginSpec(BigQuerySource.NAME, BigQuerySource.PLUGIN_TYPE, pluginProperties));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,11 @@ private void testGenerateSpec(SpannerConnector connector) throws IOException {
properties.put("table", table);
properties.put("connection", null);
properties.put("useConnection", "true");
properties.put("referenceName", String.format("%s.%s.%s", instance, database, table));
Map<String, String> sourceProperties = new HashMap<>(properties);
sourceProperties.put("referenceName", String.format("%s.%s.%s", instance, database, table));
ConnectorSpec expected = ConnectorSpec.builder()
.setSchema(expectedSchema)
.addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, properties))
.addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties))
.addRelatedPlugin(new PluginSpec(SpannerSink.NAME, BatchSink.PLUGIN_TYPE, properties))
.build();

Expand Down

0 comments on commit 097fd84

Please sign in to comment.