diff --git a/README.md b/README.md
index 9ac17a73d..f8b818284 100644
--- a/README.md
+++ b/README.md
@@ -60,6 +60,7 @@ Please refer to the [Dataproc Templates (Java - Spark) README](/java) for more i
 Please refer to the [Dataproc Templates (Python - PySpark) README](/python) for more information
 * [AzureBlobToBigQuery](/python/dataproc_templates/azure#azure-blob-to-bigquery)
 * [BigQueryToGCS](/python/dataproc_templates/bigquery#bigquery-to-gcs) (blogpost [link](https://medium.com/google-cloud/moving-data-from-bigquery-to-gcs-using-gcp-dataproc-serverless-and-pyspark-f6481b86bcd1))
+* [BigQueryToMemorystore](/python/dataproc_templates/bigquery#bigquery-to-memorystore)
 * [CassandraToBigquery](/python/dataproc_templates/cassandra)
 * [CassandraToGCS](/python/dataproc_templates/cassandra) (blogpost [link](https://medium.com/google-cloud/export-data-from-cassandra-to-google-cloud-storage-using-dataproc-serverless-2569a00e17fe))
 * [ElasticsearchToBigQuery](/python/dataproc_templates/elasticsearch#elasticsearch-to-bq)
diff --git a/python/.ci/Jenkinsfile b/python/.ci/Jenkinsfile
index b8756d594..d09a776db 100644
--- a/python/.ci/Jenkinsfile
+++ b/python/.ci/Jenkinsfile
@@ -887,6 +887,37 @@ EOF
                }
            }
        }
+        stage('BigQuery to Memorystore'){
+            steps{
+                retry(count: stageRetryCount) {
+                    sh '''
+                        python3.8 -m pip install --user virtualenv
+
+                        python3.8 -m venv env
+
+                        source env/bin/activate
+
+                        export GCS_STAGING_LOCATION="gs://python-dataproc-templates-temp"
+                        export SKIP_BUILD=true
+                        export JARS="gs://vbhatia_test/jars/spark-redis_2.12-3.0.0-jar-with-dependencies.jar"
+
+                        cd python
+
+                        ./bin/start.sh \
+                        -- --template=BIGQUERYTOMEMORYSTORE \
+                        --bigquery.memorystore.input.table=bigquery-public-data.fcc_political_ads.file_history \
+                        --bigquery.memorystore.output.host=10.0.0.17 \
+                        --bigquery.memorystore.output.port=6379 \
+                        --bigquery.memorystore.output.table=file_history \
+                        --bigquery.memorystore.output.key.column=fileHistoryId \
+                        --bigquery.memorystore.output.model=hash \
+                        --bigquery.memorystore.output.mode=overwrite \
+                        --bigquery.memorystore.output.ttl=3600 \
+                        --bigquery.memorystore.output.dbnum=0
+                    '''
+                }
+            }
+        }
diff --git a/python/README.md b/python/README.md
index f73c56a45..fab44f1bf 100644
--- a/python/README.md
+++ b/python/README.md
@@ -6,6 +6,7 @@
 - [AzureBlobStorageToBigQuery](/python/dataproc_templates/azure#azure-blob-storage-to-bigquery)
 - [BigQueryToGCS](/python/dataproc_templates/bigquery#bigquery-to-gcs) (blogpost [link](https://medium.com/google-cloud/moving-data-from-bigquery-to-gcs-using-gcp-dataproc-serverless-and-pyspark-f6481b86bcd1))
+- [BigQueryToMemorystore](/python/dataproc_templates/bigquery#bigquery-to-memorystore)
 - [CassandraToBigquery](/python/dataproc_templates/cassandra#cassandra-to-bigquery)
 - [CassandraToGCS](/python/dataproc_templates/cassandra#cassandra-to-gcs) (blogpost [link](https://medium.com/google-cloud/export-data-from-cassandra-to-google-cloud-storage-using-dataproc-serverless-2569a00e17fe))
 - [ElasticsearchToBigQuery](/python/dataproc_templates/elasticsearch#elasticsearch-to-bq) (blogpost [link](https://medium.com/@anujrohilla197/exporting-data-from-elasticsearch-to-bigquery-using-pyspark-on-dataproc-serverless-47633f620ce3))
diff --git a/python/dataproc_templates/bigquery/README.md b/python/dataproc_templates/bigquery/README.md
index a1d69f143..4faf4a685 100644
--- a/python/dataproc_templates/bigquery/README.md
+++ b/python/dataproc_templates/bigquery/README.md
@@ -121,3 +121,87 @@ export REGION=us-central1
 --bigquery.gcs.output.header=false \
 --bigquery.gcs.output.timestampntzformat="yyyy-MM-dd HH:mm:ss"
 ```
+
+## BigQuery to Memorystore
+
+Template for exporting data from BigQuery to Google Cloud Memorystore (Redis). The template supports writing data with either the hash or the binary persistence model, lets you choose the key column and a TTL for the exported data, and handles schema conversion and creation automatically.
+
+It uses the [Spark BigQuery connector](https://cloud.google.com/dataproc-serverless/docs/guides/bigquery-connector-spark-example) for reading from BigQuery and [Spark-Redis](https://github.com/RedisLabs/spark-redis) for writing to Redis.
+
+## Arguments
+
+* `bigquery.memorystore.input.table`: BigQuery input table name (format: `project.dataset.table`)
+* `bigquery.memorystore.output.host`: Redis Memorystore host
+* `bigquery.memorystore.output.table`: Redis Memorystore target table name
+* `bigquery.memorystore.output.key.column`: Redis Memorystore key column for target table
+
+#### Optional Arguments
+
+* `bigquery.memorystore.output.port`: Redis Memorystore port (Defaults to 6379)
+* `bigquery.memorystore.output.model`: Memorystore persistence model for the DataFrame (one of: hash, binary) (Defaults to hash)
+* `bigquery.memorystore.output.mode`: Output write mode (one of: append, overwrite, ignore, errorifexists) (Defaults to append)
+* `bigquery.memorystore.output.ttl`: Data time to live in seconds; data does not expire if the TTL is less than 1 (Defaults to 0)
+* `bigquery.memorystore.output.dbnum`: Database / namespace for logical key separation (Defaults to 0)
+
+## Usage
+```
+python main.py --template BIGQUERYTOMEMORYSTORE --help
+
+usage: main.py [-h]
+               --bigquery.memorystore.input.table BIGQUERY.MEMORYSTORE.INPUT.TABLE
+               --bigquery.memorystore.output.host BIGQUERY.MEMORYSTORE.OUTPUT.HOST
+               --bigquery.memorystore.output.table BIGQUERY.MEMORYSTORE.OUTPUT.TABLE
+               --bigquery.memorystore.output.key.column BIGQUERY.MEMORYSTORE.OUTPUT.KEY.COLUMN
+               [--bigquery.memorystore.output.port BIGQUERY.MEMORYSTORE.OUTPUT.PORT]
+               [--bigquery.memorystore.output.model {hash,binary}]
+               [--bigquery.memorystore.output.mode {overwrite,append,ignore,errorifexists}]
+               [--bigquery.memorystore.output.ttl BIGQUERY.MEMORYSTORE.OUTPUT.TTL]
+               [--bigquery.memorystore.output.dbnum BIGQUERY.MEMORYSTORE.OUTPUT.DBNUM]
+
+options:
+  -h, --help            show this help message and exit
+  --bigquery.memorystore.input.table BIGQUERY.MEMORYSTORE.INPUT.TABLE
+                        BigQuery input table name
+  --bigquery.memorystore.output.host BIGQUERY.MEMORYSTORE.OUTPUT.HOST
+                        Redis Memorystore host
+  --bigquery.memorystore.output.table BIGQUERY.MEMORYSTORE.OUTPUT.TABLE
+                        Redis Memorystore target table name
+  --bigquery.memorystore.output.key.column BIGQUERY.MEMORYSTORE.OUTPUT.KEY.COLUMN
+                        Redis Memorystore key column for target table
+  --bigquery.memorystore.output.port BIGQUERY.MEMORYSTORE.OUTPUT.PORT
+                        Redis Memorystore port (Defaults to 6379)
+  --bigquery.memorystore.output.model {hash,binary}
+                        Memorystore persistence model for the DataFrame (one of: hash, binary) (Defaults to hash)
+  --bigquery.memorystore.output.mode {overwrite,append,ignore,errorifexists}
+                        Output write mode (one of: append, overwrite, ignore, errorifexists) (Defaults to append)
+  --bigquery.memorystore.output.ttl BIGQUERY.MEMORYSTORE.OUTPUT.TTL
+                        Data time to live in seconds; data does not expire if the TTL is less than 1 (Defaults to 0)
+  --bigquery.memorystore.output.dbnum BIGQUERY.MEMORYSTORE.OUTPUT.DBNUM
+                        Database / namespace for logical key separation (Defaults to 0)
+```
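For illustration only (not part of the patch): the usage text above is produced by the template's `parse_args`, added later in this diff. A minimal sketch with hypothetical argument values shows how the optional flags fall back to their defaults:

```python
from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate

# hypothetical values; only the four required flags are passed
args = BigQueryToMemorystoreTemplate.parse_args([
    "--bigquery.memorystore.input.table=myproject.mydataset.mytable",
    "--bigquery.memorystore.output.host=10.0.0.17",
    "--bigquery.memorystore.output.table=mytable",
    "--bigquery.memorystore.output.key.column=id",
])

print(args["bigquery.memorystore.output.model"])  # "hash" (the default)
print(args["bigquery.memorystore.output.mode"])   # "append" (the default)
```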
+
+## Example submission
+
+```
+export GCP_PROJECT=myprojectid
+export REGION=us-west1
+export SUBNET=projects/myprojectid/regions/us-west1/subnetworks/mysubnetid
+export GCS_STAGING_LOCATION="gs://python-dataproc-templates"
+export JARS="gs://mygcsstagingbkt/jars/spark-redis_2.12-3.0.0-jar-with-dependencies.jar"
+
+./bin/start.sh \
+-- --template=BIGQUERYTOMEMORYSTORE \
+--bigquery.memorystore.input.table=bigquery-public-data.fcc_political_ads.file_history \
+--bigquery.memorystore.output.host=10.0.0.17 \
+--bigquery.memorystore.output.port=6379 \
+--bigquery.memorystore.output.table=file_history \
+--bigquery.memorystore.output.key.column=fileHistoryId \
+--bigquery.memorystore.output.model=hash \
+--bigquery.memorystore.output.mode=overwrite \
+--bigquery.memorystore.output.ttl=360 \
+--bigquery.memorystore.output.dbnum=0
+```
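As a sketch of what the submission above leaves in Redis, assuming the Spark-Redis hash model's usual key layout of `<table>:<key column value>` (the host and key value below are illustrative):

```python
import redis  # redis-py client, assumed installed

# connection details mirroring the example submission above
r = redis.Redis(host="10.0.0.17", port=6379, db=0, decode_responses=True)

# "12345" stands in for one fileHistoryId value from the source table
row = r.hgetall("file_history:12345")
print(row)                          # one hash field per exported column
print(r.ttl("file_history:12345"))  # roughly 360, per --bigquery.memorystore.output.ttl
```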
+
+## Known limitations
+With Spark-Redis, the hash model does not support nested fields in the DataFrame. If the source table contains nested fields, use the binary persistence model instead, which does support them.
\ No newline at end of file
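If the hash model is still preferred for a table with nested fields, one workaround is to flatten the DataFrame before it reaches the writer. A minimal sketch, assuming a hypothetical nested `address` struct (this is not part of the template):

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("flatten-sketch").getOrCreate()

df = spark.read.format("bigquery") \
    .option("table", "myproject.mydataset.mytable") \
    .load()

# promote nested struct fields to flat, top-level columns
flat_df = df.select(
    "id",
    F.col("address.city").alias("address_city"),
    F.col("address.zip").alias("address_zip"),
)
```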
Data doesn\'t expire if ttl is less than 1' + '(Defaults to 0)' + ) + ) + parser.add_argument( + f'--{constants.BQ_MEMORYSTORE_OUTPUT_DBNUM}', + dest=constants.BQ_MEMORYSTORE_OUTPUT_DBNUM, + required=False, + default=0, + help=( + 'Database / namespace for logical key separation' + '(Defaults to 0)' + ) + ) + + known_args: argparse.Namespace + known_args, _ = parser.parse_known_args(args) + + return vars(known_args) + + def run(self, spark: SparkSession, args: Dict[str, Any]) -> None: + + logger: Logger = self.get_logger(spark=spark) + + # Arguments + input_table: str = args[constants.BQ_MEMORYSTORE_INPUT_TABLE] + + output_table: str = args[constants.BQ_MEMORYSTORE_OUTPUT_TABLE] + host: str = args[constants.BQ_MEMORYSTORE_OUTPUT_HOST] + port: str = args[constants.BQ_MEMORYSTORE_OUTPUT_PORT] + key_column: str = args[constants.BQ_MEMORYSTORE_OUTPUT_KEY_COLUMN] + model: str = args[constants.BQ_MEMORYSTORE_OUTPUT_MODEL] + ttl: int = args[constants.BQ_MEMORYSTORE_OUTPUT_TTL] + dbnum: int = args[constants.BQ_MEMORYSTORE_OUTPUT_DBNUM] + output_mode: str = args[constants.BQ_MEMORYSTORE_OUTPUT_MODE] + + logger.info( + "Starting Bigquery to Memorystore Spark job with parameters:\n" + f"{pprint.pformat(args)}" + ) + + # Read + input_data = spark.read \ + .format(constants.FORMAT_BIGQUERY) \ + .option(constants.TABLE, input_table) \ + .load() + + # Write + input_data.write \ + .format(constants.FORMAT_MEMORYSTORE) \ + .option(constants.TABLE, output_table) \ + .option(constants.MEMORYSTORE_KEY_COLUMN, key_column) \ + .option(constants.MEMORYSTORE_MODEL, model) \ + .option(constants.MEMORYSTORE_HOST, host) \ + .option(constants.MEMORYSTORE_PORT, port) \ + .option(constants.MEMORYSTORE_DBNUM, dbnum) \ + .option(constants.MEMORYSTORE_TTL, ttl) \ + .mode(output_mode) \ + .save() diff --git a/python/dataproc_templates/template_name.py b/python/dataproc_templates/template_name.py index 07ac5be94..f315792b2 100644 --- a/python/dataproc_templates/template_name.py +++ b/python/dataproc_templates/template_name.py @@ -27,6 +27,7 @@ class TemplateName(Enum): GCSTOBIGTABLE = "GCSTOBIGTABLE" GCSTOGCS = "GCSTOGCS" BIGQUERYTOGCS = "BIGQUERYTOGCS" + BIGQUERYTOMEMORYSTORE = "BIGQUERYTOMEMORYSTORE" HIVETOBIGQUERY = "HIVETOBIGQUERY" HIVETOGCS = "HIVETOGCS" TEXTTOBIGQUERY = "TEXTTOBIGQUERY" diff --git a/python/dataproc_templates/util/template_constants.py b/python/dataproc_templates/util/template_constants.py index 7111b1bae..8ae37e6fc 100644 --- a/python/dataproc_templates/util/template_constants.py +++ b/python/dataproc_templates/util/template_constants.py @@ -34,6 +34,7 @@ FORMAT_JDBC = "jdbc" FORMAT_PUBSUBLITE = "pubsublite" FORMAT_REDSHIFT = "io.github.spark_redshift_community.spark.redshift" +FORMAT_MEMORYSTORE = "org.apache.spark.sql.redis" JDBC_URL = "url" JDBC_TABLE = "dbtable" JDBC_QUERY = "query" @@ -89,6 +90,12 @@ FORMAT_MONGO = "com.mongodb.spark.sql.DefaultSource" MONGO_DEFAULT_BATCH_SIZE = 512 MONGO_BATCH_SIZE = "maxBatchSize" +MEMORYSTORE_KEY_COLUMN = "key.column" +MEMORYSTORE_MODEL = "model" +MEMORYSTORE_HOST = "host" +MEMORYSTORE_PORT = "port" +MEMORYSTORE_DBNUM = "dbNum" +MEMORYSTORE_TTL = "ttl" FORMAT_SNOWFLAKE = "snowflake" REDSHIFT_TEMPDIR = "tempdir" REDSHIFT_IAMROLE = "aws_iam_role" @@ -923,3 +930,17 @@ def get_es_spark_connector_input_options(prefix): AZ_BLOB_STORAGE_ACCOUNT = "azure.blob.storage.account" AZ_BLOB_CONTAINER_NAME = "azure.blob.container.name" AZ_BLOB_SAS_TOKEN = "azure.blob.sas.token" + +# BigQuery to Memorystore +BQ_MEMORYSTORE_INPUT_TABLE = "bigquery.memorystore.input.table" 
diff --git a/python/dataproc_templates/template_name.py b/python/dataproc_templates/template_name.py
index 07ac5be94..f315792b2 100644
--- a/python/dataproc_templates/template_name.py
+++ b/python/dataproc_templates/template_name.py
@@ -27,6 +27,7 @@ class TemplateName(Enum):
     GCSTOBIGTABLE = "GCSTOBIGTABLE"
     GCSTOGCS = "GCSTOGCS"
     BIGQUERYTOGCS = "BIGQUERYTOGCS"
+    BIGQUERYTOMEMORYSTORE = "BIGQUERYTOMEMORYSTORE"
     HIVETOBIGQUERY = "HIVETOBIGQUERY"
     HIVETOGCS = "HIVETOGCS"
     TEXTTOBIGQUERY = "TEXTTOBIGQUERY"
diff --git a/python/dataproc_templates/util/template_constants.py b/python/dataproc_templates/util/template_constants.py
index 7111b1bae..8ae37e6fc 100644
--- a/python/dataproc_templates/util/template_constants.py
+++ b/python/dataproc_templates/util/template_constants.py
@@ -34,6 +34,7 @@
 FORMAT_JDBC = "jdbc"
 FORMAT_PUBSUBLITE = "pubsublite"
 FORMAT_REDSHIFT = "io.github.spark_redshift_community.spark.redshift"
+FORMAT_MEMORYSTORE = "org.apache.spark.sql.redis"
 JDBC_URL = "url"
 JDBC_TABLE = "dbtable"
 JDBC_QUERY = "query"
@@ -89,6 +90,12 @@
 FORMAT_MONGO = "com.mongodb.spark.sql.DefaultSource"
 MONGO_DEFAULT_BATCH_SIZE = 512
 MONGO_BATCH_SIZE = "maxBatchSize"
+MEMORYSTORE_KEY_COLUMN = "key.column"
+MEMORYSTORE_MODEL = "model"
+MEMORYSTORE_HOST = "host"
+MEMORYSTORE_PORT = "port"
+MEMORYSTORE_DBNUM = "dbNum"
+MEMORYSTORE_TTL = "ttl"
 FORMAT_SNOWFLAKE = "snowflake"
 REDSHIFT_TEMPDIR = "tempdir"
 REDSHIFT_IAMROLE = "aws_iam_role"
@@ -923,3 +930,17 @@ def get_es_spark_connector_input_options(prefix):
 AZ_BLOB_STORAGE_ACCOUNT = "azure.blob.storage.account"
 AZ_BLOB_CONTAINER_NAME = "azure.blob.container.name"
 AZ_BLOB_SAS_TOKEN = "azure.blob.sas.token"
+
+# BigQuery to Memorystore
+BQ_MEMORYSTORE_INPUT_TABLE = "bigquery.memorystore.input.table"
+BQ_MEMORYSTORE_OUTPUT_HOST = "bigquery.memorystore.output.host"
+BQ_MEMORYSTORE_OUTPUT_PORT = "bigquery.memorystore.output.port"
+BQ_MEMORYSTORE_OUTPUT_TABLE = "bigquery.memorystore.output.table"
+BQ_MEMORYSTORE_OUTPUT_KEY_COLUMN = "bigquery.memorystore.output.key.column"
+BQ_MEMORYSTORE_OUTPUT_MODEL = "bigquery.memorystore.output.model"
+BQ_MEMORYSTORE_OUTPUT_MODE = "bigquery.memorystore.output.mode"
+BQ_MEMORYSTORE_OUTPUT_TTL = "bigquery.memorystore.output.ttl"
+BQ_MEMORYSTORE_OUTPUT_DBNUM = "bigquery.memorystore.output.dbnum"
+
+BQ_MEMORYSTORE_OUTPUT_MODEL_HASH = "hash"
+BQ_MEMORYSTORE_OUTPUT_MODEL_BINARY = "binary"
\ No newline at end of file
diff --git a/python/main.py b/python/main.py
index 676b3357a..36a3912f9 100644
--- a/python/main.py
+++ b/python/main.py
@@ -29,6 +29,7 @@
 from dataproc_templates.gcs.gcs_to_mongo import GCSToMONGOTemplate
 from dataproc_templates.gcs.gcs_to_bigtable import GCSToBigTableTemplate
 from dataproc_templates.bigquery.bigquery_to_gcs import BigQueryToGCSTemplate
+from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate
 from dataproc_templates.hive.hive_to_bigquery import HiveToBigQueryTemplate
 from dataproc_templates.hive.hive_to_gcs import HiveToGCSTemplate
 from dataproc_templates.gcs.text_to_bigquery import TextToBigQueryTemplate
@@ -60,6 +61,7 @@
     TemplateName.GCSTOGCS: GCSToGCSTemplate,
     TemplateName.GCSTOBIGTABLE: GCSToBigTableTemplate,
     TemplateName.BIGQUERYTOGCS: BigQueryToGCSTemplate,
+    TemplateName.BIGQUERYTOMEMORYSTORE: BigQueryToMemorystoreTemplate,
     TemplateName.HIVETOBIGQUERY: HiveToBigQueryTemplate,
     TemplateName.HIVETOGCS: HiveToGCSTemplate,
     TemplateName.TEXTTOBIGQUERY: TextToBigQueryTemplate,
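For context, the two registration hunks above are what let `--template=BIGQUERYTOMEMORYSTORE` resolve to the new class. A simplified stand-in for main.py's dispatch (the real main.py also builds the SparkSession and parses the flag itself):

```python
from dataproc_templates.template_name import TemplateName
from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate

# simplified stand-in for the TemplateName -> class mapping main.py maintains
TEMPLATE_IMPLS = {
    TemplateName.BIGQUERYTOMEMORYSTORE: BigQueryToMemorystoreTemplate,
}

# standard Enum lookup by value, as received from the --template flag
chosen = TemplateName("BIGQUERYTOMEMORYSTORE")
template = TEMPLATE_IMPLS[chosen]()
print(type(template).__name__)  # BigQueryToMemorystoreTemplate
```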
+""" + +import mock +import pyspark + +from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate +import dataproc_templates.util.template_constants as constants + + +class TestBigQueryToMemorystoreTemplate: + """ + Test suite for BigQueryToMemorystoreTemplate + """ + + def test_parse_args(self): + """Tests BigQueryToMemorystoreTemplate.parse_args()""" + + bigquery_to_memorystore_template = BigQueryToMemorystoreTemplate() + parsed_args = bigquery_to_memorystore_template.parse_args( + ["--bigquery.memorystore.input.table=projectId:dataset.table", + "--bigquery.memorystore.output.host=10.0.0.1", + "--bigquery.memorystore.output.port=6379", + "--bigquery.memorystore.output.table=output_table", + "--bigquery.memorystore.output.key.column=id", + "--bigquery.memorystore.output.model=hash", + "--bigquery.memorystore.output.mode=overwrite", + "--bigquery.memorystore.output.ttl=100", + "--bigquery.memorystore.output.dbnum=1" + ]) + + assert parsed_args[constants.BQ_MEMORYSTORE_INPUT_TABLE] == "projectId:dataset.table" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_HOST] == "10.0.0.1" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_PORT] == "6379" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_TABLE] == "output_table" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_KEY_COLUMN] == "id" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_MODEL] == "hash" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_MODE] == "overwrite" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_TTL] == "100" + assert parsed_args[constants.BQ_MEMORYSTORE_OUTPUT_DBNUM] == "1" + + + @mock.patch.object(pyspark.sql, 'SparkSession') + def test_run(self, mock_spark_session): + """Tests BigQueryToMemorystoreTemplate runs""" + + bigquery_to_memorystore_template = BigQueryToMemorystoreTemplate() + mock_parsed_args = bigquery_to_memorystore_template.parse_args( + ["--bigquery.memorystore.input.table=projectId:dataset.table", + "--bigquery.memorystore.output.host=10.0.0.1", + "--bigquery.memorystore.output.port=6379", + "--bigquery.memorystore.output.table=output_table", + "--bigquery.memorystore.output.key.column=id", + "--bigquery.memorystore.output.model=hash", + "--bigquery.memorystore.output.mode=append", + "--bigquery.memorystore.output.ttl=0", + "--bigquery.memorystore.output.dbnum=0" + ]) + + # Correctly mock the chained calls + mock_writer = mock.Mock() # Mock for DataFrameWriter + mock_df = mock.Mock() # Mock for DataFrame + mock_spark_session.read.format.return_value = mock_df + mock_df.option.return_value = mock_df # Chain .option() calls + mock_df.load.return_value = mock_df + mock_df.write.format.return_value = mock_writer + mock_writer.option.return_value = mock_writer # Chain .option() calls + mock_writer.mode.return_value = mock_writer # Chain .mode() call + + + bigquery_to_memorystore_template.run(mock_spark_session, mock_parsed_args) + + mock_spark_session.read \ + .format.assert_called_with(constants.FORMAT_BIGQUERY) + mock_spark_session.read \ + .format() \ + .option.assert_called_with(constants.TABLE, "projectId:dataset.table") + mock_spark_session.read \ + .format() \ + .option() \ + .load.assert_called_with() + + mock_df.write.format.assert_called_with(constants.FORMAT_MEMORYSTORE) + + # Get calls + write_calls = mock_writer.option.call_args_list + # Extract output options from calls + output_options = {call[0][0]: call[0][1] for call in write_calls if len(call[0]) > 1} + + # Assert output options (using stripped keys) + assert 
+
+    @mock.patch.object(pyspark.sql, 'SparkSession')
+    def test_run_binary_model(self, mock_spark_session):
+        """Tests BigQueryToMemorystoreTemplate runs with the binary model"""
+
+        bigquery_to_memorystore_template = BigQueryToMemorystoreTemplate()
+        mock_parsed_args = bigquery_to_memorystore_template.parse_args(
+            ["--bigquery.memorystore.input.table=projectId:dataset.table",
+             "--bigquery.memorystore.output.host=10.0.0.1",
+             "--bigquery.memorystore.output.port=6379",
+             "--bigquery.memorystore.output.table=output_table",
+             "--bigquery.memorystore.output.key.column=id",
+             "--bigquery.memorystore.output.model=binary",
+             "--bigquery.memorystore.output.mode=append",
+             "--bigquery.memorystore.output.ttl=0",
+             "--bigquery.memorystore.output.dbnum=0"
+             ])
+
+        # Mock the chained reader/writer calls
+        mock_writer = mock.Mock()  # stands in for the DataFrameWriter
+        mock_df = mock.Mock()      # stands in for the DataFrame
+        mock_spark_session.read.format.return_value = mock_df
+        mock_df.option.return_value = mock_df  # chain .option() calls
+        mock_df.load.return_value = mock_df
+        mock_df.write.format.return_value = mock_writer
+        mock_writer.option.return_value = mock_writer  # chain .option() calls
+        mock_writer.mode.return_value = mock_writer
+
+        bigquery_to_memorystore_template.run(mock_spark_session, mock_parsed_args)
+
+        # Collect the .option() calls made on the writer
+        write_calls = mock_writer.option.call_args_list
+        output_options = {call[0][0]: call[0][1] for call in write_calls if len(call[0]) > 1}
+
+        # The binary persistence model should be passed through to the writer
+        assert output_options['model'] == "binary"
\ No newline at end of file
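To exercise only the new suite locally, a sketch assuming pytest, mock, and pyspark are installed and the repo's python/ directory is the working directory:

```python
import pytest

# runs just the tests added in this diff
raise SystemExit(pytest.main(["test/bigquery/test_bigquery_to_memorystore.py", "-v"]))
```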