Skip to content

Commit

Permalink
merging
Browse files Browse the repository at this point in the history
  • Loading branch information
dhercher committed Aug 1, 2024
1 parent 02d1464 commit 17881c5
Show file tree
Hide file tree
Showing 22 changed files with 198 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.it.gcp.bigtable;

import static org.apache.beam.it.common.utils.ResourceManagerUtils.generateResourceId;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;

import com.google.cloud.bigtable.admin.v2.models.StorageType;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -185,4 +186,13 @@ static void checkValidTableId(String idToCheck) {
+ " is not a valid ID. Only letters, numbers, hyphens, underscores and exclamation points are allowed.");
}
}

/**
 * Generates an app profile id.
 *
 * <p>The generated id has the form {@code app_profile_<8 lowercase alphanumerics>_<nanoTime>},
 * which conforms to Bigtable's id pattern {@code [_a-zA-Z0-9][-_.a-zA-Z0-9]*} and stays well
 * under the 50-character limit. The {@code nanoTime} suffix makes collisions between concurrent
 * test runs unlikely.
 *
 * @return The app profile id string.
 */
public static String generateAppProfileId() {
  // Use a locale-insensitive lowercase conversion: with some default locales (e.g. Turkish),
  // "I".toLowerCase() yields the non-ASCII dotless 'ı' (U+0131), which is not a valid
  // Bigtable app profile id character.
  return "app_profile_"
      + randomAlphanumeric(8).toLowerCase(java.util.Locale.ROOT)
      + "_"
      + System.nanoTime();
}
}
9 changes: 7 additions & 2 deletions v1/README_Cloud_Bigtable_to_GCS_Avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Optional parameters

* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.



Expand Down Expand Up @@ -113,6 +114,7 @@ export OUTPUT_DIRECTORY=<outputDirectory>
export FILENAME_PREFIX=part

### Optional
export BIGTABLE_APP_PROFILE_ID=default

gcloud dataflow jobs run "cloud-bigtable-to-gcs-avro-job" \
--project "$PROJECT" \
Expand All @@ -122,7 +124,8 @@ gcloud dataflow jobs run "cloud-bigtable-to-gcs-avro-job" \
--parameters "bigtableInstanceId=$BIGTABLE_INSTANCE_ID" \
--parameters "bigtableTableId=$BIGTABLE_TABLE_ID" \
--parameters "outputDirectory=$OUTPUT_DIRECTORY" \
--parameters "filenamePrefix=$FILENAME_PREFIX"
--parameters "filenamePrefix=$FILENAME_PREFIX" \
--parameters "bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID"
```

For more information about the command, please check:
Expand All @@ -148,6 +151,7 @@ export OUTPUT_DIRECTORY=<outputDirectory>
export FILENAME_PREFIX=part

### Optional
export BIGTABLE_APP_PROFILE_ID=default

mvn clean package -PtemplatesRun \
-DskipTests \
Expand All @@ -156,7 +160,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="cloud-bigtable-to-gcs-avro-job" \
-DtemplateName="Cloud_Bigtable_to_GCS_Avro" \
-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX" \
-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" \
-f v1
```

Expand Down Expand Up @@ -207,6 +211,7 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_avro" {
bigtableTableId = "<bigtableTableId>"
outputDirectory = "gs://mybucket/somefolder"
filenamePrefix = "part"
# bigtableAppProfileId = "default"
}
}
```
9 changes: 7 additions & 2 deletions v1/README_Cloud_Bigtable_to_GCS_Json.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).
* **userOption** : Possible values are `FLATTEN` or `NONE`. `FLATTEN` flattens the row to the single level. `NONE` stores the whole row as a JSON string. Defaults to `NONE`.
* **columnsAliases** : A comma-separated list of columns that are required for the Vertex AI Vector Search index. The columns `id` and `embedding` are required for Vertex AI Vector Search. You can use the notation `fromfamily:fromcolumn;to`. For example, if the columns are `rowkey` and `cf:my_embedding`, where `rowkey` has a different name than the embedding column, specify `cf:my_embedding;embedding` and, `rowkey;id`. Only use this option when the value for `userOption` is `FLATTEN`.
* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.



Expand Down Expand Up @@ -116,6 +117,7 @@ export FILENAME_PREFIX=part
export OUTPUT_DIRECTORY=<outputDirectory>
export USER_OPTION=NONE
export COLUMNS_ALIASES=<columnsAliases>
export BIGTABLE_APP_PROFILE_ID=default

gcloud dataflow jobs run "cloud-bigtable-to-gcs-json-job" \
--project "$PROJECT" \
Expand All @@ -127,7 +129,8 @@ gcloud dataflow jobs run "cloud-bigtable-to-gcs-json-job" \
--parameters "outputDirectory=$OUTPUT_DIRECTORY" \
--parameters "filenamePrefix=$FILENAME_PREFIX" \
--parameters "userOption=$USER_OPTION" \
--parameters "columnsAliases=$COLUMNS_ALIASES"
--parameters "columnsAliases=$COLUMNS_ALIASES" \
--parameters "bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID"
```

For more information about the command, please check:
Expand Down Expand Up @@ -155,6 +158,7 @@ export FILENAME_PREFIX=part
export OUTPUT_DIRECTORY=<outputDirectory>
export USER_OPTION=NONE
export COLUMNS_ALIASES=<columnsAliases>
export BIGTABLE_APP_PROFILE_ID=default

mvn clean package -PtemplatesRun \
-DskipTests \
Expand All @@ -163,7 +167,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="cloud-bigtable-to-gcs-json-job" \
-DtemplateName="Cloud_Bigtable_to_GCS_Json" \
-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,userOption=$USER_OPTION,columnsAliases=$COLUMNS_ALIASES" \
-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,userOption=$USER_OPTION,columnsAliases=$COLUMNS_ALIASES,bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" \
-f v1
```

Expand Down Expand Up @@ -216,6 +220,7 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_json" {
# outputDirectory = "gs://your-bucket/your-path/"
# userOption = "NONE"
# columnsAliases = "<columnsAliases>"
# bigtableAppProfileId = "default"
}
}
```
9 changes: 7 additions & 2 deletions v1/README_Cloud_Bigtable_to_GCS_Parquet.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
### Optional parameters

* **numShards** : The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.
* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.



Expand Down Expand Up @@ -115,6 +116,7 @@ export FILENAME_PREFIX=part

### Optional
export NUM_SHARDS=0
export BIGTABLE_APP_PROFILE_ID=default

gcloud dataflow jobs run "cloud-bigtable-to-gcs-parquet-job" \
--project "$PROJECT" \
Expand All @@ -125,7 +127,8 @@ gcloud dataflow jobs run "cloud-bigtable-to-gcs-parquet-job" \
--parameters "bigtableTableId=$BIGTABLE_TABLE_ID" \
--parameters "outputDirectory=$OUTPUT_DIRECTORY" \
--parameters "filenamePrefix=$FILENAME_PREFIX" \
--parameters "numShards=$NUM_SHARDS"
--parameters "numShards=$NUM_SHARDS" \
--parameters "bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID"
```

For more information about the command, please check:
Expand All @@ -152,6 +155,7 @@ export FILENAME_PREFIX=part

### Optional
export NUM_SHARDS=0
export BIGTABLE_APP_PROFILE_ID=default

mvn clean package -PtemplatesRun \
-DskipTests \
Expand All @@ -160,7 +164,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="cloud-bigtable-to-gcs-parquet-job" \
-DtemplateName="Cloud_Bigtable_to_GCS_Parquet" \
-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,numShards=$NUM_SHARDS" \
-Dparameters="bigtableProjectId=$BIGTABLE_PROJECT_ID,bigtableInstanceId=$BIGTABLE_INSTANCE_ID,bigtableTableId=$BIGTABLE_TABLE_ID,outputDirectory=$OUTPUT_DIRECTORY,filenamePrefix=$FILENAME_PREFIX,numShards=$NUM_SHARDS,bigtableAppProfileId=$BIGTABLE_APP_PROFILE_ID" \
-f v1
```

Expand Down Expand Up @@ -212,6 +216,7 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_parquet" {
outputDirectory = "<outputDirectory>"
filenamePrefix = "part"
# numShards = "0"
# bigtableAppProfileId = "default"
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ public interface Options extends PipelineOptions {

@SuppressWarnings("unused")
void setFilenamePrefix(ValueProvider<String> filenamePrefix);

/**
 * Returns the Bigtable app profile id used when reading from Bigtable for the export.
 * Defaults to {@code "default"}; values must match {@code [_a-zA-Z0-9][-_.a-zA-Z0-9]*}.
 */
@TemplateParameter.Text(
    order = 6,
    groupName = "Source",
    optional = true,
    regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
    description = "Application profile ID",
    helpText =
        "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
@Default.String("default")
ValueProvider<String> getBigtableAppProfileId();

// NOTE(review): setter appears to be invoked reflectively by the options factory rather than
// from application code (hence the @SuppressWarnings) — confirm against PipelineOptions usage.
@SuppressWarnings("unused")
void setBigtableAppProfileId(ValueProvider<String> appProfileId);
}

/**
Expand All @@ -153,6 +167,7 @@ public static PipelineResult run(Options options) {
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withAppProfileId(options.getBigtableAppProfileId())
.withTableId(options.getBigtableTableId());

// Do not validate input fields if it is running as a template.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ public interface Options extends PipelineOptions {

@SuppressWarnings("unused")
void setColumnsAliases(ValueProvider<String> value);

/**
 * Returns the Bigtable app profile id used when reading from Bigtable for the export.
 * Defaults to {@code "default"}; values must match {@code [_a-zA-Z0-9][-_.a-zA-Z0-9]*}.
 */
@TemplateParameter.Text(
    order = 8,
    groupName = "Source",
    optional = true,
    regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
    description = "Application profile ID",
    helpText =
        "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
@Default.String("default")
ValueProvider<String> getBigtableAppProfileId();

// NOTE(review): setter appears to be invoked reflectively by the options factory rather than
// from application code (hence the @SuppressWarnings) — confirm against PipelineOptions usage.
@SuppressWarnings("unused")
void setBigtableAppProfileId(ValueProvider<String> appProfileId);
}

/**
Expand All @@ -186,6 +200,7 @@ public static PipelineResult run(Options options) {
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withAppProfileId(options.getBigtableAppProfileId())
.withTableId(options.getBigtableTableId());

// Do not validate input fields if it is running as a template.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ public interface Options extends PipelineOptions {

@SuppressWarnings("unused")
void setNumShards(ValueProvider<Integer> numShards);

/**
 * Returns the Bigtable app profile id used when reading from Bigtable for the export.
 * Defaults to {@code "default"}; values must match {@code [_a-zA-Z0-9][-_.a-zA-Z0-9]*}.
 */
@TemplateParameter.Text(
    order = 7,
    groupName = "Source",
    optional = true,
    regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
    description = "Application profile ID",
    helpText =
        "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
@Default.String("default")
ValueProvider<String> getBigtableAppProfileId();

// NOTE(review): setter appears to be invoked reflectively by the options factory rather than
// from application code (hence the @SuppressWarnings) — confirm against PipelineOptions usage.
@SuppressWarnings("unused")
void setBigtableAppProfileId(ValueProvider<String> appProfileId);
}

/**
Expand Down Expand Up @@ -170,6 +184,7 @@ public static PipelineResult run(Options options) {
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withAppProfileId(options.getBigtableAppProfileId())
.withTableId(options.getBigtableTableId());

// Do not validate input fields if it is running as a template.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.gcp.artifacts.matchers.ArtifactAsserts.assertThatGenericRecords;
import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateAppProfileId;
import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
Expand Down Expand Up @@ -116,6 +117,10 @@ public void testBigtableToAvro() throws IOException {
String tableId = generateTableId(testName);
bigtableResourceManager.createTable(tableId, ImmutableList.of("family1", "family2"));

String appProfileId = generateAppProfileId();
bigtableResourceManager.createAppProfile(
appProfileId, false, bigtableResourceManager.getClusterNames());

long timestamp = System.currentTimeMillis() * 1000;
bigtableResourceManager.write(
ImmutableList.of(
Expand All @@ -130,7 +135,8 @@ public void testBigtableToAvro() throws IOException {
.addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId())
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-avro-output-");
.addParameter("filenamePrefix", "bigtable-to-avro-output-")
.addParameter("bigtableAppProfileId", appProfileId);

// Act
PipelineLauncher.LaunchInfo info = launchTemplate(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.gcp.artifacts.matchers.ArtifactAsserts.assertThatArtifacts;
import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateAppProfileId;
import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
Expand Down Expand Up @@ -68,6 +69,10 @@ public void testUnflattenedBigtableToJson() throws IOException {
String tableId = generateTableId(testName);
bigtableResourceManager.createTable(tableId, ImmutableList.of("col1"));

String appProfileId = generateAppProfileId();
bigtableResourceManager.createAppProfile(
appProfileId, false, bigtableResourceManager.getClusterNames());

long timestamp = System.currentTimeMillis() * 1000;
bigtableResourceManager.write(
ImmutableList.of(
Expand All @@ -81,7 +86,8 @@ public void testUnflattenedBigtableToJson() throws IOException {
.addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId())
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-json-output-");
.addParameter("filenamePrefix", "bigtable-to-json-output-")
.addParameter("bigtableAppProfileId", appProfileId);

// Act
PipelineLauncher.LaunchInfo info = launchTemplate(options);
Expand All @@ -106,6 +112,10 @@ public void testFlattenedBigtableToJson() throws IOException {
String tableId = generateTableId(testName);
bigtableResourceManager.createTable(tableId, ImmutableList.of("col1"));

String appProfileId = generateAppProfileId();
bigtableResourceManager.createAppProfile(
appProfileId, false, bigtableResourceManager.getClusterNames());

long timestamp = System.currentTimeMillis() * 1000;
bigtableResourceManager.write(
ImmutableList.of(
Expand All @@ -120,7 +130,8 @@ public void testFlattenedBigtableToJson() throws IOException {
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-json-output-")
.addParameter("userOption", "FLATTEN");
.addParameter("userOption", "FLATTEN")
.addParameter("bigtableAppProfileId", appProfileId);

// Act
PipelineLauncher.LaunchInfo info = launchTemplate(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.gcp.artifacts.matchers.ArtifactAsserts.assertThatGenericRecords;
import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateAppProfileId;
import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
Expand Down Expand Up @@ -71,6 +72,10 @@ public void testBigtableToParquet() throws IOException {
String tableId = generateTableId(testName);
bigtableResourceManager.createTable(tableId, ImmutableList.of("col1"));

String appProfileId = generateAppProfileId();
bigtableResourceManager.createAppProfile(
appProfileId, false, bigtableResourceManager.getClusterNames());

long timestamp = System.currentTimeMillis() * 1000;
bigtableResourceManager.write(
ImmutableList.of(
Expand All @@ -85,7 +90,8 @@ public void testBigtableToParquet() throws IOException {
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-parquet-output-")
.addParameter("numShards", "1");
.addParameter("numShards", "1")
.addParameter("bigtableAppProfileId", appProfileId);

// Act
PipelineLauncher.LaunchInfo info = launchTemplate(options);
Expand Down
Loading

0 comments on commit 17881c5

Please sign in to comment.