feat: BigQuery_To_JDBC #1000

Merged
merged 21 commits into from
Oct 26, 2024
Changes from all commits (21 commits)
1daa4a5
addition for BigQuery to JDBC
hhasija Oct 19, 2024
4c997b0
BigQuery to JDBC Java file
hhasija Oct 19, 2024
df6ffc1
Merge branch 'main' of https://github.com/GoogleCloudPlatform/datapro…
hhasija Oct 21, 2024
5ef89e2
Merge branch 'main' into BigQuery-To-JDBC
rajc242 Oct 23, 2024
ec3d642
Merge branch 'main' of https://github.com/GoogleCloudPlatform/datapro…
hhasija Oct 23, 2024
d6ed2ff
fix: enabling two properties for performance optimization
hhasija Oct 23, 2024
385c7a8
fix: adding sparkconf variable and defining properties for optimization
hhasija Oct 23, 2024
2c0921a
fix: enhancing read me file for better understanding
hhasija Oct 23, 2024
6b469ac
Merge remote-tracking branch 'origin/BigQuery-To-JDBC' into BigQuery-…
hhasija Oct 23, 2024
fcf0b0e
fix: providing validation through a separate class BigQueryToJDBCConfig
hhasija Oct 23, 2024
4b00f14
fix: added a new entry in main Read me file
hhasija Oct 23, 2024
3bc6e5d
fix: taking dataset name via parameter
hhasija Oct 24, 2024
6092e32
Merge branch 'main' of https://github.com/GoogleCloudPlatform/datapro…
hhasija Oct 24, 2024
386d6cc
Merge branch 'main' of https://github.com/GoogleCloudPlatform/datapro…
hhasija Oct 25, 2024
503ea96
fix: adding test cases towards jenkins file for integration
hhasija Oct 25, 2024
bc8afe8
changing the database name to avoid any conflicts
hhasija Oct 25, 2024
a87d4c5
correcting the primary key as the database got overwritten in the previous run
hhasija Oct 25, 2024
4ab6421
fix: removing not required test cases for performance
hhasija Oct 25, 2024
8c1384b
fix: reverting back all the changes
hhasija Oct 26, 2024
dabad3e
fix: as it is copying files to remove all space and tab related issues
hhasija Oct 26, 2024
5845ea0
fix: adding one integration test for new template BIGQUERYTOJDBC
hhasija Oct 26, 2024
1 change: 1 addition & 0 deletions README.md
@@ -19,6 +19,7 @@ Google provides this collection of pre-implemented Dataproc templates as a refer
Please refer to the [Dataproc Templates (Java - Spark) README](/java) for more information

* [BigQueryToGCS](java/src/main/java/com/google/cloud/dataproc/templates/bigquery#BigQuery-To-GCS) (blogpost [link](https://medium.com/google-cloud/how-to-migrate-data-from-bigquery-to-cloud-storage-using-dataproc-serverless-java-5ead91ffa47f))
* [BigQueryToJDBC](java/src/main/java/com/google/cloud/dataproc/templates/bigquery#BigQuery-To-JDBC)
* [CassandraToBigQuery](java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-cassandra-to-bigquery-template) (blogpost [link](https://medium.com/google-cloud/migrate-data-from-cassandra-to-bigquery-using-java-and-dataproc-serverless-926110c44413))
* [CassandraToGCS](java/src/main/java/com/google/cloud/dataproc/templates/databases#executing-cassandra-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/migrate-data-from-cassandra-to-gcs-using-java-and-dataproc-serverless-5358ef498f6b))
* [DataplexGCStoBQ](/java/src/main//java/com/google/cloud/dataproc/templates/dataplex#dataplex-gcs-to-bigquery)(blogpost [link](https://medium.com/google-cloud/using-dataproc-serverless-to-migrate-your-dataplex-gcs-data-to-bigquery-1e47bc8de74c))
24 changes: 24 additions & 0 deletions java/.ci/Jenkinsfile
@@ -1040,6 +1040,30 @@ EOF
}
}
}
stage('BIGQUERY TO JDBC') {
    steps {
        retry(count: stageRetryCount) {
            sh '''

            export JARS="gs://dataproc-templates/jars/mysql-connector-java.jar"
            export SKIP_BUILD=true

            cd java

            bin/start.sh \
            -- --template BIGQUERYTOJDBC \
            --templateProperty project.id=$GCP_PROJECT \
            --templateProperty bigquery.jdbc.input.table="$GCP_PROJECT.dataproc_templates.emp_table" \
            --templateProperty bigquery.jdbc.dataset.name='dataproc_templates' \
            --templateProperty bigquery.jdbc.url="$TEST_JDBC_URL" \
            --templateProperty bigquery.jdbc.output.table='emp_table' \
            --templateProperty bigquery.jdbc.batch.size=10 \
            --templateProperty bigquery.jdbc.output.driver='com.mysql.jdbc.Driver' \
            --templateProperty bigquery.jdbc.output.mode='Overwrite'
            '''
        }
    }
}
stage('JDBC TO SPANNER POSTGRESQL DIALECT'){
steps {
retry(count: stageRetryCount) {
1 change: 1 addition & 0 deletions java/README.md
@@ -6,6 +6,7 @@
Please refer to the [Dataproc Templates (Java - Spark) README](java/README.md) for more information.

* [BigQueryToGCS](src/main/java/com/google/cloud/dataproc/templates/bigquery#BigQuery-To-GCS)
* [BigQueryToJDBC](src/main/java/com/google/cloud/dataproc/templates/bigquery#BigQuery-To-JDBC)
* [CassandraToBigQuery](src/main/java/com/google/cloud/dataproc/templates/databases#executing-cassandra-to-bigquery-template)
* [CassandraToGCS](src/main/java/com/google/cloud/dataproc/templates/databases#executing-cassandra-to-gcs-template) (blogpost [link](https://medium.com/google-cloud/migrate-data-from-cassandra-to-gcs-using-java-and-dataproc-serverless-5358ef498f6b))
* [DataplexGCStoBQ](/java/src/main//java/com/google/cloud/dataproc/templates/dataplex#dataplex-gcs-to-bigquery) (blogpost [link](https://medium.com/google-cloud/using-dataproc-serverless-to-migrate-your-dataplex-gcs-data-to-bigquery-1e47bc8de74c))
@@ -60,7 +60,8 @@ enum TemplateName {
PUBSUBLITETOBIGTABLE,
KAFKATOBQDSTREAM,
KAFKATOGCSDSTREAM,
MONGOTOBQ
MONGOTOBQ,
BIGQUERYTOJDBC
}

default Properties getProperties() {
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataproc.templates.bigquery;

import static com.google.cloud.dataproc.templates.util.TemplateConstants.*;

import com.google.cloud.dataproc.templates.BaseTemplate;
import com.google.cloud.dataproc.templates.util.PropertyUtil;
import com.google.cloud.dataproc.templates.util.ValidationUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryToJDBC implements BaseTemplate {

  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryToJDBC.class);
  private final BigQueryToJDBCConfig config;

  public BigQueryToJDBC(BigQueryToJDBCConfig config) {
    this.config = config;
  }

  public static BigQueryToJDBC of(String... args) {
    BigQueryToJDBCConfig config = BigQueryToJDBCConfig.fromProperties(PropertyUtil.getProperties());
    ValidationUtil.validateOrThrow(config);
    LOGGER.info("Config loaded\n{}", config);
    return new BigQueryToJDBC(config);
  }

  @Override
  public void runTemplate() {

    // Create a SparkConf object and set configurations; enabling views and the
    // materialization dataset is required for reading BigQuery views.
    SparkConf conf =
        new SparkConf()
            .setAppName("BigQuery to JDBC")
            .set("spark.sql.viewsEnabled", "true")
            .set("spark.sql.materializationDataset", config.getDatasetName());

    // Initialize SparkSession using SparkConf
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    // Set the Spark log level from the template properties
    spark.sparkContext().setLogLevel(config.getSparklogLevel());

    Dataset<Row> inputData =
        spark.read().format(SPARK_READ_FORMAT_BIGQUERY).load(config.getInputTableName());

    // Optionally register a global temp view and apply a transformation query before writing
    if (StringUtils.isNotBlank(config.getTempTable())
        && StringUtils.isNotBlank(config.getTempQuery())) {
      inputData.createOrReplaceGlobalTempView(config.getTempTable());
      inputData = spark.sql(config.getTempQuery());
    }

    write(inputData);
  }

  public void write(Dataset<Row> dataset) {
    dataset
        .write()
        .format("jdbc")
        .option(JDBCOptions.JDBC_URL(), config.getOutputJDBCURL())
        .option(JDBCOptions.JDBC_TABLE_NAME(), config.getOutputTableName())
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), config.getOutputBatchSize())
        .option(JDBCOptions.JDBC_DRIVER_CLASS(), config.getOutputJDBCDriver())
        .mode(config.getOutputSaveMode())
        .save();
  }

  // Validation happens up front in of() via ValidationUtil on BigQueryToJDBCConfig.
  public void validateInput() {}
}
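
For context, a minimal sketch (not part of this PR) of how the new template class is driven end to end; `of()` and `runTemplate()` are the methods added above, while the wrapper class below is purely illustrative, since in the repo `DataProcTemplate` performs this dispatch when `--template BIGQUERYTOJDBC` is passed.

```java
package com.google.cloud.dataproc.templates.bigquery;

// Illustrative driver only; DataProcTemplate.main performs this dispatch in practice.
public class BigQueryToJDBCExample {
  public static void main(String[] args) {
    // of() loads template properties via PropertyUtil, binds them to BigQueryToJDBCConfig,
    // and throws if validation (e.g. the @NotEmpty fields) fails.
    BigQueryToJDBC template = BigQueryToJDBC.of(args);

    // Reads the BigQuery input table, optionally applies the temp-view query,
    // then writes to the JDBC target with the configured SaveMode.
    template.runTemplate();
  }
}
```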
@@ -0,0 +1,140 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataproc.templates.bigquery;

import static com.google.cloud.dataproc.templates.util.TemplateConstants.SPARK_LOG_LEVEL;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.MoreObjects;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Pattern;
import java.util.Properties;

public class BigQueryToJDBCConfig {

  public static final String BQ_JDBC_INPUT_TABLE_NAME = "bigquery.jdbc.input.table";
  public static final String BQ_JDBC_OUTPUT_URL = "bigquery.jdbc.url";
  public static final String BQ_JDBC_OUTPUT_BATCH_SIZE = "bigquery.jdbc.batch.size";
  public static final String BQ_JDBC_OUTPUT_DRIVER = "bigquery.jdbc.output.driver";
  public static final String BQ_JDBC_OUTPUT_TABLE_NAME = "bigquery.jdbc.output.table";
  public static final String BQ_JDBC_OUTPUT_MODE = "bigquery.jdbc.output.mode";
  public static final String BQ_JDBC_DATASET_NAME = "bigquery.jdbc.dataset.name";
  public static final String BQ_JDBC_TEMP_TABLE = "bigquery.jdbc.temp.table";
  public static final String BQ_JDBC_TEMP_QUERY = "bigquery.jdbc.temp.query";

  static final ObjectMapper mapper =
      new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

  @JsonProperty(value = BQ_JDBC_INPUT_TABLE_NAME)
  @NotEmpty
  private String inputTableName;

  @JsonProperty(value = BQ_JDBC_OUTPUT_URL)
  @NotEmpty
  private String outputJDBCURL;

  @JsonProperty(value = BQ_JDBC_OUTPUT_BATCH_SIZE)
  @Min(value = 1)
  private long outputBatchSize;

  @JsonProperty(value = BQ_JDBC_OUTPUT_DRIVER)
  @NotEmpty
  private String outputJDBCDriver;

  @JsonProperty(value = BQ_JDBC_OUTPUT_TABLE_NAME)
  @NotEmpty
  private String outputTableName;

  @JsonProperty(value = BQ_JDBC_DATASET_NAME)
  @NotEmpty
  private String datasetName;

  @JsonProperty(value = BQ_JDBC_TEMP_TABLE)
  private String tempTable;

  @JsonProperty(value = BQ_JDBC_TEMP_QUERY)
  private String tempQuery;

  @JsonProperty(value = BQ_JDBC_OUTPUT_MODE)
  @NotEmpty
  @Pattern(regexp = "Overwrite|ErrorIfExists|Append|Ignore")
  private String outputSaveMode;

  @JsonProperty(value = SPARK_LOG_LEVEL)
  @Pattern(regexp = "ALL|DEBUG|ERROR|FATAL|INFO|OFF|TRACE|WARN")
  private String sparklogLevel;

  public String getInputTableName() {
    return inputTableName;
  }

  public String getOutputJDBCURL() {
    return outputJDBCURL;
  }

  public long getOutputBatchSize() {
    return outputBatchSize;
  }

  public String getOutputJDBCDriver() {
    return outputJDBCDriver;
  }

  public String getOutputTableName() {
    return outputTableName;
  }

  public String getOutputSaveMode() {
    return outputSaveMode;
  }

  public String getDatasetName() {
    return datasetName;
  }

  public String getTempTable() {
    return tempTable;
  }

  public String getTempQuery() {
    return tempQuery;
  }

  public String getSparklogLevel() {
    return sparklogLevel;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("inputTableName", inputTableName)
        .add("outputJDBCURL", outputJDBCURL)
        .add("outputBatchSize", outputBatchSize)
        .add("outputJDBCDriver", outputJDBCDriver)
        .add("outputTableName", outputTableName)
        .add("outputSaveMode", outputSaveMode)
        .add("datasetName", datasetName)
        .add("sparklogLevel", sparklogLevel)
        .toString();
  }

  public static BigQueryToJDBCConfig fromProperties(Properties properties) {
    return mapper.convertValue(properties, BigQueryToJDBCConfig.class);
  }
}
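
For reviewers, a minimal sketch (not part of this PR) of how `fromProperties` binds template properties onto this config POJO; the property keys and getters are the ones defined above, and the wrapper class is hypothetical.

```java
package com.google.cloud.dataproc.templates.bigquery;

import java.util.Properties;

// Illustrative only: shows Jackson binding of template properties onto BigQueryToJDBCConfig.
public class BigQueryToJDBCConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(BigQueryToJDBCConfig.BQ_JDBC_INPUT_TABLE_NAME, "myproject:myDataset.empTable");
    props.setProperty(BigQueryToJDBCConfig.BQ_JDBC_OUTPUT_URL, "jdbc:mysql://host:3306/db?user=u&password=p");
    props.setProperty(BigQueryToJDBCConfig.BQ_JDBC_OUTPUT_BATCH_SIZE, "100");
    props.setProperty(BigQueryToJDBCConfig.BQ_JDBC_OUTPUT_DRIVER, "com.mysql.jdbc.Driver");
    props.setProperty(BigQueryToJDBCConfig.BQ_JDBC_OUTPUT_TABLE_NAME, "targetTable");
    props.setProperty(BigQueryToJDBCConfig.BQ_JDBC_OUTPUT_MODE, "Append");
    props.setProperty(BigQueryToJDBCConfig.BQ_JDBC_DATASET_NAME, "myDataset");

    // Unknown keys are ignored (FAIL_ON_UNKNOWN_PROPERTIES is disabled) and numeric strings
    // such as the batch size are coerced by Jackson.
    BigQueryToJDBCConfig config = BigQueryToJDBCConfig.fromProperties(props);
    System.out.println(config); // toString() prints the bound fields other than the temp table/query
  }
}
```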
@@ -34,4 +34,60 @@ bin/start.sh \
--templateProperty bigquery.gcs.output.format=csv \
--templateProperty bigquery.gcs.output.location=gs://output/csv \
--templateProperty bigquery.gcs.output.mode=Overwrite
```


## BigQuery To JDBC

General Execution:

```
export GCP_PROJECT=<gcp-project-id>
export REGION=<region>
export SUBNET=<subnet>
export GCS_STAGING_LOCATION=<gcs-staging-bucket-folder>
export HISTORY_SERVER_CLUSTER=<history-server>

bin/start.sh \
-- --template BIGQUERYTOJDBC \
--templateProperty project.id=<gcp-project-id> \
--templateProperty bigquery.jdbc.input.table=<projectId:datasetId.tableName> \
--templateProperty bigquery.jdbc.dataset.name=<bigquery dataset name> \
--templateProperty bigquery.jdbc.output.table=<jdbc table name> \
--templateProperty bigquery.jdbc.url=<JDBC URL> \
--templateProperty bigquery.jdbc.batch.size=<JDBC Batch Size> \
--templateProperty bigquery.jdbc.output.driver=<JDBC Driver> \
--templateProperty bigquery.jdbc.output.mode=<Append|Overwrite|ErrorIfExists|Ignore>
```

### Example Submission:
```
export GCP_PROJECT=myproject
export REGION=us-central1
export SUBNET=projects/myproject/regions/us-central1/subnetworks/default
export GCS_STAGING_LOCATION=gs://staging

Example 1 :-

bin/start.sh \
-- --template BIGQUERYTOJDBC \
--templateProperty project.id=myproject \
--templateProperty bigquery.jdbc.input.table=myproject:myDataset.empTable \
--templateProperty bigquery.jdbc.dataset.name=myDataset \
--templateProperty bigquery.jdbc.output.table=targetTable \
--templateProperty bigquery.jdbc.url='jdbc:mysql://IPAddress:portNumber/databaseName?user=user_id&password=PASSWORD' \
--templateProperty bigquery.jdbc.output.driver='com.mysql.jdbc.Driver' \
--templateProperty bigquery.jdbc.output.mode=Append

Example 2 :-

bin/start.sh \
-- --template BIGQUERYTOJDBC \
--templateProperty project.id=myproject \
--templateProperty bigquery.jdbc.input.table=myproject:myDataset.empTable \
--templateProperty bigquery.jdbc.dataset.name=myDataset \
--templateProperty bigquery.jdbc.output.table=targetTable \
--templateProperty bigquery.jdbc.url='jdbc:mysql://IPAddress:portNumber/databaseName?user=user_id&password=PASSWORD' \
--templateProperty bigquery.jdbc.batch.size=100 \
--templateProperty bigquery.jdbc.output.driver='com.mysql.jdbc.Driver' \
--templateProperty bigquery.jdbc.output.mode=Append \
--templateProperty bigquery.jdbc.temp.table='temporary_view_name' \
--templateProperty bigquery.jdbc.temp.query='select * from global_temp.temporary_view_name where id>=5'
```
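
As an optional sanity check after a run (not part of the template), the target table can be read back with plain JDBC. A minimal sketch, assuming the same MySQL URL and table placeholders used in the examples above and the MySQL connector jar on the classpath:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Illustrative read-back of the JDBC target table written by the template.
public class VerifyJdbcLoad {
  public static void main(String[] args) throws Exception {
    String url = "jdbc:mysql://IPAddress:portNumber/databaseName?user=user_id&password=PASSWORD";
    try (Connection conn = DriverManager.getConnection(url);
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM targetTable")) {
      if (rs.next()) {
        System.out.println("Rows loaded: " + rs.getLong(1));
      }
    }
  }
}
```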
@@ -18,6 +18,7 @@
import com.google.cloud.dataproc.templates.BaseTemplate;
import com.google.cloud.dataproc.templates.BaseTemplate.TemplateName;
import com.google.cloud.dataproc.templates.bigquery.BigQueryToGCS;
import com.google.cloud.dataproc.templates.bigquery.BigQueryToJDBC;
import com.google.cloud.dataproc.templates.databases.CassandraToBQ;
import com.google.cloud.dataproc.templates.databases.CassandraToGCS;
import com.google.cloud.dataproc.templates.databases.MongoToBQ;
@@ -98,6 +99,7 @@ public class DataProcTemplate {
.put(TemplateName.KAFKATOBQ, (args) -> new KafkaToBQ())
.put(TemplateName.KAFKATOGCS, (args) -> new KafkaToGCS())
.put(TemplateName.GCSTOJDBC, GCSToJDBC::of)
.put(TemplateName.BIGQUERYTOJDBC, BigQueryToJDBC::of)
.put(TemplateName.GCSTOSPANNER, GCSToSpanner::of)
.put(TemplateName.GENERAL, GeneralTemplate::of)
.put(TemplateName.DATAPLEXGCSTOBQ, DataplexGCStoBQ::of)
11 changes: 11 additions & 0 deletions java/src/main/resources/template.properties
@@ -274,6 +274,17 @@ gcs.jdbc.output.table=
gcs.jdbc.output.saveMode=ErrorIfExists
gcs.jdbc.output.batchInsertSize=1000

# BigQuery to JDBC
bigquery.jdbc.input.table=
bigquery.jdbc.output.table=
bigquery.jdbc.url=
bigquery.jdbc.batch.size=1000
bigquery.jdbc.output.driver=
bigquery.jdbc.output.mode=ErrorIfExists
bigquery.jdbc.dataset.name=
bigquery.jdbc.temp.table=
bigquery.jdbc.temp.query=

# GCS to GCS
gcs.gcs.input.location=
gcs.gcs.input.format=