diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnector.java b/src/main/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnector.java index 6a66068082..17c13a73bb 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnector.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnector.java @@ -54,8 +54,6 @@ import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine; import io.cdap.cdap.etl.api.validation.ValidationException; import io.cdap.plugin.common.ConfigUtil; -import io.cdap.plugin.common.Constants; -import io.cdap.plugin.common.ReferenceNames; import io.cdap.plugin.gcp.bigquery.sink.BigQueryMultiSink; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink; import io.cdap.plugin.gcp.bigquery.source.BigQuerySource; @@ -364,8 +362,6 @@ public ConnectorSpec generateSpec(ConnectorContext context, if (definition.getType() != TableDefinition.Type.TABLE) { properties.put(BigQuerySourceConfig.NAME_ENABLE_QUERYING_VIEWS, "true"); } - properties.put(Constants.Reference.REFERENCE_NAME, - ReferenceNames.cleanseReferenceName(datasetName + "." + tableName)); } return specBuilder .addRelatedPlugin(new PluginSpec(BigQuerySource.NAME, BatchSource.PLUGIN_TYPE, properties)) diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/connector/GCSConnector.java b/src/main/java/io/cdap/plugin/gcp/gcs/connector/GCSConnector.java index 7b35d0e8bd..1611c2b2a1 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/connector/GCSConnector.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/connector/GCSConnector.java @@ -173,7 +173,6 @@ protected void setConnectorSpec(ConnectorSpecRequest request, ConnectorSpec.Buil GCSPath gcsPath = GCSPath.from(path); String referenceName = ReferenceNames.cleanseReferenceName(gcsPath.getBucket() + "." + gcsPath.getName()); sourceProperties.put(Constants.Reference.REFERENCE_NAME, referenceName); - sinkProperties.put(Constants.Reference.REFERENCE_NAME, referenceName); } builder.addRelatedPlugin(new PluginSpec(GCSSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties)); builder.addRelatedPlugin(new PluginSpec(GCSBatchSink.NAME, BatchSink.PLUGIN_TYPE, sinkProperties)); diff --git a/src/main/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnector.java b/src/main/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnector.java index 926f9f54e8..cd4baf3aa5 100644 --- a/src/main/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnector.java +++ b/src/main/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnector.java @@ -155,29 +155,35 @@ public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest throws IOException { SpannerPath path = new SpannerPath(connectorSpecRequest.getPath()); ConnectorSpec.Builder specBuilder = ConnectorSpec.builder(); - Map properties = new HashMap<>(); - properties.put(ConfigUtil.NAME_USE_CONNECTION, "true"); - properties.put(ConfigUtil.NAME_CONNECTION, connectorSpecRequest.getConnectionWithMacro()); + Map sourceProperties = new HashMap<>(); + Map sinkProperties = new HashMap<>(); + sourceProperties.put(ConfigUtil.NAME_USE_CONNECTION, "true"); + sinkProperties.put(ConfigUtil.NAME_USE_CONNECTION, "true"); + sourceProperties.put(ConfigUtil.NAME_CONNECTION, connectorSpecRequest.getConnectionWithMacro()); + sinkProperties.put(ConfigUtil.NAME_CONNECTION, connectorSpecRequest.getConnectionWithMacro()); String instanceName = path.getInstance(); if (instanceName != null) { - properties.put(SpannerSourceConfig.NAME_INSTANCE, instanceName); + sourceProperties.put(SpannerSourceConfig.NAME_INSTANCE, instanceName); + sinkProperties.put(SpannerSourceConfig.NAME_INSTANCE, instanceName); } String databaseName = path.getDatabase(); if (databaseName != null) { - properties.put(SpannerSourceConfig.NAME_DATABASE, databaseName); + sourceProperties.put(SpannerSourceConfig.NAME_DATABASE, databaseName); + sinkProperties.put(SpannerSourceConfig.NAME_DATABASE, databaseName); } String tableName = path.getTable(); if (tableName != null) { - properties.put(SpannerSourceConfig.NAME_TABLE, tableName); - properties.put(Constants.Reference.REFERENCE_NAME, + sourceProperties.put(SpannerSourceConfig.NAME_TABLE, tableName); + sinkProperties.put(SpannerSourceConfig.NAME_TABLE, tableName); + sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(instanceName + "." + databaseName + "." + tableName)); Schema schema = getTableSchema(instanceName, databaseName, tableName, context.getFailureCollector()); specBuilder.setSchema(schema); } return specBuilder - .addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, properties)) - .addRelatedPlugin(new PluginSpec(SpannerSink.NAME, BatchSink.PLUGIN_TYPE, properties)) + .addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties)) + .addRelatedPlugin(new PluginSpec(SpannerSink.NAME, BatchSink.PLUGIN_TYPE, sinkProperties)) .build(); } diff --git a/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSourceConfig.java b/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSourceConfig.java index bfb6271a41..b2f3e00b09 100644 --- a/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSourceConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSourceConfig.java @@ -54,7 +54,6 @@ public class SpannerSourceConfig extends PluginConfig { public static final String NAME_SCHEMA = "schema"; @Name(Constants.Reference.REFERENCE_NAME) - @Nullable @Description("This will be used to uniquely identify this source for lineage, annotating metadata, etc.") public String referenceName; @@ -163,9 +162,7 @@ public boolean autoServiceAccountUnavailable() { } public void validate(FailureCollector collector) { - if (!Strings.isNullOrEmpty(referenceName)) { - IdUtils.validateReferenceName(referenceName, collector); - } + IdUtils.validateReferenceName(referenceName, collector); ConfigUtil.validateConnection(this, useConnection, connection, collector); Schema schema = getSchema(collector); if (!containsMacro(NAME_SCHEMA) && schema != null) { diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java index d74bdadd81..1b6013e033 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java @@ -171,7 +171,6 @@ private void testGenerateSpec(BigQueryConnector connector) throws IOException { pluginProperties.put("connection", "${conn(connection-id)}"); pluginProperties.put("dataset", dataset); pluginProperties.put("table", table); - pluginProperties.put("referenceName", dataset + "." + table); expectedRelatedPlugins.add(new PluginSpec(BigQueryMultiSink.NAME, BatchSink.PLUGIN_TYPE, pluginProperties)); expectedRelatedPlugins.add(new PluginSpec(BigQuerySink.NAME, BatchSink.PLUGIN_TYPE, pluginProperties)); expectedRelatedPlugins.add(new PluginSpec(BigQuerySource.NAME, BigQuerySource.PLUGIN_TYPE, pluginProperties)); diff --git a/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java index 951b2013e1..b277a69a34 100644 --- a/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java +++ b/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java @@ -160,10 +160,11 @@ private void testGenerateSpec(SpannerConnector connector) throws IOException { properties.put("table", table); properties.put("connection", null); properties.put("useConnection", "true"); - properties.put("referenceName", String.format("%s.%s.%s", instance, database, table)); + Map sourceProperties = new HashMap<>(properties); + sourceProperties.put("referenceName", String.format("%s.%s.%s", instance, database, table)); ConnectorSpec expected = ConnectorSpec.builder() .setSchema(expectedSchema) - .addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, properties)) + .addRelatedPlugin(new PluginSpec(SpannerSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties)) .addRelatedPlugin(new PluginSpec(SpannerSink.NAME, BatchSink.PLUGIN_TYPE, properties)) .build();