Merge pull request #1208 from ron-gal:bigtable_to_vertex
PiperOrigin-RevId: 589182957
cloud-teleport committed Dec 8, 2023
2 parents 8a124b4 + 8c7375d commit d0db184
Showing 3 changed files with 476 additions and 0 deletions.
166 changes: 166 additions & 0 deletions v1/README_Cloud_Bigtable_to_GCS_Json.md

Cloud Bigtable to JSON template
---
The Bigtable to JSON template is a pipeline that reads data from a Bigtable table
and writes it to a Cloud Storage bucket in JSON format.
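
Each Bigtable row is written as a single JSON object on its own line: the row key maps to an object with one nested object per column family and one field per column qualifier (see the `BigtableToJsonFn` transform in the source file below). For a hypothetical row with key `row-1`, a family `cf`, and columns `name` and `city`, an output line would look roughly like:

```
{"row-1":{"cf":{"name":"Alice","city":"Paris"}}}
```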


:memo: This is a Google-provided template! Please check the [Provided templates documentation](https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-json) for instructions on using it without building from source, via [Create job from template](https://console.cloud.google.com/dataflow/createjob?template=Cloud_Bigtable_to_GCS_Json).

:bulb: This documentation is generated from [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplates#metadata-annotations). Do not change this file directly.

## Parameters

### Required Parameters

* **bigtableProjectId** (Project ID): The ID of the Google Cloud project of the Cloud Bigtable instance that you want to read data from.
* **bigtableInstanceId** (Instance ID): The ID of the Cloud Bigtable instance that contains the table.
* **bigtableTableId** (Table ID): The ID of the Cloud Bigtable table to read.
* **filenamePrefix** (JSON file prefix): The prefix of the JSON file name. For example, "table1-". Defaults to: part.

### Optional Parameters

* **outputDirectory** (Cloud Storage directory for storing JSON files): The Cloud Storage path where the output JSON files can be stored. (Example: gs://your-bucket/your-path/).
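
For illustration, with `outputDirectory=gs://your-bucket/your-path/` and the default `filenamePrefix` of `part`, the pipeline concatenates the directory and prefix (adding a trailing `/` if missing) and writes sharded files with a `.json` suffix; with Beam's default sharding the resulting file names look roughly like:

```
gs://your-bucket/your-path/part-00000-of-00003.json
```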



## Getting Started

### Requirements

* Java 11
* Maven
* [gcloud CLI](https://cloud.google.com/sdk/gcloud), and execution of the
following commands:
* `gcloud auth login`
* `gcloud auth application-default login`

:star2: These dependencies are pre-installed if you use Google Cloud Shell!
[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https%3A%2F%2Fgithub.com%2FGoogleCloudPlatform%2FDataflowTemplates.git&cloudshell_open_in_editor=v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java)

### Templates Plugin

This README provides instructions using the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates#templates-plugin). Install the plugin with the following command before proceeding:

```shell
mvn clean install -pl plugins/templates-maven-plugin -am
```

### Building Template

This template is a Classic Template, meaning that the pipeline construction code runs once at staging time and the resulting pipeline is saved to Google Cloud Storage for later reuse. Please check [Creating classic Dataflow templates](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates)
and [Running classic templates](https://cloud.google.com/dataflow/docs/guides/templates/running-templates)
for more information.

#### Staging the Template

If the plan is just to stage the template (i.e., make it available for use) via the `gcloud` command or the Dataflow "Create job from template" UI, use the `-PtemplatesStage` profile:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>

mvn clean package -PtemplatesStage \
-DskipTests \
-DprojectId="$PROJECT" \
-DbucketName="$BUCKET_NAME" \
-DstagePrefix="templates" \
-DtemplateName="Cloud_Bigtable_to_GCS_Json" \
-pl v1 \
-am
```

The `-DgcpTempLocation=<temp-bucket-name>` parameter can be specified to set the GCS bucket used by the DataflowRunner to write
temp files to during serialization. The path used will be `gs://<temp-bucket-name>/temp/`.
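
For example, the flag can be appended to the staging command above (values are illustrative):

```shell
mvn clean package -PtemplatesStage \
-DskipTests \
-DprojectId="$PROJECT" \
-DbucketName="$BUCKET_NAME" \
-DstagePrefix="templates" \
-DtemplateName="Cloud_Bigtable_to_GCS_Json" \
-DgcpTempLocation=<temp-bucket-name> \
-pl v1 \
-am
```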

The command should build and save the template to Google Cloud, and then print
the complete location on Cloud Storage:

```
Classic Template was staged! gs://<bucket-name>/templates/Cloud_Bigtable_to_GCS_Json
```

Copy the printed path, as it will be used in the following steps.

#### Running the Template

**Using the staged template**:

You can use the path above to run the template (or share it with others for execution).

To start a job with the template at any time using `gcloud`, you need valid resources for the required parameters.

With those in place, the following command can be used:

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1
export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/Cloud_Bigtable_to_GCS_Json"

### Required
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
export FILENAME_PREFIX=part

### Optional
export OUTPUT_DIRECTORY=<outputDirectory>

gcloud dataflow jobs run "cloud-bigtable-to-gcs-json-job" \
--project "$PROJECT" \
--region "$REGION" \
--gcs-location "$TEMPLATE_SPEC_GCSPATH" \
--parameters "bigtableProjectId=$BIGTABLE_PROJECT_ID" \
--parameters "bigtableInstanceId=$BIGTABLE_INSTANCE_ID" \
--parameters "bigtableTableId=$BIGTABLE_TABLE_ID" \
--parameters "outputDirectory=$OUTPUT_DIRECTORY" \
--parameters "filenamePrefix=$FILENAME_PREFIX"
```

For more information about the command, please check:
https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/run


**Using the plugin**:

Instead of only staging the template, it is possible to stage and run it in a single command. This can be useful for testing while changing the templates.

```shell
export PROJECT=<my-project>
export BUCKET_NAME=<bucket-name>
export REGION=us-central1

### Required
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
export FILENAME_PREFIX=part

### Optional
export OUTPUT_DIRECTORY=<outputDirectory>

mvn clean package -PtemplatesRun \
-DskipTests \
-DprojectId="$PROJECT" \
-DbucketName="$BUCKET_NAME" \
-Dregion="$REGION" \
-DjobName="cloud-bigtable-to-gcs-json-job" \
-DtemplateName="Cloud_Bigtable_to_GCS_Json" \
-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX" \
-pl v1 \
-am
```
208 changes: 208 additions & 0 deletions v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToJson.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Dataflow pipeline that exports data from a Cloud Bigtable table to JSON files in GCS. Currently,
* filtering on the Cloud Bigtable table is not supported.
*
* <p>Check out <a href=
* "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_JSON.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_Bigtable_to_GCS_Json",
category = TemplateCategory.BATCH,
displayName = "Cloud Bigtable to JSON",
description =
"The Bigtable to JSON template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format",
optionsClass = Options.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-json", // TODO(rongal): create guide.
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Bigtable table must exist.",
"The output Cloud Storage bucket must exist before running the pipeline."
})
public class BigtableToJson {
private static final Logger LOG = LoggerFactory.getLogger(BigtableToJson.class);

/** Options for the export pipeline. */
public interface Options extends PipelineOptions {
@TemplateParameter.ProjectId(
order = 1,
description = "Project ID",
helpText =
"The ID of the Google Cloud project of the Cloud Bigtable instance that you want to"
+ " read data from")
ValueProvider<String> getBigtableProjectId();

@SuppressWarnings("unused")
void setBigtableProjectId(ValueProvider<String> projectId);

@TemplateParameter.Text(
order = 2,
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Instance ID",
helpText = "The ID of the Cloud Bigtable instance that contains the table")
ValueProvider<String> getBigtableInstanceId();

@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);

@TemplateParameter.Text(
order = 3,
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
description = "Table ID",
helpText = "The ID of the Cloud Bigtable table to read")
ValueProvider<String> getBigtableTableId();

@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);

@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "Cloud Storage directory for storing JSON files",
helpText = "The Cloud Storage path where the output JSON files can be stored.",
example = "gs://your-bucket/your-path/")
ValueProvider<String> getOutputDirectory();

@SuppressWarnings("unused")
void setOutputDirectory(ValueProvider<String> outputDirectory);

@TemplateParameter.Text(
order = 5,
description = "JSON file prefix",
helpText = "The prefix of the JSON file name. For example, \"table1-\"")
@Default.String("part")
ValueProvider<String> getFilenamePrefix();

@SuppressWarnings("unused")
void setFilenamePrefix(ValueProvider<String> filenamePrefix);
}

/**
* Runs a pipeline to export data from a Cloud Bigtable table to JSON files in GCS.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

PipelineResult result = run(options);

// Wait for pipeline to finish only if it is not constructing a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
LOG.info("Completed pipeline setup");
}

public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

BigtableIO.Read read =
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());

// Do not validate input fields if it is running as a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
read = read.withoutValidation();
}

// Concatenate the Cloud Storage folder with the file prefix to get the complete output path.
ValueProvider<String> outputFilePrefix = options.getFilenamePrefix();

ValueProvider<String> outputFilePathWithPrefix =
ValueProvider.NestedValueProvider.of(
options.getOutputDirectory(),
(SerializableFunction<String, String>)
folder -> {
if (!folder.endsWith("/")) {
// Appending the slash if not provided by user
folder = folder + "/";
}
return folder + outputFilePrefix.get();
});
pipeline
.apply("Read from Bigtable", read)
.apply("Transform to JSON", MapElements.via(new BigtableToJsonFn()))
.apply("Write to storage", TextIO.write().to(outputFilePathWithPrefix).withSuffix(".json"));

return pipeline.run();
}

/** Translates Bigtable {@link Row} to JSON. */
static class BigtableToJsonFn extends SimpleFunction<Row, String> {
@Override
public String apply(Row row) {
StringWriter stringWriter = new StringWriter();
JsonWriter jsonWriter = new JsonWriter(stringWriter);
try {
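// Serialize the row as a single JSON object: {rowKey: {family: {qualifier: value, ...}, ...}}.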
jsonWriter.beginObject();
jsonWriter.name(row.getKey().toStringUtf8());
jsonWriter.beginObject();
for (Family family : row.getFamiliesList()) {
String familyName = family.getName();
jsonWriter.name(familyName);
jsonWriter.beginObject();
for (Column column : family.getColumnsList()) {
for (Cell cell : column.getCellsList()) {
jsonWriter
.name(column.getQualifier().toStringUtf8())
.value(cell.getValue().toStringUtf8());
}
}
jsonWriter.endObject();
}
jsonWriter.endObject();
jsonWriter.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
return stringWriter.toString();
}
}
}