Rename MongoDB CDC to BigQuery template #1879

Merged
README.md: 2 changes (1 addition, 1 deletion)
@@ -41,7 +41,7 @@ follow [GitHub's branch renaming guide](https://docs.github.com/en/repositories/
- [Kafka to BigQuery](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Kafka_to_BigQuery&type=code)
- [Kafka to Cloud Storage](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Kafka_to_GCS&type=code)
- [Kinesis To Pubsub](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Kinesis_To_Pubsub&type=code)
- [MongoDB to BigQuery (CDC)](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20MongoDB_to_BigQuery_CDC&type=code)
- [MongoDB (CDC) to BigQuery](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20MongoDB_to_BigQuery_CDC&type=code)
- [Mqtt to Pubsub](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Mqtt_to_PubSub&type=code)
- [Ordered change stream buffer to Source DB](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20Ordered_Changestream_Buffer_to_Sourcedb&type=code)
- [Pub/Sub Avro to BigQuery](https://github.com/search?q=repo%3AGoogleCloudPlatform%2FDataflowTemplates%20PubSub_Avro_to_BigQuery&type=code)
MongoDbToBigQueryCdc.java → MongoDbCdcToBigQuery.java
@@ -27,7 +27,7 @@
import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.JavascriptDocumentTransformerOptions;
import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.MongoDbOptions;
import com.google.cloud.teleport.v2.mongodb.options.MongoDbToBigQueryOptions.PubSubOptions;
import com.google.cloud.teleport.v2.mongodb.templates.MongoDbToBigQueryCdc.Options;
import com.google.cloud.teleport.v2.mongodb.templates.MongoDbCdcToBigQuery.Options;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
@@ -46,7 +46,7 @@
import org.slf4j.LoggerFactory;

/**
* The {@link MongoDbToBigQueryCdc} pipeline is a streaming pipeline which reads data pushed to
* The {@link MongoDbCdcToBigQuery} pipeline is a streaming pipeline which reads data pushed to
* PubSub from MongoDB Changestream and outputs the resulting records to BigQuery.
*
* <p>Check out <a
@@ -56,9 +56,9 @@
@Template(
name = "MongoDB_to_BigQuery_CDC",
category = TemplateCategory.STREAMING,
displayName = "MongoDB to BigQuery (CDC)",
displayName = "MongoDB (CDC) to BigQuery",
description =
"The MongoDB to BigQuery CDC (Change Data Capture) template is a streaming pipeline that works together with MongoDB change streams. "
"The MongoDB CDC (Change Data Capture) to BigQuery template is a streaming pipeline that works together with MongoDB change streams. "
+ "The pipeline reads the JSON records pushed to Pub/Sub via a MongoDB change stream and writes them to BigQuery as specified by the <code>userOption</code> parameter.",
optionsClass = Options.class,
flexContainerName = "mongodb-to-bigquery-cdc",
@@ -73,9 +73,9 @@
},
streaming = true,
supportsAtLeastOnce = true)
public class MongoDbToBigQueryCdc {
public class MongoDbCdcToBigQuery {

private static final Logger LOG = LoggerFactory.getLogger(MongoDbToBigQuery.class);
private static final Logger LOG = LoggerFactory.getLogger(MongoDbCdcToBigQuery.class);

/** Options interface. */
public interface Options
@@ -33,9 +33,15 @@ variable "region" {
description = "The region in which the created job should run."
}

variable "useStorageWriteApiAtLeastOnce" {
type = bool
description = "When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly- once semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`."
default = null
}

variable "mongoDbUri" {
type = string
description = "MongoDB connection URI in the format `mongodb+srv://:@`."
description = "The MongoDB connection URI in the format `mongodb+srv://:@.`"

}

@@ -53,7 +59,7 @@ variable "collection" {

variable "userOption" {
type = string
description = "User option: `FLATTEN` or `NONE`. `FLATTEN` flattens the documents to the single level. `NONE` stores the whole document as a JSON string. Defaults to: NONE."
description = "`FLATTEN`, `JSON`, or `NONE`. `FLATTEN` flattens the documents to the single level. `JSON` stores document in BigQuery JSON format. `NONE` stores the whole document as a JSON-formatted STRING. Defaults to: NONE."
default = "NONE"
}

@@ -63,55 +69,59 @@ variable "KMSEncryptionKey" {
default = null
}

variable "useStorageWriteApi" {
type = bool
variable "filter" {
type = string
description = <<EOT
If true, the pipeline uses the Storage Write API when writing the data to BigQuery (see https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). The default value is false. When using Storage Write API in exactly-once mode, you must set the following parameters: "Number of streams for BigQuery Storage Write API" and "Triggering frequency in seconds for BigQuery Storage Write API". If you enable Dataflow at-least-once mode or set the useStorageWriteApiAtLeastOnce parameter to true, then you don't need to set the number of streams or the triggering frequency.
BSON filter in JSON format. (Example: { "val": { $gt: 0, $lt: 9 }})
EOT
default = null
}
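
Because the new `filter` value is a BSON document passed as a plain string, quoting it in Terraform can be fiddly. A minimal sketch of one way to supply the example filter from the description above (the values are illustrative only):

```hcl
# Sketch only: passing the example BSON filter as a Terraform string value.
# The heredoc form avoids escaping the double quotes inside the document.
filter = <<-EOT
  { "val": { $gt: 0, $lt: 9 } }
EOT

# Equivalent single-line form with escaped quotes:
# filter = "{ \"val\": { $gt: 0, $lt: 9 } }"
```
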

variable "useStorageWriteApiAtLeastOnce" {
variable "useStorageWriteApi" {
type = bool
description = <<EOT
This parameter takes effect only if "Use BigQuery Storage Write API" is enabled. If enabled the at-least-once semantics will be used for Storage Write API, otherwise exactly-once semantics will be used. Defaults to: false.
EOT
description = "If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is `false`. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)."
default = null
}

variable "numStorageWriteApiStreams" {
type = number
description = "Number of streams defines the parallelism of the BigQueryIO’s Write transform and roughly corresponds to the number of Storage Write API’s streams which will be used by the pipeline. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. Defaults to: 0."
description = "When using the Storage Write API, specifies the number of write streams. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter. Defaults to: 0."
default = null
}

variable "storageWriteApiTriggeringFrequencySec" {
type = number
description = "Triggering frequency will determine how soon the data will be visible for querying in BigQuery. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values."
description = "When using the Storage Write API, specifies the triggering frequency, in seconds. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter."
default = null
}
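
The Storage Write API parameters above interact: the stream count and triggering frequency are only required for exactly-once mode, while at-least-once mode can leave both unset. A hedged sketch of the two combinations described in these variable descriptions, with purely illustrative numbers:

```hcl
# Sketch: the two Storage Write API configurations described above
# (the numeric values are illustrative, not recommendations).

# Exactly-once writes: streams and triggering frequency must be set.
useStorageWriteApi                    = true
useStorageWriteApiAtLeastOnce         = false
numStorageWriteApiStreams             = 3
storageWriteApiTriggeringFrequencySec = 5

# At-least-once writes: the two numeric parameters can be left unset.
# useStorageWriteApi            = true
# useStorageWriteApiAtLeastOnce = true
```
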

variable "inputTopic" {
type = string
description = "Pub/Sub topic to read the input from, in the format of 'projects/your-project-id/topics/your-topic-name' (Example: projects/your-project-id/topics/your-topic-name)"
description = "The Pub/Sub input topic to read from, in the format of projects/<PROJECT_ID>/topics/<TOPIC_NAME>."

}

variable "outputTableSpec" {
type = string
description = "BigQuery table location to write the output to. The name should be in the format `<project>:<dataset>.<table_name>`. The table's schema must match input objects."
description = "The BigQuery table to write to. For example, `bigquery-project:dataset.output_table`."

}

variable "bigQuerySchemaPath" {
type = string
description = "The Cloud Storage path for the BigQuery JSON schema. (Example: gs://your-bucket/your-schema.json)"
default = null
}

variable "javascriptDocumentTransformGcsPath" {
type = string
description = "The Cloud Storage path pattern for the JavaScript code containing your user-defined functions. (Example: gs://your-bucket/your-transforms/*.js)"
description = "The Cloud Storage URI of the `.js` file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://your-bucket/your-transforms/*.js)"
default = null
}

variable "javascriptDocumentTransformFunctionName" {
type = string
description = "The function name should only contain letters, digits and underscores. Example: 'transform' or 'transform_udf1'. (Example: transform)"
description = "The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). (Example: transform)"
default = null
}
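
The two UDF parameters above are only meaningful together: the Cloud Storage path points at the `.js` file and the function name selects the function inside it. A minimal sketch, with an assumed bucket and function name:

```hcl
# Sketch: wiring an optional JavaScript document-transform UDF.
# The bucket, path, and function name are assumptions for illustration.
javascriptDocumentTransformGcsPath      = "gs://my-bucket/transforms/*.js"
javascriptDocumentTransformFunctionName = "transform"
```
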

@@ -240,17 +250,19 @@ resource "google_dataflow_flex_template_job" "generated" {
provider = google-beta
container_spec_gcs_path = "gs://dataflow-templates-${var.region}/latest/flex/MongoDB_to_BigQuery_CDC"
parameters = {
useStorageWriteApiAtLeastOnce = tostring(var.useStorageWriteApiAtLeastOnce)
mongoDbUri = var.mongoDbUri
database = var.database
collection = var.collection
userOption = var.userOption
KMSEncryptionKey = var.KMSEncryptionKey
filter = var.filter
useStorageWriteApi = tostring(var.useStorageWriteApi)
useStorageWriteApiAtLeastOnce = tostring(var.useStorageWriteApiAtLeastOnce)
numStorageWriteApiStreams = tostring(var.numStorageWriteApiStreams)
storageWriteApiTriggeringFrequencySec = tostring(var.storageWriteApiTriggeringFrequencySec)
inputTopic = var.inputTopic
outputTableSpec = var.outputTableSpec
bigQuerySchemaPath = var.bigQuerySchemaPath
javascriptDocumentTransformGcsPath = var.javascriptDocumentTransformGcsPath
javascriptDocumentTransformFunctionName = var.javascriptDocumentTransformFunctionName
}
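
For reference, a minimal `terraform.tfvars` sketch covering the variables visible in this diff that have no default; every value is a placeholder, and the full module may declare additional required variables that are not shown here:

```hcl
# Sketch only: placeholder values for the variables visible in this diff.
region          = "us-central1"
mongoDbUri      = "mongodb+srv://user:password@cluster0.example.mongodb.net"
database        = "sample_db"
collection      = "sample_collection"
inputTopic      = "projects/my-project/topics/mongodb-changestream"
outputTableSpec = "my-project:my_dataset.mongodb_cdc_output"
```
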