feat!: Use spark bigtable connector instead of hbase connector #1004

Merged 11 commits on Oct 24, 2024
60 changes: 0 additions & 60 deletions python/dataproc_templates/elasticsearch/Dockerfile

This file was deleted.

94 changes: 35 additions & 59 deletions python/dataproc_templates/elasticsearch/README.md
@@ -614,43 +614,22 @@ export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet

Template for exporting an Elasticsearch Index to a BigTable table.

It uses the Apache HBase Spark Connector to write to Bigtable.
It uses the Spark BigTable Connector to write to Bigtable.

This [tutorial](https://cloud.google.com/dataproc/docs/tutorials/spark-hbase#dataproc_hbase_tutorial_view_code-python) shows how to run a Spark/PySpark job connecting to Bigtable.
However, it focuses on running the job using a Dataproc cluster, and not Dataproc Serverless.
Here in this template, you will notice that there are different configuration steps for the PySpark job to successfully run using Dataproc Serverless, connecting to Bigtable using the HBase interface.
In this template, you will notice that the configuration steps differ so that the PySpark job can run successfully on Dataproc Serverless, connecting to Bigtable through the Spark Bigtable connector.

You can also check out the [differences between HBase and Cloud Bigtable](https://cloud.google.com/bigtable/docs/hbase-differences).
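For orientation, here is a minimal PySpark sketch of a write through the Spark Bigtable connector. It is not part of this repository's code; the project ID, instance ID, table ID, and column names are placeholders, and the option keys mirror the ones introduced in this change.

```
# Minimal sketch of a Spark Bigtable connector write; all IDs and column
# names below are placeholders, not values from this repository.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bigtable-write-sketch").getOrCreate()

# Catalog mapping DataFrame columns to Bigtable column families/qualifiers.
catalog = """{
  "table": {"name": "<table_id>"},
  "rowkey": "key",
  "columns": {
    "key": {"cf": "rowkey", "col": "key", "type": "string"},
    "name": {"cf": "cf", "col": "name", "type": "string"}
  }
}"""

df = spark.createDataFrame([("row-1", "alice")], ["key", "name"])

df.write \
    .format("bigtable") \
    .options(catalog=catalog) \
    .option("spark.bigtable.project.id", "<project_id>") \
    .option("spark.bigtable.instance.id", "<instance_id>") \
    .save()
```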

## Requirements

1) Configure the [hbase-site.xml](./hbase-site.xml) ([reference](https://cloud.google.com/bigtable/docs/hbase-connecting#creating_the_hbase-sitexml_file)) with your Bigtable instance reference
- The hbase-site.xml needs to be available in some path of the container image used by Dataproc Serverless.
- For that, you need to build and host a [customer container image](https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers#submit_a_spark_batch_workload_using_a_custom_container_image) in GCP Container Registry.
- Add the following layer to the [Dockerfile](./Dockerfile), for it to copy your local hbase-site.xml to the container image (already done):
```
COPY hbase-site.xml /etc/hbase/conf/
```
- Build the [Dockerfile](./Dockerfile), building and pushing it to GCP Container Registry with:
```
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
IMAGE=us-central1-docker.pkg.dev/<your_project>/<repository>/<your_custom_image>:<your_version>
docker build --platform linux/amd64 -t "${IMAGE}" .
docker push "${IMAGE}"
```
- An SPARK_EXTRA_CLASSPATH environment variable should also be set to the same path when submitting the job.
```
(./bin/start.sh ...)
--container-image="us-central1-docker.pkg.dev/<your_project>/<repository>/<your_custom_image>:<your_version>" # image with hbase-site.xml in /etc/hbase/conf/
--properties='spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/'
```

2) Configure the desired HBase catalog json to passed as an argument (table reference and schema)
- The hbase-catalog.json should be passed using the --gcs.bigtable.hbase.catalog.json
1) `export JARS="gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar"`. The property `spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36` is also required. Please refer to the example in the official [documentation](https://github.com/GoogleCloudDataproc/spark-bigtable-connector/tree/main/examples/python).

2) Configure the desired BigTable catalog JSON to be passed as an argument (table reference and schema)
- The catalog.json should be passed using the `--es.bt.catalog.json` argument
```
(./bin/start.sh ...)
-- --gcs.bigtable.hbase.catalog.json='''{
"table":{"namespace":"default","name":"<table_id>"},
-- --es.bt.catalog.json='''{
"table":{"name":"<table_id>"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"string"},
@@ -659,7 +638,7 @@ You can also check out the [differences between HBase and Cloud Bigtable](https:
}'''
```

3) [Create and manage](https://cloud.google.com/bigtable/docs/managing-tables) your Bigtable table schema, column families, etc, to match the provided HBase catalog.
3) [Create and manage](https://cloud.google.com/bigtable/docs/managing-tables) your Bigtable table schema, column families, etc., to match the provided Bigtable catalog.
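The catalog from item 2 can also be generated programmatically instead of being written inline. Below is a small sketch (the `name` column and `cf` family are hypothetical) that prints a compact JSON string suitable for `--es.bt.catalog.json`.

```
# Sketch: build the catalog described in item 2 as a Python dict and emit
# compact JSON for --es.bt.catalog.json. The "name"/"cf" entries are
# hypothetical and should match your own table schema.
import json

catalog = {
    "table": {"name": "<table_id>"},
    "rowkey": "key",
    "columns": {
        "key": {"cf": "rowkey", "col": "key", "type": "string"},
        "name": {"cf": "cf", "col": "name", "type": "string"},
    },
}

print(json.dumps(catalog, separators=(",", ":")))
```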

## Required JAR files

@@ -669,32 +648,24 @@ Depending upon the versions of Elasticsearch, PySpark and Scala in the environme

The template supports Elasticsearch versions >= 7.12.0, using the appropriate Elasticsearch Spark Connector

Some HBase and Bigtable dependencies are required to be passed when submitting the job.
Spark Bigtable connector dependencies are required to be passed when submitting the job.
These dependencies need to be passed by using the --jars flag, or, in the case of Dataproc Templates, using the JARS environment variable.
Some dependencies (jars) must be downloaded from [MVN Repository](https://mvnrepository.com/) and stored in your Cloud Storage bucket (create one to store the dependencies).

- **[Apache HBase Spark Connector](https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark) dependencies (already mounted in Dataproc Serverless, so you refer to them using file://):**
- file:///usr/lib/spark/external/hbase-spark-protocol-shaded.jar
- file:///usr/lib/spark/external/hbase-spark.jar

- **Bigtable dependency:**
- gs://<your_bucket_to_store_dependencies>/bigtable-hbase-2.x-shaded-2.3.0.jar
- Download it using ``` wget https://repo1.maven.org/maven2/com/google/cloud/bigtable/bigtable-hbase-2.x-shaded/2.3.0/bigtable-hbase-2.x-shaded-2.3.0.jar```

- **HBase dependencies:**
- gs://<your_bucket_to_store_dependencies>/hbase-client-2.4.12.jar
- Download it using ``` wget https://repo1.maven.org/maven2/org/apache/hbase/hbase-client/2.4.12/hbase-client-2.4.12.jar```
- gs://<your_bucket_to_store_dependencies>/hbase-shaded-mapreduce-2.4.12.jar
- Download it using ``` wget https://repo1.maven.org/maven2/org/apache/hbase/hbase-shaded-mapreduce/2.4.12/hbase-shaded-mapreduce-2.4.12.jar```
- **[Spark BigTable Connector](https://cloud.google.com/bigtable/docs/use-bigtable-spark-connector):**
- gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar

It also requires [DeltaIO dependencies](https://docs.delta.io/latest/releases.html) to be available in the Dataproc cluster if using delta format.


## Arguments
- `es.bt.input.node`: Elasticsearch Node Uri (format: mynode:9600)
- `es.bt.input.index`: Elasticsearch Input Index Name (format: <index>/<type>)
- `es.bt.input.user`: Elasticsearch Username
- `es.bt.input.password`: Elasticsearch Password
- `es.bt.hbase.catalog.json`: HBase catalog inline json
- `spark.bigtable.project.id`: GCP project where BigTable instance is running
- `spark.bigtable.instance.id`: BigTable instance id
- `es.bt.catalog.json`: BigTable catalog inline json
#### Optional Arguments
- `es.bt.input.es.nodes.path.prefix`: Prefix to add to all requests made to Elasticsearch
- `es.bt.input.es.query`: Holds the query used for reading data from the specified Index
@@ -743,6 +714,8 @@ Some dependencies (jars) must be downloaded from [MVN Repository](https://mvnrep
- `es.bt.input.es.net.proxy.socks.use.system.props`: Whether to use the system Socks proxy properties (namely socksProxyHost and socksProxyPort) or not (default yes)
- `es.bt.flatten.struct.fields`: Flatten the struct fields
- `es.bt.flatten.array.fields`: Flatten the n-D array fields to 1-D array fields; it requires the es.bt.flatten.struct.fields option to be passed
- `spark.bigtable.create.new.table`: Set to True to create a new BigTable table from the catalog. Default is False, which means the table must already exist.
- `spark.bigtable.batch.mutate.size`: BigTable batch mutation size. Maximum allowed value is `100000`. Default is `100`. Reference: [documentation](https://github.com/GoogleCloudDataproc/spark-bigtable-connector/blob/main/spark-bigtable_2.12/src/main/scala/com/google/cloud/spark/bigtable/datasources/BigtableSparkConf.scala#L86)

## Usage

@@ -754,7 +727,9 @@ usage: main.py [-h]
--es.bt.input.index ES.BT.INPUT.INDEX
--es.bt.input.user ES.BT.INPUT.USER
--es.bt.input.password ES.BT.INPUT.PASSWORD
--es.bt.hbase.catalog.json ES.BT.HBASE.CATALOG.JSON
--spark.bigtable.project.id SPARK.BIGTABLE.PROJECT.ID
--spark.bigtable.instance.id SPARK.BIGTABLE.INSTANCE.ID
--es.bt.catalog.json ES.BT.CATALOG.JSON
[--es.bt.input.es.nodes.path.prefix ES.BT.INPUT.ES.NODES.PATH.PREFIX]
[--es.bt.input.es.query ES.BT.INPUT.ES.QUERY]
[--es.bt.input.es.mapping.date.rich ES.BT.INPUT.ES.MAPPING.DATE.RICH]
@@ -802,6 +777,8 @@ usage: main.py [-h]
[--es.bt.input.es.net.proxy.socks.use.system.props ES.BT.INPUT.ES.NET.PROXY.SOCKS.USE.SYSTEM.PROPS]
[--es.bt.flatten.struct.fields]
[--es.bt.flatten.array.fields]
[--spark.bigtable.create.new.table SPARK.BIGTABLE.CREATE.NEW.TABLE]
[--spark.bigtable.batch.mutate.size SPARK.BIGTABLE.BATCH.MUTATE.SIZE]


options:
@@ -908,8 +885,12 @@ options:
Flatten the struct fields
--es.bt.flatten.array.fields
Flatten the n-D array fields to 1-D array fields; it requires the es.bt.flatten.struct.fields option to be passed
--es.bt.hbase.catalog.json ES.BT.HBASE.CATALOG.JSON
HBase catalog inline json
--spark.bigtable.project.id SPARK.BIGTABLE.PROJECT.ID
GCP project id where BigTable instance is running
--spark.bigtable.instance.id SPARK.BIGTABLE.INSTANCE.ID
BigTable instance id
--es.bt.catalog.json ES.BT.CATALOG.JSON
BigTable catalog inline json
```

## Example submission
@@ -918,24 +899,19 @@ options:
export GCP_PROJECT=<project_id>
export REGION=<region>
export GCS_STAGING_LOCATION=<gcs-staging-bucket-folder>
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar,\
gs://<your_bucket_to_store_dependencies>/bigtable-hbase-2.x-hadoop-2.3.0.jar,\
gs://<your_bucket_to_store_dependencies>/hbase-client-2.4.12.jar,\
gs://<your_bucket_to_store_dependencies>/hbase-shaded-mapreduce-2.4.12.jar,\
file:///usr/lib/spark/external/hbase-spark-protocol-shaded.jar,\
file:///usr/lib/spark/external/hbase-spark.jar"
export SUBNET=projects/my-project/regions/us-central1/subnetworks/test-subnet
export JARS="gs://<your_bucket_to_store_dependencies>/elasticsearch-spark-30_2.12-8.11.4.jar,gs://spark-lib/bigtable/spark-bigtable_2.12-0.1.0.jar"
export SPARK_PROPERTIES="spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36"

./bin/start.sh \
--container-image="gcr.io/<your_project>/<your_custom_image>:<your_version>" \
--properties='spark.dataproc.driverEnv.SPARK_EXTRA_CLASSPATH=/etc/hbase/conf/,spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ # image with hbase-site.xml in /etc/hbase/conf/
-- --template=ELASTICSEARCHTOBIGTABLE \
--es.bt.input.node="xxxxxxxxxxxx.us-central1.gcp.cloud.es.io:9243" \
--es.bt.input.index="demo" \
--es.bt.input.user="demo" \
--es.bt.input.password="demo" \
--es.bt.hbase.catalog.json='''{
"table":{"namespace":"default","name":"my_table"},
--spark.bigtable.project.id="demo-project" \
--spark.bigtable.instance.id="bt-instance-id" \
--es.bt.catalog.json='''{
"table":{"name":"my_table"},
"rowkey":"key",
"columns":{
"key":{"cf":"rowkey", "col":"key", "type":"string"},
@@ -82,10 +82,36 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
)
)
parser.add_argument(
f'--{constants.ES_BT_HBASE_CATALOG_JSON}',
dest=constants.ES_BT_HBASE_CATALOG_JSON,
f'--{constants.ES_BT_PROJECT_ID}',
dest=constants.ES_BT_PROJECT_ID,
required=True,
help='HBase catalog inline json'
help='BigTable project ID'
)
parser.add_argument(
f'--{constants.ES_BT_INSTANCE_ID}',
dest=constants.ES_BT_INSTANCE_ID,
required=True,
help='BigTable instance ID'
)
parser.add_argument(
f'--{constants.ES_BT_CREATE_NEW_TABLE}',
dest=constants.ES_BT_CREATE_NEW_TABLE,
required=False,
help='BigTable create new table flag. Default is false.',
default=False
)
parser.add_argument(
f'--{constants.ES_BT_BATCH_MUTATE_SIZE}',
dest=constants.ES_BT_BATCH_MUTATE_SIZE,
required=False,
help='BigTable batch mutate size. Maximum allowed size is 100000. Default is 100.',
default=100
)
parser.add_argument(
f'--{constants.ES_BT_CATALOG_JSON}',
dest=constants.ES_BT_CATALOG_JSON,
required=True,
help='BigTable catalog inline json'
)

known_args: argparse.Namespace
@@ -104,7 +130,11 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
es_password: str = args[constants.ES_BT_NODE_PASSWORD]
flatten_struct = args[constants.ES_BT_FLATTEN_STRUCT]
flatten_array = args[constants.ES_BT_FLATTEN_ARRAY]
catalog: str = ''.join(args[constants.ES_BT_HBASE_CATALOG_JSON].split())
catalog: str = ''.join(args[constants.ES_BT_CATALOG_JSON].split())
project_id: str = args[constants.ES_BT_PROJECT_ID]
instance_id: str = args[constants.ES_BT_INSTANCE_ID]
create_new_table: bool = args[constants.ES_BT_CREATE_NEW_TABLE]
batch_mutate_size: int = args[constants.ES_BT_BATCH_MUTATE_SIZE]

ignore_keys = {constants.ES_BT_NODE_PASSWORD}
filtered_args = {key:val for key,val in args.items() if key not in ignore_keys}
Expand All @@ -128,7 +158,10 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:

# Write
input_data.write \
.format(constants.FORMAT_HBASE) \
.format(constants.FORMAT_BIGTABLE) \
.options(catalog=catalog) \
.option('hbase.spark.use.hbasecontext', "false") \
.option(constants.ES_BT_PROJECT_ID, project_id) \
.option(constants.ES_BT_INSTANCE_ID, instance_id) \
.option(constants.ES_BT_CREATE_NEW_TABLE, create_new_table) \
.option(constants.ES_BT_BATCH_MUTATE_SIZE, batch_mutate_size) \
.save()
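Read together, the added lines give a write path along these lines (a sketch only; it assumes the surrounding `run` method, where `input_data`, `catalog`, and the other variables are defined):

```
# Sketch of the post-change write chain; the ES_BT_* option keys are the
# strings added to template_constants.py further down in this diff.
input_data.write \
    .format(constants.FORMAT_BIGTABLE) \
    .options(catalog=catalog) \
    .option(constants.ES_BT_PROJECT_ID, project_id) \
    .option(constants.ES_BT_INSTANCE_ID, instance_id) \
    .option(constants.ES_BT_CREATE_NEW_TABLE, create_new_table) \
    .option(constants.ES_BT_BATCH_MUTATE_SIZE, batch_mutate_size) \
    .save()
```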
34 changes: 0 additions & 34 deletions python/dataproc_templates/elasticsearch/hbase-site.xml

This file was deleted.

6 changes: 5 additions & 1 deletion python/dataproc_templates/util/template_constants.py
@@ -531,9 +531,13 @@ def get_es_spark_connector_input_options(prefix):
ES_BT_INPUT_INDEX = "es.bt.input.index"
ES_BT_NODE_USER = "es.bt.input.user"
ES_BT_NODE_PASSWORD = "es.bt.input.password"
ES_BT_HBASE_CATALOG_JSON = "es.bt.hbase.catalog.json"
ES_BT_FLATTEN_STRUCT = "es.bt.flatten.struct.fields"
ES_BT_FLATTEN_ARRAY = "es.bt.flatten.array.fields"
ES_BT_CATALOG_JSON = "es.bt.catalog.json"
ES_BT_PROJECT_ID = "spark.bigtable.project.id"
ES_BT_INSTANCE_ID = "spark.bigtable.instance.id"
ES_BT_CREATE_NEW_TABLE = "spark.bigtable.create.new.table"
ES_BT_BATCH_MUTATE_SIZE = "spark.bigtable.batch.mutate.size"

# GCS to BigQuery
GCS_BQ_INPUT_LOCATION = "gcs.bigquery.input.location"