feat!: Add Spark BigQuery connector options (#1027)
rohilla-anuj authored Nov 20, 2024
1 parent 8839d15 commit 6af6cc4
Showing 4 changed files with 190 additions and 23 deletions.
77 changes: 72 additions & 5 deletions python/dataproc_templates/elasticsearch/README.md
@@ -363,7 +363,8 @@ This template has been tested with the following versions of the above mentioned
- `es.bq.input.api.key`: API Key for Elasticsearch Authorization
- `es.bq.output.dataset`: BigQuery dataset id (format: Dataset_id)
- `es.bq.output.table`: BigQuery table name (format: Table_name)
- `es.bq.temp.bucket.name`: Temporary bucket for the Spark BigQuery connector
- `es.bq.output.temporarygcsbucket`: The GCS bucket that temporarily holds the data before it is loaded to BigQuery
- `es.bq.output.persistentgcsbucket`: The GCS bucket that holds the data before it is loaded to BigQuery. If informed, the data won't be deleted after it is written to BigQuery.

#### Optional Arguments

@@ -414,9 +415,26 @@ This template has been tested with the following versions of the above mentioned
- `es.bq.flatten.struct.fields`: Flatten the struct fields
- `es.bq.flatten.array.fields`: Flatten the n-D array fields to 1-D array fields; requires the es.bq.flatten.struct.fields option to also be passed
- `es.bq.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append)
- `es.bq.output.bigquerytablelabel`: Used to add labels to the table while writing to it. Multiple labels can be set.
- `es.bq.output.createdisposition`: Specifies whether the job is allowed to create new tables.
- `es.bq.output.persistentgcspath`: The GCS path that holds the data before it is loaded to BigQuery. Used only with es.bq.output.persistentgcsbucket.
- `es.bq.output.datepartition`: The date partition the data is going to be written to.
- `es.bq.output.partitionfield`: If this field is specified, the table is partitioned by this field.
- `es.bq.output.partitionexpirationms`: Number of milliseconds for which to keep the storage for partitions in the table.
- `es.bq.output.partitiontype`: Used to specify Time partitioning. Supported types are: HOUR, DAY, MONTH, YEAR. This option is mandatory for a target table to be Time partitioned. Defaults to DAY if es.bq.output.partitionfield is specified
- `es.bq.output.partitionrangestart`: Used to specify Integer-range partitioning. This option is mandatory for a target table to be Integer-range partitioned. Pass es.bq.output.partitionrangeend and es.bq.output.partitionrangeinterval along with this option.
- `es.bq.output.partitionrangeend`: Used to specify Integer-range partitioning. This option is mandatory for a target table to be Integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeinterval along with this option.
- `es.bq.output.partitionrangeinterval`: Used to specify Integer-range partitioning. This option is mandatory for a target table to be Integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeend along with this option.
- `es.bq.output.clusteredfields`: A string of non-repeated, top-level columns separated by commas.
- `es.bq.output.allowfieldaddition`: Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. Defaults to false.
- `es.bq.output.allowfieldrelaxation`: Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
- `es.bq.output.bignumericdefaultprecision`: An alternative default precision for BigNumeric fields, as the BigQuery default is too wide for Spark. Values can be between 1 and 38.
- `es.bq.output.bignumericdefaultscale`: An alternative default scale for BigNumeric fields. Values can be between 0 and 38, and less than es.bq.output.bignumericdefaultprecision. This default is used only when the field has an unparameterized BigNumeric type.

**Note:** Make sure that either ```es.bq.input.api.key``` or both ```es.bq.input.user``` and ```es.bq.input.password``` are provided. Supplying all three properties at once, or none of them, will throw an error.

Pass either ```es.bq.output.temporarygcsbucket``` or ```es.bq.output.persistentgcsbucket```.
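A minimal sketch of the two mutually exclusive staging options (bucket names are placeholders):

```
# Stage via a temporary bucket; staged data is removed once the load completes
--es.bq.output.temporarygcsbucket="my-staging-bucket"

# Or stage via a persistent bucket; staged data is retained after the load
--es.bq.output.persistentgcsbucket="my-archive-bucket"
```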

## Usage

```
@@ -429,7 +447,8 @@ usage: main.py [-h]
--es.bq.input.api.key ES.BQ.INPUT.API.KEY
--es.bq.output.dataset ES.BQ.OUTPUT.DATASET
--es.bq.output.table ES.BQ.OUTPUT.TABLE
--es.bq.temp.bucket.name ES.BQ.TEMP.BUCKET.NAME
--es.bq.output.temporarygcsbucket ES.BQ.OUTPUT.TEMPORARYGCSBUCKET
--es.bq.output.persistentgcsbucket ES.BQ.OUTPUT.PERSISTENTGCSBUCKET
--es.bq.output.mode {overwrite,append,ignore,errorifexists}
[--es.bq.input.es.nodes.path.prefix ES.BQ.INPUT.ES.NODES.PATH.PREFIX]
[--es.bq.input.es.query ES.BQ.INPUT.ES.QUERY]
@@ -477,6 +496,21 @@ usage: main.py [-h]
[--es.bq.input.es.net.proxy.socks.use.system.props ES.BQ.INPUT.ES.NET.PROXY.SOCKS.USE.SYSTEM.PROPS]
[--es.bq.flatten.struct.fields]
[--es.bq.flatten.array.fields]
[--es.bq.output.bigquerytablelabel ES.BQ.OUTPUT.BIGQUERYTABLELABEL]
[--es.bq.output.createdisposition ES.BQ.OUTPUT.CREATEDISPOSITION]
[--es.bq.output.persistentgcspath ES.BQ.OUTPUT.PERSISTENTGCSPATH]
[--es.bq.output.datepartition ES.BQ.OUTPUT.DATEPARTITION]
[--es.bq.output.partitionfield ES.BQ.OUTPUT.PARTITIONFIELD]
[--es.bq.output.partitionexpirationms ES.BQ.OUTPUT.PARTITIONEXPIRATIONMS]
[--es.bq.output.partitiontype ES.BQ.OUTPUT.PARTITIONTYPE]
[--es.bq.output.partitionrangestart ES.BQ.OUTPUT.PARTITIONRANGESTART]
[--es.bq.output.partitionrangeend ES.BQ.OUTPUT.PARTITIONRANGEEND]
[--es.bq.output.partitionrangeinterval ES.BQ.OUTPUT.PARTITIONRANGEINTERVAL]
[--es.bq.output.clusteredfields ES.BQ.OUTPUT.CLUSTEREDFIELDS]
[--es.bq.output.allowfieldaddition ES.BQ.OUTPUT.ALLOWFIELDADDITION]
[--es.bq.output.allowfieldrelaxation ES.BQ.OUTPUT.ALLOWFIELDRELAXATION]
[--es.bq.output.bignumericdefaultprecision ES.BQ.OUTPUT.BIGNUMERICDEFAULTPRECISION]
[--es.bq.output.bignumericdefaultscale ES.BQ.OUTPUT.BIGNUMERICDEFAULTSCALE]
options:
-h, --help show this help message and exit
@@ -586,12 +620,45 @@ options:
BigQuery Output Dataset Name
--es.bq.output.table ES.BQ.OUTPUT.TABLE
BigQuery Output Table Name
--es.bq.temp.bucket.name ES.BIGQUERY.TEMP.BUCKET.NAME
Spark BigQuery connector temporary bucket
--es.bq.output.mode {overwrite,append,ignore,errorifexists}
BigQuery Output write mode (one of:
append,overwrite,ignore,errorifexists) (Defaults to
append)
--es.bq.output.temporarygcsbucket ES.BQ.OUTPUT.TEMPORARYGCSBUCKET
The GCS bucket that temporarily holds the data before it is loaded to BigQuery
--es.bq.output.persistentgcsbucket ES.BQ.OUTPUT.PERSISTENTGCSBUCKET
The GCS bucket that holds the data before it is loaded to BigQuery. If informed, the data won't be deleted after it is written to BigQuery.
--es.bq.output.bigquerytablelabel ES.BQ.OUTPUT.BIGQUERYTABLELABEL
Used to add labels to the table while writing to it. Multiple labels can be set.
--es.bq.output.createdisposition ES.BQ.OUTPUT.CREATEDISPOSITION
Specifies whether the job is allowed to create new tables.
--es.bq.output.persistentgcspath ES.BQ.OUTPUT.PERSISTENTGCSPATH
The GCS path that holds the data before it is loaded to BigQuery.
Used only with es.bq.output.persistentgcsbucket
--es.bq.output.datepartition ES.BQ.OUTPUT.DATEPARTITION
The date partition the data is going to be written to.
--es.bq.output.partitionfield ES.BQ.OUTPUT.PARTITIONFIELD
If this field is specified, the table is partitioned by this field.
--es.bq.output.partitionexpirationms ES.BQ.OUTPUT.PARTITIONEXPIRATIONMS
Number of milliseconds for which to keep the storage for partitions in the table.
--es.bq.output.partitiontype ES.BQ.OUTPUT.PARTITIONTYPE
Used to specify Time partitioning. Supported types are: HOUR, DAY, MONTH, YEAR. This option is mandatory for a target table to be Time partitioned. Defaults to DAY if es.bq.output.partitionfield is specified
--es.bq.output.partitionrangestart ES.BQ.OUTPUT.PARTITIONRANGESTART
Used to specify Integer-range partitioning. This option is mandatory for a target table to be Integer-range partitioned. Pass es.bq.output.partitionrangeend and es.bq.output.partitionrangeinterval along with this option.
--es.bq.output.partitionrangeend ES.BQ.OUTPUT.PARTITIONRANGEEND
Used to specify Integer-range partitioning. This option is mandatory for a target table to be Integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeinterval along with this option.
--es.bq.output.partitionrangeinterval ES.BQ.OUTPUT.PARTITIONRANGEINTERVAL
Used to specify Integer-range partitioning. This option is mandatory for a target table to be Integer-range partitioned. Pass es.bq.output.partitionrangestart and es.bq.output.partitionrangeend along with this option.
--es.bq.output.clusteredfields ES.BQ.OUTPUT.CLUSTEREDFIELDS
A string of non-repeated, top-level columns separated by commas.
--es.bq.output.allowfieldaddition ES.BQ.OUTPUT.ALLOWFIELDADDITION
Adds the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false. Defaults to false.
--es.bq.output.allowfieldrelaxation ES.BQ.OUTPUT.ALLOWFIELDRELAXATION
Adds the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. Allowed values are true and false.
--es.bq.output.bignumericdefaultprecision ES.BQ.OUTPUT.BIGNUMERICDEFAULTPRECISION
An alternative default precision for BigNumeric fields, as the BigQuery default is too wide for Spark. Values can be between 1 and 38.
--es.bq.output.bignumericdefaultscale ES.BQ.OUTPUT.BIGNUMERICDEFAULTSCALE
An alternative default scale for BigNumeric fields. Values can be between 0 and 38, and less than es.bq.output.bignumericdefaultprecision. This default is used only when the field has an unparameterized BigNumeric type.
```

@@ -612,7 +679,7 @@ export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet
--es.bq.input.password="demo" \
--es.bq.output.dataset="my-project.test_dataset" \
--es.bq.output.table="dummyusers" \
--es.bq.temp.bucket.name="<temp-bq-bucket-name>" \
--es.bq.output.temporarygcsbucket="<temp-bq-bucket-name>" \
--es.bq.output.mode="append"
```
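For an Integer-range partitioned target table, the three range options must be passed together with the partition field; a hedged sketch of the extra arguments (field names and values are placeholders) that would be appended to the command above:

```
--es.bq.output.partitionfield="customer_id" \
--es.bq.output.partitionrangestart="0" \
--es.bq.output.partitionrangeend="100000" \
--es.bq.output.partitionrangeinterval="1000"
```

For a Time-partitioned table, `es.bq.output.partitiontype` (HOUR, DAY, MONTH, or YEAR) together with `es.bq.output.partitionfield` would be used instead.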
# Elasticsearch To Bigtable
16 changes: 7 additions & 9 deletions python/dataproc_templates/elasticsearch/elasticsearch_to_bq.py
@@ -21,7 +21,7 @@
from pyspark.sql import SparkSession

from dataproc_templates import BaseTemplate
from dataproc_templates.util.argument_parsing import add_es_spark_connector_options
from dataproc_templates.util.argument_parsing import add_spark_options, add_es_spark_connector_options
from dataproc_templates.util.dataframe_reader_wrappers import ingest_dataframe_from_elasticsearch
from dataproc_templates.util.elasticsearch_transformations import flatten_struct_fields, flatten_array_fields
import dataproc_templates.util.template_constants as constants
@@ -97,12 +97,6 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
required=True,
help='BigQuery Output Table Name'
)
parser.add_argument(
f'--{constants.ES_BQ_LD_TEMP_BUCKET_NAME}',
dest=constants.ES_BQ_LD_TEMP_BUCKET_NAME,
required=True,
help='Spark BigQuery connector temporary bucket'
)
parser.add_argument(
f'--{constants.ES_BQ_OUTPUT_MODE}',
dest=constants.ES_BQ_OUTPUT_MODE,
@@ -121,6 +115,8 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
]
)

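# Register the Spark BigQuery connector output options (staging buckets,
# partitioning, clustering, schema-update flags) as optional CLI arguments.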
add_spark_options(parser, constants.get_bq_output_spark_options("es.bq.output."))

known_args: argparse.Namespace
known_args, _ = parser.parse_known_args(args)

@@ -152,7 +148,6 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
es_api_key: str = args[constants.ES_BQ_NODE_API_KEY]
flatten_struct = args[constants.ES_BQ_FLATTEN_STRUCT]
flatten_array = args[constants.ES_BQ_FLATTEN_ARRAY]
bq_temp_bucket: str = args[constants.ES_BQ_LD_TEMP_BUCKET_NAME]
output_mode: str = args[constants.ES_BQ_OUTPUT_MODE]
big_query_output_dataset: str = args[constants.ES_BQ_OUTPUT_DATASET]
big_query_output_table: str = args[constants.ES_BQ_OUTPUT_TABLE]
@@ -180,12 +175,15 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
if not input_data.head(1):
logger.info("No records in dataframe, skipping the BigQuery load")
return

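# Map the template's argument names to the connector's Spark option keys
# (e.g. "es.bq.output.temporarygcsbucket" -> "temporaryGcsBucket" is the
# assumed mapping) and forward only the options the user actually set.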
bq_output_constant_options: dict = constants.get_bq_output_spark_options("es.bq.output.")
spark_options = {bq_output_constant_options[k]: v for k, v in args.items() if k in bq_output_constant_options and v}

# Write
input_data.write \
.format(constants.FORMAT_BIGQUERY) \
.option(constants.TABLE, big_query_output_dataset + "." + big_query_output_table) \
.option(constants.ES_BQ_TEMP_BUCKET, bq_temp_bucket) \
.option("enableListInference", True) \
.mode(output_mode) \
.options(**spark_options) \
.save()
