-
Notifications
You must be signed in to change notification settings - Fork 86
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
20 changed files
with
582 additions
and
4 deletions.
There are no files selected for viewing
209 changes: 209 additions & 0 deletions
209
src/e2e-test/features/bigquery/source/BigQuerySqlEngine.feature
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
# Copyright © 2024 Cask Data, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
# use this file except in compliance with the License. You may obtain a copy of | ||
# the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations under | ||
# the License. | ||
|
||
@BigQuery_Sink | ||
Feature: BigQuery sink - Verification of BigQuery to BigQuery successful data transfer | ||
|
||
@BQ_SOURCE_JOINER_TEST @BQ_SOURCE_JOINER2_TEST @BQ_DELETE_JOIN @BQ_SINK_TEST @EXISTING_BQ_CONNECTION | ||
Scenario:Validate successful records transfer from BigQuery source to BigQuery sink using Join | ||
Given Open Datafusion Project to configure pipeline | ||
When Expand Plugin group in the LHS plugins list: "Source" | ||
When Select plugin: "BigQuery" from the plugins list as: "Source" | ||
When Expand Plugin group in the LHS plugins list: "Analytics" | ||
When Select plugin: "Joiner" from the plugins list as: "Analytics" | ||
Then Navigate to the properties page of plugin: "BigQuery" | ||
Then Click plugin property: "switch-useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Enter input plugin property: "referenceName" with value: "BQReferenceName" | ||
And Replace input plugin property: "dataset" with value: "dataset" | ||
And Replace input plugin property: "table" with value: "bqSourceTable" | ||
Then Click on the Get Schema button | ||
Then Validate "BigQuery" plugin properties | ||
And Close the Plugin Properties page | ||
When Expand Plugin group in the LHS plugins list: "Source" | ||
When Select plugin: "BigQuery" from the plugins list as: "Source" | ||
When Expand Plugin group in the LHS plugins list: "Sink" | ||
When Select plugin: "BigQuery" from the plugins list as: "Sink" | ||
Then Connect plugins: "BigQuery" and "Joiner" to establish connection | ||
Then Connect plugins: "BigQuery2" and "Joiner" to establish connection | ||
Then Connect plugins: "Joiner" and "BigQuery3" to establish connection | ||
Then Navigate to the properties page of plugin: "BigQuery2" | ||
Then Click plugin property: "useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Enter input plugin property: "referenceName" with value: "BQRefName" | ||
Then Enter input plugin property: "dataset" with value: "dataset" | ||
Then Enter input plugin property: "table" with value: "bqSourceTable2" | ||
Then Validate "BigQuery2" plugin properties | ||
And Close the Plugin Properties page | ||
Then Navigate to the properties page of plugin: "Joiner" | ||
Then Select radio button plugin property: "conditionType" with value: "basic" | ||
Then Click on the Get Schema button | ||
Then Validate "Joiner" plugin properties | ||
Then Close the Plugin Properties page | ||
Then Navigate to the properties page of plugin: "BigQuery3" | ||
Then Click plugin property: "useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" | ||
Then Enter input plugin property: "dataset" with value: "dataset" | ||
Then Enter input plugin property: "table" with value: "bqTargetTable" | ||
Then Validate "BigQuery3" plugin properties | ||
Then Close the Plugin Properties page | ||
Then Save the pipeline | ||
Then Preview and run the pipeline | ||
Then Wait till pipeline preview is in running state | ||
Then Open and capture pipeline preview logs | ||
Then Verify the preview run status of pipeline in the logs is "succeeded" | ||
Then Close the pipeline logs | ||
Then Close the preview | ||
Then Deploy the pipeline | ||
Then Click on "Configure" button | ||
Then Click on "Transformation Pushdown" button | ||
Then Click on "Enable Transformation Pushdown" button | ||
Then Enter input plugin property: "dataset" with value: "test_sqlengine" | ||
Then Click on "Advanced" button | ||
Then Click plugin property: "useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Click on "Save" button | ||
Then Run the Pipeline in Runtime | ||
Then Wait till pipeline is in running state | ||
Then Open and capture logs | ||
Then Close the pipeline logs | ||
Then Verify the pipeline status is "Succeeded" | ||
Then Validate The Data From BQ To BQ With Actual And Expected File for: "bqExpectedFileJoin" | ||
|
||
@BQ_SOURCE_SQLENGINE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION | ||
Scenario:Validate successful records transfer from BigQuery source to BigQuery sink using group by | ||
Given Open Datafusion Project to configure pipeline | ||
When Expand Plugin group in the LHS plugins list: "Source" | ||
When Select plugin: "BigQuery" from the plugins list as: "Source" | ||
When Expand Plugin group in the LHS plugins list: "Sink" | ||
When Select plugin: "BigQuery" from the plugins list as: "Sink" | ||
When Expand Plugin group in the LHS plugins list: "Analytics" | ||
When Select plugin: "Group By" from the plugins list as: "Analytics" | ||
Then Navigate to the properties page of plugin: "BigQuery" | ||
Then Click plugin property: "switch-useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Enter input plugin property: "referenceName" with value: "BQReferenceName" | ||
And Replace input plugin property: "dataset" with value: "dataset" | ||
And Replace input plugin property: "table" with value: "bqSourceTable" | ||
Then Click on the Get Schema button | ||
Then Validate "BigQuery" plugin properties | ||
And Close the Plugin Properties page | ||
Then Connect plugins: "BigQuery" and "Group By" to establish connection | ||
Then Connect plugins: "Group By" and "BigQuery2" to establish connection | ||
Then Navigate to the properties page of plugin: "Group By" | ||
Then Select dropdown plugin property: "groupByFields" with option value: "groupByValidFirstField" | ||
Then Press Escape Key | ||
Then Select dropdown plugin property: "groupByFields" with option value: "groupByValidSecondField" | ||
Then Press Escape Key | ||
Then Enter GroupBy plugin Fields to be Aggregate "groupByGcsAggregateFields" | ||
Then Click on the Get Schema button | ||
Then Click on the Validate button | ||
Then Close the Plugin Properties page | ||
Then Navigate to the properties page of plugin: "BigQuery2" | ||
Then Click plugin property: "useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" | ||
Then Enter input plugin property: "dataset" with value: "dataset" | ||
Then Enter input plugin property: "table" with value: "bqTargetTable" | ||
Then Validate "BigQuery" plugin properties | ||
And Close the Plugin Properties page | ||
Then Save the pipeline | ||
Then Preview and run the pipeline | ||
Then Wait till pipeline preview is in running state | ||
Then Open and capture pipeline preview logs | ||
Then Verify the preview run status of pipeline in the logs is "succeeded" | ||
Then Close the pipeline logs | ||
Then Close the preview | ||
Then Deploy the pipeline | ||
Then Click on "Configure" button | ||
Then Click on "Transformation Pushdown" button | ||
Then Click on "Enable Transformation Pushdown" button | ||
Then Enter input plugin property: "dataset" with value: "test_sqlengine" | ||
Then Click on "Advanced" button | ||
Then Click plugin property: "useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Click on "Save" button | ||
Then Run the Pipeline in Runtime | ||
Then Wait till pipeline is in running state | ||
Then Open and capture logs | ||
Then Close the pipeline logs | ||
Then Verify the pipeline status is "Succeeded" | ||
Then Validate The Data From BQ To BQ With Actual And Expected File for: "groupByTestOutputFile" | ||
|
||
@BQ_SOURCE_SQLENGINE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION | ||
Scenario:Validate successful records transfer from BigQuery source to BigQuery sink using deduplicate | ||
Given Open Datafusion Project to configure pipeline | ||
When Expand Plugin group in the LHS plugins list: "Source" | ||
When Select plugin: "BigQuery" from the plugins list as: "Source" | ||
When Expand Plugin group in the LHS plugins list: "Sink" | ||
When Select plugin: "BigQuery" from the plugins list as: "Sink" | ||
When Expand Plugin group in the LHS plugins list: "Analytics" | ||
When Select plugin: "Deduplicate" from the plugins list as: "Analytics" | ||
Then Navigate to the properties page of plugin: "BigQuery" | ||
Then Click plugin property: "switch-useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Enter input plugin property: "referenceName" with value: "BQReferenceName" | ||
And Replace input plugin property: "dataset" with value: "dataset" | ||
And Replace input plugin property: "table" with value: "bqSourceTable" | ||
Then Click on the Get Schema button | ||
Then Validate "BigQuery" plugin properties | ||
And Close the Plugin Properties page | ||
Then Connect plugins: "BigQuery" and "Deduplicate" to establish connection | ||
Then Connect plugins: "Deduplicate" and "BigQuery2" to establish connection | ||
Then Navigate to the properties page of plugin: "Deduplicate" | ||
Then Select dropdown plugin property: "uniqueFields" with option value: "DeduplicateValidFirstField" | ||
Then Press Escape Key | ||
Then Click on the Validate button | ||
Then Close the Plugin Properties page | ||
Then Navigate to the properties page of plugin: "BigQuery2" | ||
Then Click plugin property: "useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" | ||
Then Enter input plugin property: "dataset" with value: "dataset" | ||
Then Enter input plugin property: "table" with value: "bqTargetTable" | ||
Then Validate "BigQuery" plugin properties | ||
And Close the Plugin Properties page | ||
Then Save the pipeline | ||
Then Preview and run the pipeline | ||
Then Wait till pipeline preview is in running state | ||
Then Open and capture pipeline preview logs | ||
Then Verify the preview run status of pipeline in the logs is "succeeded" | ||
Then Close the pipeline logs | ||
Then Close the preview | ||
Then Deploy the pipeline | ||
Then Click on "Configure" button | ||
Then Click on "Transformation Pushdown" button | ||
Then Click on "Enable Transformation Pushdown" button | ||
Then Enter input plugin property: "dataset" with value: "test_sqlengine" | ||
Then Click on "Advanced" button | ||
Then Click plugin property: "useConnection" | ||
Then Click on the Browse Connections button | ||
Then Select connection: "bqConnectionName" | ||
Then Click on "Save" button | ||
Then Run the Pipeline in Runtime | ||
Then Wait till pipeline is in running state | ||
Then Open and capture logs | ||
Then Close the pipeline logs | ||
Then Verify the pipeline status is "Succeeded" | ||
Then Validate The Data From BQ To BQ With Actual And Expected File for: "deduplicateTestOutputFile" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
112 changes: 112 additions & 0 deletions
112
src/e2e-test/java/io/cdap/plugin/bigquery/stepsdesign/ValidationHelperSqlEngine.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/* | ||
* Copyright © 2024 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
package io.cdap.plugin.bigquery.stepsdesign; | ||
|
||
import com.esotericsoftware.minlog.Log; | ||
import com.google.cloud.bigquery.FieldValueList; | ||
import com.google.cloud.bigquery.TableResult; | ||
import com.google.gson.Gson; | ||
import com.google.gson.JsonElement; | ||
import com.google.gson.JsonObject; | ||
import io.cdap.e2e.utils.BigQueryClient; | ||
import io.cdap.e2e.utils.PluginPropertyUtils; | ||
import io.cucumber.core.logging.Logger; | ||
import io.cucumber.core.logging.LoggerFactory; | ||
|
||
import java.io.BufferedReader; | ||
import java.io.FileReader; | ||
import java.io.IOException; | ||
import java.net.URISyntaxException; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* Validation Helper. | ||
*/ | ||
public class ValidationHelperSqlEngine { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(ValidationHelperSqlEngine.class); | ||
static Gson gson = new Gson(); | ||
|
||
/** | ||
* Validates the actual data from a BigQuery table against the expected data from a file. | ||
* | ||
* @param table The name of the BigQuery table to fetch data from | ||
* @param fileName The name of the file containing the expected data | ||
* @return True if the actual data matches the expected data, otherwise false | ||
*/ | ||
public static boolean validateActualDataToExpectedData(String table, String fileName) throws IOException, | ||
InterruptedException, URISyntaxException { | ||
// Initialize maps to store data from BigQuery and file | ||
Map<String, JsonObject> bigQueryMap = new HashMap<>(); | ||
Map<String, JsonObject> fileMap = new HashMap<>(); | ||
// Get the path of the expected file | ||
Path importExpectedFile = Paths.get(ValidationHelperSqlEngine.class.getResource("/" + fileName).toURI()); | ||
|
||
getBigQueryTableData(table, bigQueryMap); | ||
getFileData(importExpectedFile.toString(), fileMap); | ||
|
||
// Compare the data from BigQuery with the data from the file | ||
boolean isMatched = bigQueryMap.equals(fileMap); | ||
|
||
return isMatched; | ||
} | ||
|
||
public static void getFileData(String fileName, Map<String, JsonObject> fileMap) { | ||
try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { | ||
String line; | ||
while ((line = br.readLine()) != null) { | ||
JsonObject json = gson.fromJson(line, JsonObject.class); | ||
if (json.has("id")) { // Check if the JSON object has the "id" key | ||
JsonElement idElement = json.get("id"); | ||
if (idElement.isJsonPrimitive()) { | ||
String idKey = idElement.getAsString(); | ||
fileMap.put(idKey, json); | ||
} else { | ||
Log.error("ID key not found"); | ||
} | ||
} | ||
} | ||
} catch (IOException e) { | ||
System.err.println("Error reading the file: " + e.getMessage()); | ||
} | ||
} | ||
|
||
private static void getBigQueryTableData(String targetTable, Map<String, JsonObject> bigQueryMap) | ||
throws IOException, InterruptedException { | ||
String dataset = PluginPropertyUtils.pluginProp("dataset"); | ||
String projectId = PluginPropertyUtils.pluginProp("projectId"); | ||
String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + targetTable + "` AS t"; | ||
TableResult result = BigQueryClient.getQueryResult(selectQuery); | ||
|
||
for (FieldValueList row : result.iterateAll()) { | ||
JsonObject json = gson.fromJson(row.get(0).getStringValue(), JsonObject.class); | ||
if (json.has("id")) { // Check if the JSON object has the "id" key | ||
JsonElement idElement = json.get("id"); | ||
if (idElement.isJsonPrimitive()) { | ||
String idKey = idElement.getAsString(); | ||
bigQueryMap.put(idKey, json); | ||
} else { | ||
LOG.error("Data Mismatched"); | ||
} | ||
} else { | ||
LOG.error("ID Key not found in JSON object"); | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.