Rollback YamlTemplate to 2.57 (#1829)
Signed-off-by: Jeffrey Kinard <[email protected]>
Polber committed Aug 31, 2024
1 parent e58646a commit 97fd440
Showing 4 changed files with 176 additions and 0 deletions.
@@ -21,7 +21,9 @@ RUN if ! [ -f requirements.txt ] ; then echo "$BEAM_PACKAGE" > requirements.txt

# Install dependencies to launch the pipeline and download to reduce startup time
# Remove Jinja2 dependency once YAML templatization support is added to Beam
# TODO - remove `pip uninstall apache-beam` line when repo is upgraded to Beam 2.59.0
RUN python -m venv /venv \
    && /venv/bin/pip uninstall apache-beam -y \
    && /venv/bin/pip install --no-cache-dir --upgrade pip setuptools \
    && /venv/bin/pip install --no-cache-dir -U -r $REQUIREMENTS_FILE \
    && /venv/bin/pip install --no-cache-dir -U Jinja2 \
3 changes: 3 additions & 0 deletions python/pom.xml
@@ -36,6 +36,9 @@
  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>

    <!-- TODO - remove line when repo is upgraded to Beam 2.59.0 -->
    <beam-python.version>2.57.0</beam-python.version>
  </properties>

  <dependencies>
@@ -0,0 +1,150 @@
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates.python;

import static org.apache.beam.it.gcp.bigquery.matchers.BigQueryAsserts.assertThatBigQueryRecords;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.PipelineOperator.Result;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager;
import org.apache.beam.it.gcp.bigquery.conditions.BigQueryRowsCheck;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.commons.lang3.RandomStringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Integration test for {@link YAMLTemplate} Flex template. */
@Category(TemplateIntegrationTest.class)
@TemplateIntegrationTest(YAMLTemplate.class)
@RunWith(JUnit4.class)
public class PubSubToBigQueryYamlIT extends TemplateTestBase {

  private PubsubResourceManager pubsubResourceManager;
  private BigQueryResourceManager bigQueryResourceManager;

  private static final int MESSAGES_COUNT = 10;
  private static final String YAML_PIPELINE = "PubSubToBigQueryYamlIT.yaml";
  private static final String YAML_PIPELINE_GCS_PATH = "input/" + YAML_PIPELINE;

  @Before
  public void setUp() throws IOException {
    pubsubResourceManager =
        PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build();
    bigQueryResourceManager =
        BigQueryResourceManager.builder(testName, PROJECT, credentials).build();

    gcsClient.createArtifact(YAML_PIPELINE_GCS_PATH, createSimpleYamlMessage());
  }

  @After
  public void cleanUp() {
    ResourceManagerUtils.cleanResources(pubsubResourceManager, bigQueryResourceManager);
  }

  @Test
  public void testPubSubToBigQuery() throws IOException {
    basePubSubToBigQuery(Function.identity()); // no extra parameters
  }

  private void basePubSubToBigQuery(
      Function<LaunchConfig.Builder, LaunchConfig.Builder> paramsAdder) throws IOException {
    // Arrange
    List<Field> bqSchemaFields =
        Arrays.asList(
            Field.of("id", StandardSQLTypeName.INT64),
            Field.of("job", StandardSQLTypeName.STRING),
            Field.of("name", StandardSQLTypeName.STRING));
    Schema bqSchema = Schema.of(bqSchemaFields);

    String nameSuffix = RandomStringUtils.randomAlphanumeric(8);
    TopicName topic = pubsubResourceManager.createTopic("input-" + nameSuffix);
    bigQueryResourceManager.createDataset(REGION);
    SubscriptionName subscription =
        pubsubResourceManager.createSubscription(topic, "sub-1-" + nameSuffix);
    TableId table = bigQueryResourceManager.createTable(testName, bqSchema);

    LaunchConfig.Builder options =
        paramsAdder.apply(
            LaunchConfig.builder(testName, specPath)
                .addParameter("yaml_pipeline_file", getGcsPath(YAML_PIPELINE_GCS_PATH))
                .addParameter(
                    "jinja_variables",
                    String.format(
                        "{" + "\"SUBSCRIPTION\": \"%s\", " + "\"BQ_TABLE\": \"%s\"" + "}",
                        subscription.toString(), toTableSpecStandard(table))));

    // Act
    LaunchInfo info = launchTemplate(options);
    assertThatPipeline(info).isRunning();

    List<Map<String, Object>> expectedMessages = new ArrayList<>();
    for (int i = 1; i <= MESSAGES_COUNT; i++) {
      Map<String, Object> message = Map.of("id", i, "job", testName, "name", "message");
      ByteString messageData = ByteString.copyFromUtf8(new JSONObject(message).toString());
      pubsubResourceManager.publish(topic, ImmutableMap.of(), messageData);
      expectedMessages.add(message);
    }

    Result result =
        pipelineOperator()
            .waitForConditionsAndFinish(
                createConfig(info),
                BigQueryRowsCheck.builder(bigQueryResourceManager, table)
                    .setMinRows(MESSAGES_COUNT)
                    .build());

    // Assert
    assertThatResult(result).meetsConditions();

    TableResult records = bigQueryResourceManager.readTable(table);

    // Make sure the records can be read
    assertThatBigQueryRecords(records).hasRecordsUnordered(expectedMessages);
  }

  private String createSimpleYamlMessage() throws IOException {
    return Files.readString(Paths.get(Resources.getResource(YAML_PIPELINE).getPath()));
  }
}
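
Note: the "jinja_variables" parameter assembled by the String.format call in the test is just a two-entry JSON object mapping the Jinja placeholders in the YAML pipeline to concrete resource names. A minimal, self-contained sketch follows; the subscription and table values are hypothetical placeholders, not values from this commit.

// Sketch of the "jinja_variables" JSON the test passes to the YAML template.
// The resource names below are hypothetical examples, not taken from this commit.
public class JinjaVariablesSketch {
  public static void main(String[] args) {
    String subscription = "projects/my-project/subscriptions/sub-1-abc12345"; // SubscriptionName.toString()
    String bqTable = "my-project:my_dataset.testPubSubToBigQuery"; // toTableSpecStandard(table)
    String jinjaVariables =
        String.format("{\"SUBSCRIPTION\": \"%s\", \"BQ_TABLE\": \"%s\"}", subscription, bqTable);
    // The launcher substitutes {{ SUBSCRIPTION }} and {{ BQ_TABLE }} in
    // PubSubToBigQueryYamlIT.yaml with these values before running the pipeline.
    System.out.println(jinjaVariables);
  }
}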
21 changes: 21 additions & 0 deletions python/src/test/resources/PubSubToBigQueryYamlIT.yaml
@@ -0,0 +1,21 @@
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        subscription: {{ SUBSCRIPTION }}
        format: json
        schema:
          type: object
          properties:
            id: {type: integer}
            job: {type: string}
            name: {type: string}
    - type: WriteToBigQuery
      config:
        table: {{ BQ_TABLE }}
      windowing:
        type: fixed
        size: 5s
options:
  streaming: true
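
The id/job/name schema declared above matches the JSON payloads the integration test publishes. A small sketch of one such message, mirroring the JSONObject usage in the test (the "job" value is an illustrative placeholder):

import java.util.Map;
import org.json.JSONObject;

// Sketch of a single Pub/Sub payload that satisfies the schema above.
// The "job" value is a placeholder, not taken from this commit.
public class SampleMessageSketch {
  public static void main(String[] args) {
    Map<String, Object> message = Map.of("id", 1, "job", "testPubSubToBigQuery", "name", "message");
    // Serialized to JSON exactly as the test does before publishing to the input topic.
    System.out.println(new JSONObject(message).toString());
  }
}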
