diff --git a/cicd/cmd/run-it-smoke-tests/main.go b/cicd/cmd/run-it-smoke-tests/main.go index 752c734536..2911f4383c 100644 --- a/cicd/cmd/run-it-smoke-tests/main.go +++ b/cicd/cmd/run-it-smoke-tests/main.go @@ -67,9 +67,11 @@ func main() { flags.SpannerHost(), flags.FailureMode(), flags.RetryFailures(), - flags.StaticOracleInstance(), + flags.StaticOracleHost(), + flags.StaticOracleSysPassword(), flags.CloudProxyHost(), - flags.CloudProxyPort(), + flags.CloudProxyMySqlPort(), + flags.CloudProxyPostgresPort(), flags.CloudProxyPassword()) if err != nil { log.Fatalf("%v\n", err) diff --git a/cicd/cmd/run-it-tests/main.go b/cicd/cmd/run-it-tests/main.go index 8a36ff9ccf..45de079cfe 100644 --- a/cicd/cmd/run-it-tests/main.go +++ b/cicd/cmd/run-it-tests/main.go @@ -68,9 +68,11 @@ func main() { flags.SpannerHost(), flags.FailureMode(), flags.RetryFailures(), - flags.StaticOracleInstance(), + flags.StaticOracleHost(), + flags.StaticOracleSysPassword(), flags.CloudProxyHost(), - flags.CloudProxyPort(), + flags.CloudProxyMySqlPort(), + flags.CloudProxyPostgresPort(), flags.CloudProxyPassword()) if err != nil { log.Fatalf("%v\n", err) diff --git a/cicd/cmd/run-load-tests/main.go b/cicd/cmd/run-load-tests/main.go index ca38e5dafd..379fc2eb90 100644 --- a/cicd/cmd/run-load-tests/main.go +++ b/cicd/cmd/run-load-tests/main.go @@ -67,9 +67,11 @@ func main() { flags.ExportProject(), flags.ExportDataset(), flags.ExportTable(), - flags.StaticOracleInstance(), + flags.StaticOracleHost(), + flags.StaticOracleSysPassword(), flags.CloudProxyHost(), - flags.CloudProxyPort(), + flags.CloudProxyMySqlPort(), + flags.CloudProxyPostgresPort(), flags.CloudProxyPassword()) if err != nil { log.Fatalf("%v\n", err) diff --git a/cicd/internal/flags/it-flags.go b/cicd/internal/flags/it-flags.go index f8a54e867e..cceac0b924 100644 --- a/cicd/internal/flags/it-flags.go +++ b/cicd/internal/flags/it-flags.go @@ -24,19 +24,21 @@ import ( // Avoid making these vars public. var ( - dRegion string - dProject string - dArtifactBucket string - dStageBucket string - dHostIp string - dPrivateConnectivity string - dSpannerHost string - dReleaseMode bool - dRetryFailures string - dCloudProxyHost string - dCloudProxyPort string - dCloudProxyPassword string - dOracleInstance string + dRegion string + dProject string + dArtifactBucket string + dStageBucket string + dHostIp string + dPrivateConnectivity string + dSpannerHost string + dReleaseMode bool + dRetryFailures string + dCloudProxyHost string + dCloudProxyMySqlPort string + dCloudProxyPostgresPort string + dCloudProxyPassword string + dOracleHost string + dCloudOracleSysPassword string ) // Registers all common flags. Must be called before flag.Parse(). @@ -51,9 +53,11 @@ func RegisterItFlags() { flag.BoolVar(&dReleaseMode, "it-release", false, "(optional) Set if tests are being executed for a release") flag.StringVar(&dRetryFailures, "it-retry-failures", "0", "Number of retries attempts for failing tests") flag.StringVar(&dCloudProxyHost, "it-cloud-proxy-host", "10.128.0.34", "Hostname or IP address of static Cloud Auth Proxy") - flag.StringVar(&dCloudProxyPort, "it-cloud-proxy-port", "33134", "Port number of static Cloud Auth Proxy") + flag.StringVar(&dCloudProxyMySqlPort, "it-cloud-proxy-mysql-port", "33134", "MySql port number on static Cloud Auth Proxy") + flag.StringVar(&dCloudProxyPostgresPort, "it-cloud-proxy-postgres-port", "33136", "Postgres port number on static Cloud Auth Proxy") flag.StringVar(&dCloudProxyPassword, "it-cloud-proxy-password", "t>5xl%J(&qTK6?FaZ", "Password of static Cloud Auth Proxy") - flag.StringVar(&dOracleInstance, "it-oracle-host", "10.128.0.90", "Hostname or IP address of static Oracle DB") + flag.StringVar(&dOracleHost, "it-oracle-host", "10.128.0.90", "Hostname or IP address of static Oracle DB") + flag.StringVar(&dCloudOracleSysPassword, "it-oracle-sys-password", "oracle", "sys password of static Oracle DB") } func Region() string { @@ -119,14 +123,22 @@ func CloudProxyHost() string { return "-DcloudProxyHost=" + dCloudProxyHost } -func CloudProxyPort() string { - return "-DcloudProxyPort=" + dCloudProxyPort +func CloudProxyMySqlPort() string { + return "-DcloudProxyMySqlPort=" + dCloudProxyMySqlPort +} + +func CloudProxyPostgresPort() string { + return "-DcloudProxyPostgresPort=" + dCloudProxyPostgresPort } func CloudProxyPassword() string { return "-DcloudProxyPassword=" + dCloudProxyPassword } -func StaticOracleInstance() string { - return "-DcloudOracleHost=" + dOracleInstance +func StaticOracleHost() string { + return "-DcloudOracleHost=" + dOracleHost +} + +func StaticOracleSysPassword() string { + return "-DcloudOracleSysPassword=" + dCloudOracleSysPassword } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudMySQLResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudMySQLResourceManager.java index b32cd6c820..37031c7863 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudMySQLResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudMySQLResourceManager.java @@ -18,6 +18,8 @@ package org.apache.beam.it.gcp.cloudsql; import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Custom class for the MySQL implementation of {@link CloudSqlResourceManager} abstract class. @@ -29,6 +31,8 @@ */ public class CloudMySQLResourceManager extends CloudSqlResourceManager { + private static final Logger LOG = LoggerFactory.getLogger(CloudMySQLResourceManager.class); + private CloudMySQLResourceManager(Builder builder) { super(builder); } @@ -49,6 +53,15 @@ public Builder(String testId) { super(testId); } + @Override + protected void configurePort() { + if (System.getProperty("cloudProxyMySqlPort") != null) { + this.setPort(Integer.parseInt(System.getProperty("cloudProxyMySqlPort"))); + } else { + LOG.warn("Missing -DcloudProxyMySqlPort."); + } + } + @Override public @NonNull CloudMySQLResourceManager build() { return new CloudMySQLResourceManager(this); diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudOracleResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudOracleResourceManager.java index 448d40c2f7..1e5081a77b 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudOracleResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudOracleResourceManager.java @@ -36,11 +36,16 @@ public class CloudOracleResourceManager extends CloudSqlResourceManager { private static final Logger LOG = LoggerFactory.getLogger(CloudOracleResourceManager.class); + private static final String DEFAULT_SYSTEM_IDENTIFIER = "xe"; + private static final int DEFAULT_ORACLE_PORT = 1521; - private CloudOracleResourceManager(Builder builder) { + private String systemIdentifier; + + protected CloudOracleResourceManager(Builder builder) { super(builder); + this.systemIdentifier = builder.systemIdentifier; System.setProperty("oracle.jdbc.timezoneAsRegion", "false"); } @@ -65,27 +70,65 @@ public static Builder builder(String testId) { return "SELECT * FROM " + tableName + " WHERE ROWNUM <= 1"; } + /** + * Return the SID of the connected DB. + * + * @return the SID. + */ + public String getSystemIdentifier() { + return this.systemIdentifier; + } + /** Builder for {@link CloudOracleResourceManager}. */ public static final class Builder extends CloudSqlResourceManager.Builder { + private String systemIdentifier; + public Builder(String testId) { super(testId); - this.setDatabaseName("xe"); - this.setPort(DEFAULT_ORACLE_PORT); - - // Currently only supports static Oracle instance on GCE - this.maybeUseStaticInstance(); + this.setSystemIdentifier(DEFAULT_SYSTEM_IDENTIFIER); + this.setDatabaseName(this.systemIdentifier); } - public Builder maybeUseStaticInstance() { + @Override + protected void configureHost() { if (System.getProperty("cloudOracleHost") != null) { this.setHost(System.getProperty("cloudOracleHost")); } else { LOG.warn("Missing -DcloudOracleHost."); } - this.useStaticContainer(); + } + + @Override + protected void configurePort() { + if (System.getProperty("cloudOraclePort") != null) { + this.setPort(Integer.parseInt(System.getProperty("cloudOraclePort"))); + } else { + this.setPort(DEFAULT_ORACLE_PORT); + } + } + + @Override + protected void configureUsername() { + if (System.getProperty("cloudOracleUsername") != null) { + this.setUsername(System.getProperty("cloudOracleUsername")); + } else { + super.configureUsername(); + } + } + + @Override + protected void configurePassword() { + if (System.getProperty("cloudOraclePassword") != null) { + this.setPassword(System.getProperty("cloudOraclePassword")); + } else { + super.configurePassword(); + } + } + public Builder setSystemIdentifier(String systemIdentifier) { + this.systemIdentifier = systemIdentifier; return this; } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudPostgresResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudPostgresResourceManager.java new file mode 100644 index 0000000000..756ce8d23d --- /dev/null +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudPostgresResourceManager.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.gcp.cloudsql; + +import java.util.List; +import java.util.Map; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Custom class for the Postgres implementation of {@link CloudSqlResourceManager} abstract class. + * + *

The class supports one database, and multiple tables per database object. A database is * + * created when the container first spins up, if one is not given. + * + *

A schema will also be created with the same name as the DB, unless one is manually set in the + * builder. + * + *

The class is thread-safe. + */ +public class CloudPostgresResourceManager extends CloudSqlResourceManager { + + private static final Logger LOG = LoggerFactory.getLogger(CloudPostgresResourceManager.class); + + private static final String DEFAULT_POSTGRES_USERNAME = "postgres"; + + private String pgSchema; + private boolean createdSchema; + + private CloudPostgresResourceManager(Builder builder) { + super(builder); + this.pgSchema = builder.schema; + this.createdSchema = false; + + // Set schema set by builder or default to use same name as database + if (this.pgSchema == null) { + this.pgSchema = databaseName; + LOG.info("Creating Postgres schema {}.", this.pgSchema); + runSQLUpdate(String.format("CREATE SCHEMA IF NOT EXISTS %s", this.pgSchema)); + this.createdSchema = true; + } + } + + public static Builder builder(String testId) { + return new Builder(testId); + } + + public String getSchema() { + return this.pgSchema; + } + + public String getFullTableName(String tableName) { + return String.format("%s.%s", pgSchema, tableName); + } + + @Override + public @NonNull String getJDBCPrefix() { + return "postgresql"; + } + + @Override + public boolean createTable(@NonNull String tableName, @NonNull JDBCSchema schema) { + boolean status = super.createTable(tableName, schema); + + // Set table schema + runSQLUpdate(String.format("ALTER TABLE %s SET SCHEMA %s", tableName, pgSchema)); + this.createdTables.remove(tableName); + this.createdTables.add(tableName); + + return status; + } + + @Override + public void dropTable(@NonNull String tableName) { + super.dropTable(getFullTableName(tableName)); + } + + @Override + public boolean write(String tableName, List> rows) { + return super.write(getFullTableName(tableName), rows); + } + + @Override + public List> readTable(String tableName) { + return super.readTable(getFullTableName(tableName)); + } + + @Override + public synchronized List getTableSchema(String tableName) { + return super.getTableSchema(getFullTableName(tableName)); + } + + @Override + public synchronized long getRowCount(String tableName) { + return super.getRowCount(getFullTableName(tableName)); + } + + @Override + public void dropDatabase(@NonNull String databaseName) { + LOG.info("Dropping database using databaseName '{}'.", databaseName); + + this.createdDatabase = false; + runSQLUpdate(String.format("DROP DATABASE %s WITH (force)", databaseName)); + + LOG.info("Successfully dropped database {}", databaseName); + } + + @Override + public void cleanupAll() { + // Cleanup table schema if using a static DB with non-static tables + if (this.usingCustomDb && this.createdSchema) { + LOG.info("Attempting to drop Postgres schema {}.", this.pgSchema); + try { + runSQLUpdate(String.format("DROP SCHEMA %s CASCADE", this.pgSchema)); + LOG.info("Postgres schema successfully cleaned up."); + } catch (Exception e) { + throw new CloudSqlResourceManagerException("Failed to drop Postgres schema.", e); + } + } else { + LOG.info("Not dropping pre-configured Postgres schema {}.", this.pgSchema); + } + super.cleanupAll(); + } + + /** Builder for {@link CloudPostgresResourceManager}. */ + public static final class Builder extends CloudSqlResourceManager.Builder { + + private String schema; + + public Builder(String testId) { + super(testId); + } + + /** + * Set the table schema for all the tables created by the resource manager. This schema will not + * be cleaned up, but tables created by this resource manager will be. + * + * @param schema name of schema to user for tables. + * @return this builder. + */ + public Builder setSchema(String schema) { + this.schema = schema; + return this; + } + + @Override + protected String getDefaultUsername() { + return DEFAULT_POSTGRES_USERNAME; + } + + @Override + protected void configurePort() { + if (System.getProperty("cloudProxyPostgresPort") != null) { + this.setPort(Integer.parseInt(System.getProperty("cloudProxyPostgresPort"))); + } else { + LOG.warn("Missing -DcloudProxyPostgresPort."); + } + } + + @Override + public @NonNull CloudPostgresResourceManager build() { + return new CloudPostgresResourceManager(this); + } + } +} diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManager.java index 6e3dd23642..6d41a79233 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManager.java @@ -42,8 +42,8 @@ public abstract class CloudSqlResourceManager private static final Logger LOG = LoggerFactory.getLogger(CloudSqlResourceManager.class); protected final List createdTables; - private boolean createdDatabase; - private boolean usingCustomDb; + protected boolean createdDatabase; + protected boolean usingCustomDb; protected CloudSqlResourceManager(@NonNull Builder builder) { super(CloudSqlContainer.of(), builder); @@ -161,10 +161,24 @@ public Builder(String testId) { this.usingCustomDb = false; // Currently only supports static CloudSQL instance with static Cloud Auth Proxy - this.maybeUseStaticCloudProxy(); + this.maybeUseStaticInstance(); } - public Builder maybeUseStaticCloudProxy() { + public Builder maybeUseStaticInstance() { + this.configureHost(); + this.configurePort(); + this.configureUsername(); + this.configurePassword(); + this.useStaticContainer(); + + return this; + } + + protected String getDefaultUsername() { + return DEFAULT_JDBC_USERNAME; + } + + protected void configureHost() { if (System.getProperty("cloudProxyHost") != null) { this.setHost(System.getProperty("cloudProxyHost")); } else { @@ -175,6 +189,20 @@ public Builder maybeUseStaticCloudProxy() { } else { LOG.warn("Missing -DcloudProxyPort."); } + } + + protected abstract void configurePort(); + + protected void configureUsername() { + if (System.getProperty("cloudProxyUsername") != null) { + this.setUsername(System.getProperty("cloudProxyUsername")); + } else { + LOG.info("-DcloudProxyUsername not specified, using default: " + getDefaultUsername()); + this.setUsername(getDefaultUsername()); + } + } + + protected void configurePassword() { if (System.getProperty("cloudProxyPassword") != null) { this.setPassword(System.getProperty("cloudProxyPassword")); } else { @@ -186,8 +214,6 @@ public Builder maybeUseStaticCloudProxy() { LOG.info("-DcloudProxyUsername not specified, using default: " + DEFAULT_JDBC_USERNAME); } this.useStaticContainer(); - - return this; } @Override diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManagerTest.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManagerTest.java index db0be86223..1b8bbd0017 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManagerTest.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/cloudsql/CloudSqlResourceManagerTest.java @@ -88,6 +88,11 @@ private MockCloudSqlResourceManager createTestManager(boolean useCustomDb) { public @NonNull CloudSqlResourceManager build() { return new MockCloudSqlResourceManager(this); } + + @Override + protected void configurePort() { + this.setPort(1234); + } }; if (useCustomDb) { diff --git a/v2/datastream-to-sql/pom.xml b/v2/datastream-to-sql/pom.xml index 63e3b6f15d..4c84747de7 100644 --- a/v2/datastream-to-sql/pom.xml +++ b/v2/datastream-to-sql/pom.xml @@ -61,6 +61,30 @@ ${truth.version} test + + com.google.cloud.teleport + it-google-cloud-platform + ${project.version} + test + + + com.google.cloud.teleport + it-conditions + ${project.version} + test + + + com.google.cloud.teleport + it-jdbc + ${project.version} + test + + + com.oracle.database.jdbc + ojdbc8 + ${ojdbc8.version} + test + diff --git a/v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSQLIT.java b/v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSQLIT.java new file mode 100644 index 0000000000..a88f7be31c --- /dev/null +++ b/v2/datastream-to-sql/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSQLIT.java @@ -0,0 +1,471 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates; + +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatRecords; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.google.cloud.datastream.v1.DestinationConfig; +import com.google.cloud.datastream.v1.SourceConfig; +import com.google.cloud.datastream.v1.Stream; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.common.base.Strings; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.function.Function; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.PipelineUtils; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.conditions.ChainedConditionCheck; +import org.apache.beam.it.conditions.ConditionCheck; +import org.apache.beam.it.gcp.TemplateTestBase; +import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager; +import org.apache.beam.it.gcp.cloudsql.CloudOracleResourceManager; +import org.apache.beam.it.gcp.cloudsql.CloudPostgresResourceManager; +import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager; +import org.apache.beam.it.gcp.datastream.DatastreamResourceManager; +import org.apache.beam.it.gcp.datastream.JDBCSource; +import org.apache.beam.it.gcp.datastream.OracleSource; +import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.apache.beam.it.jdbc.JDBCResourceManager; +import org.apache.beam.it.jdbc.conditions.JDBCRowsCheck; +import org.apache.commons.lang3.RandomStringUtils; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(DataStreamToSQL.class) +@RunWith(JUnit4.class) +public class DataStreamToSQLIT extends TemplateTestBase { + + enum JDBCType { + MYSQL, + ORACLE, + POSTGRES + } + + private static final int NUM_EVENTS = 10; + + private static final String ROW_ID = "row_id"; + private static final String NAME = "name"; + private static final String AGE = "age"; + private static final String MEMBER = "member"; + private static final String ENTRY_ADDED = "entry_added"; + private static final List COLUMNS = List.of(ROW_ID, NAME, AGE, MEMBER, ENTRY_ADDED); + + private String gcsPrefix; + private String dlqGcsPrefix; + + private CloudOracleResourceManager cloudOracleSysUser; + private CloudSqlResourceManager cloudSqlSourceResourceManager; + private CloudSqlResourceManager cloudSqlDestinationResourceManager; + private PubsubResourceManager pubsubResourceManager; + private DatastreamResourceManager datastreamResourceManager; + + @Before + public void setUp() throws IOException { + datastreamResourceManager = + DatastreamResourceManager.builder(testName, PROJECT, REGION) + .setCredentialsProvider(credentialsProvider) + .setPrivateConnectivity("datastream-private-connect-us-central1") + .build(); + + String password = System.getProperty("cloudOracleSysPassword"); + if (Strings.isNullOrEmpty(password)) { + throw new IllegalStateException("Missing -DcloudOracleSysPassword"); + } + cloudOracleSysUser = + (CloudOracleResourceManager) + CloudOracleResourceManager.builder(testName) + .setUsername("sys as sysdba") + .setPassword(password) + .build(); + + gcsPrefix = getGcsPath(testName + "/cdc/").replace("gs://" + artifactBucketName, ""); + dlqGcsPrefix = getGcsPath(testName + "/dlq/").replace("gs://" + artifactBucketName, ""); + } + + @After + public void cleanUp() { + ResourceManagerUtils.cleanResources( + cloudOracleSysUser, + cloudSqlSourceResourceManager, + cloudSqlDestinationResourceManager, + pubsubResourceManager, + datastreamResourceManager); + } + + @Test + public void testDataStreamOracleToMySqlJson() throws IOException { + // Run a simple IT + simpleOracleToJdbcTest(JDBCType.MYSQL, Function.identity()); + } + + @Test + public void testDataStreamOracleToPostgresJson() throws IOException { + // Run a simple IT + simpleOracleToJdbcTest(JDBCType.POSTGRES, Function.identity()); + } + + @Test + public void testDataStreamOracleToMySqlJsonGCSNotifications() throws IOException { + // Set up pubsub notifications + SubscriptionName subscriptionName = createGcsNotifications(); + + // Run a simple IT + simpleOracleToJdbcTest( + JDBCType.MYSQL, + config -> config.addParameter("gcsPubSubSubscription", subscriptionName.toString())); + } + + @Test + public void testDataStreamOracleToPostgresJsonGCSNotifications() throws IOException { + // Set up pubsub notifications + SubscriptionName subscriptionName = createGcsNotifications(); + + // Run a simple IT + simpleOracleToJdbcTest( + JDBCType.POSTGRES, + config -> config.addParameter("gcsPubSubSubscription", subscriptionName.toString())); + } + + private void simpleOracleToJdbcTest( + JDBCType destJdbcType, + Function + paramsAdder) + throws IOException { + + // Create destination JDBC Resource manager + cloudSqlDestinationResourceManager = + destJdbcType.equals(JDBCType.MYSQL) + ? CloudMySQLResourceManager.builder(testName).build() + : CloudPostgresResourceManager.builder(testName).build(); + + // Since test uses Oracle XE, schemas are mapped to users. The following code + // creates a new Oracle user and grants all necessary permissions needed by + // Datastream. + String oracleUser = cloudSqlDestinationResourceManager.getDatabaseName(); + String oraclePassword = System.getProperty("cloudProxyPassword"); + setUpOracleUser(oracleUser, oraclePassword); + cloudSqlSourceResourceManager = + (CloudSqlResourceManager) + CloudOracleResourceManager.builder(testName) + .setUsername(oracleUser) + .setPassword(oraclePassword) + .build(); + + // Create source JDBC table + String tableName = "oracletosql_" + RandomStringUtils.randomAlphanumeric(5).toLowerCase(); + cloudSqlSourceResourceManager.createTable(tableName, createJdbcSchema(JDBCType.ORACLE)); + + // Create destination JDBC table + cloudSqlDestinationResourceManager.createTable(tableName, createJdbcSchema(destJdbcType)); + + // Create JDBC source + JDBCSource jdbcSource = + OracleSource.builder( + cloudSqlSourceResourceManager.getHost(), + cloudSqlSourceResourceManager.getUsername(), + cloudSqlSourceResourceManager.getPassword(), + cloudSqlSourceResourceManager.getPort(), + cloudSqlSourceResourceManager.getDatabaseName()) + .setAllowedTables( + Map.of( + cloudSqlSourceResourceManager.getUsername().toUpperCase(), + List.of(tableName.toUpperCase()))) + .build(); + + // Create Datastream JDBC Source Connection profile and config + SourceConfig sourceConfig = + datastreamResourceManager.buildJDBCSourceConfig("oracle-profile", jdbcSource); + + // Create Datastream GCS Destination Connection profile and config + DestinationConfig destinationConfig = + datastreamResourceManager.buildGCSDestinationConfig( + "gcs-profile", + artifactBucketName, + gcsPrefix, + DatastreamResourceManager.DestinationOutputFormat.JSON_FILE_FORMAT); + + // Create and start Datastream stream + Stream stream = + datastreamResourceManager.createStream("stream1", sourceConfig, destinationConfig); + datastreamResourceManager.startStream(stream); + + // Construct template + String jobName = PipelineUtils.createJobName(testName); + PipelineLauncher.LaunchConfig.Builder options = + paramsAdder + .apply( + PipelineLauncher.LaunchConfig.builder(jobName, specPath) + .addParameter("inputFilePattern", getGcsPath(testName) + "/cdc/") + .addParameter("streamName", stream.getName()) + .addParameter("inputFileFormat", "json")) + .addParameter( + "databaseType", destJdbcType.equals(JDBCType.MYSQL) ? "mysql" : "postgres") + .addParameter("databaseName", cloudSqlDestinationResourceManager.getDatabaseName()) + .addParameter("databaseHost", cloudSqlDestinationResourceManager.getHost()) + .addParameter( + "databasePort", String.valueOf(cloudSqlDestinationResourceManager.getPort())) + .addParameter("databaseUser", cloudSqlDestinationResourceManager.getUsername()) + .addParameter("databasePassword", cloudSqlDestinationResourceManager.getPassword()); + + // Act + PipelineLauncher.LaunchInfo info = launchTemplate(options); + assertThatPipeline(info).isRunning(); + + // Construct a ChainedConditionCheck with 4 stages. + // 1. Send initial wave of events to JDBC + // 2. Wait on BigQuery to merge events from staging to destination + // 3. Send wave of mutations to JDBC + // 4. Wait on BigQuery to merge second wave of events + Map>> cdcEvents = new HashMap<>(); + ChainedConditionCheck conditionCheck = + ChainedConditionCheck.builder( + List.of( + writeJdbcData(tableName, cdcEvents), + JDBCRowsCheck.builder(cloudSqlDestinationResourceManager, tableName) + .setMinRows(NUM_EVENTS) + .build(), + changeJdbcData(tableName, cdcEvents), + checkDestinationRows(tableName, cdcEvents))) + .build(); + + // Job needs to be cancelled as draining will time out + PipelineOperator.Result result = + pipelineOperator() + .waitForConditionAndCancel(createConfig(info, Duration.ofMinutes(20)), conditionCheck); + + // Assert + checkJdbcTable(tableName, cdcEvents); + assertThatResult(result).meetsConditions(); + } + + /** + * Helper function for constructing a ConditionCheck whose check() method constructs the initial + * rows of data in the JDBC database according to the common schema for the IT's in this class. + * + * @return A ConditionCheck containing the JDBC write operation. + */ + private ConditionCheck writeJdbcData( + String tableName, Map>> cdcEvents) { + return new ConditionCheck() { + @Override + protected @NonNull String getDescription() { + return "Send initial JDBC events."; + } + + @Override + protected @NonNull CheckResult check() { + List> rows = new ArrayList<>(); + for (int i = 0; i < NUM_EVENTS; i++) { + Map values = new HashMap<>(); + values.put(COLUMNS.get(0), i); + values.put(COLUMNS.get(1), RandomStringUtils.randomAlphabetic(10).toLowerCase()); + values.put(COLUMNS.get(2), new Random().nextInt(100)); + values.put(COLUMNS.get(3), new Random().nextInt() % 2 == 0 ? "Y" : "N"); + values.put(COLUMNS.get(4), Instant.now().toString()); + rows.add(values); + } + + // Force log file archive - needed so Datastream can see changes which are read from + // archived log files. + boolean success = cloudSqlSourceResourceManager.write(tableName, rows); + cloudSqlSourceResourceManager.runSQLUpdate("ALTER SYSTEM SWITCH LOGFILE"); + + cdcEvents.put(tableName, rows); + return new CheckResult( + success, String.format("Sent %d rows to %s.", rows.size(), tableName)); + } + }; + } + + /** + * Helper function for constructing a ConditionCheck whose check() method changes rows of data in + * the JDBC database according to the common schema for the IT's in this class. Half the rows are + * mutated and half are removed completely. + * + * @return A ConditionCheck containing the JDBC mutate operation. + */ + private ConditionCheck changeJdbcData( + String tableName, Map>> cdcEvents) { + return new ConditionCheck() { + @Override + protected @NonNull String getDescription() { + return "Send JDBC changes."; + } + + @Override + protected @NonNull CheckResult check() { + List> newCdcEvents = new ArrayList<>(); + for (int i = 0; i < NUM_EVENTS; i++) { + if (i % 2 == 0) { + Map values = cdcEvents.get(tableName).get(i); + values.put(COLUMNS.get(2), new Random().nextInt(100)); + values.put( + COLUMNS.get(3), + (Objects.equals(values.get(COLUMNS.get(3)).toString(), "Y") ? "N" : "Y")); + + String updateSql = + "UPDATE " + + tableName + + " SET " + + COLUMNS.get(2) + + " = " + + values.get(COLUMNS.get(2)) + + "," + + COLUMNS.get(3) + + " = '" + + values.get(COLUMNS.get(3)) + + "'" + + " WHERE " + + COLUMNS.get(0) + + " = " + + i; + cloudSqlSourceResourceManager.runSQLUpdate(updateSql); + newCdcEvents.add(values); + } else { + cloudSqlSourceResourceManager.runSQLUpdate( + "DELETE FROM " + tableName + " WHERE " + COLUMNS.get(0) + "=" + i); + } + } + + // Force log file archive - needed so Datastream can see changes which are read from + // archived log files. + cloudSqlSourceResourceManager.runSQLUpdate("ALTER SYSTEM SWITCH LOGFILE"); + + cdcEvents.put(tableName, newCdcEvents); + return new CheckResult( + true, String.format("Sent %d changes to %s.", newCdcEvents.size(), tableName)); + } + }; + } + + /** + * Helper function for constructing a ConditionCheck whose check() method checks the rows in the + * destination BigQuery database for specific rows. + * + * @return A ConditionCheck containing the check operation. + */ + private ConditionCheck checkDestinationRows( + String tableName, Map>> cdcEvents) { + return new ConditionCheck() { + @Override + protected @NonNull String getDescription() { + return "Check JDBC rows."; + } + + @Override + protected @NonNull CheckResult check() { + // First, check that correct number of rows were deleted. + long totalRows = cloudSqlDestinationResourceManager.getRowCount(tableName); + long maxRows = cdcEvents.get(tableName).size(); + if (totalRows > maxRows) { + return new CheckResult( + false, String.format("Expected up to %d rows but found %d", maxRows, totalRows)); + } + + // Next, make sure in-place mutations were applied. + try { + checkJdbcTable(tableName, cdcEvents); + return new CheckResult(true, "JDBC table contains expected rows."); + } catch (AssertionError error) { + return new CheckResult(false, "JDBC table does not contain expected rows."); + } + } + }; + } + + /** Helper function for checking the rows of the destination JDBC tables. */ + private void checkJdbcTable(String tableName, Map>> cdcEvents) { + + assertThatRecords(cloudSqlDestinationResourceManager.readTable(tableName)) + .hasRecordsUnorderedCaseInsensitiveColumns(cdcEvents.get(tableName)); + } + + private JDBCResourceManager.JDBCSchema createJdbcSchema(JDBCType jdbcType) { + // Arrange MySQL-compatible schema + HashMap columns = new HashMap<>(); + String numericDataType = jdbcType.equals(JDBCType.ORACLE) ? "NUMBER" : "NUMERIC"; + columns.put(ROW_ID, numericDataType + " NOT NULL"); + columns.put(NAME, "VARCHAR(200)"); + columns.put(AGE, numericDataType); + columns.put(MEMBER, "VARCHAR(200)"); + columns.put(ENTRY_ADDED, "VARCHAR(200)"); + return new JDBCResourceManager.JDBCSchema(columns, ROW_ID); + } + + private SubscriptionName createGcsNotifications() throws IOException { + // Instantiate pubsub resource manager for notifications + pubsubResourceManager = + PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build(); + + // Create pubsub notifications + TopicName topic = pubsubResourceManager.createTopic("it"); + TopicName dlqTopic = pubsubResourceManager.createTopic("dlq"); + SubscriptionName subscription = pubsubResourceManager.createSubscription(topic, "it-sub"); + + gcsClient.createNotification(topic.toString(), gcsPrefix.substring(1)); + gcsClient.createNotification(dlqTopic.toString(), dlqGcsPrefix.substring(1)); + + return subscription; + } + + /** + * Helper method for granting all the permissions to a user required by Datastream. + * + * @param user the user that will be given to Datastream + * @param password the password for the given user. + */ + private void setUpOracleUser(String user, String password) { + cloudOracleSysUser.runSQLUpdate( + String.format("CREATE USER %s IDENTIFIED BY \"%s\"", user, password)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT EXECUTE_CATALOG_ROLE TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT CONNECT TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT CREATE SESSION TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT SELECT ON SYS.V_$DATABASE TO %s", user)); + cloudOracleSysUser.runSQLUpdate( + String.format("GRANT SELECT ON SYS.V_$ARCHIVED_LOG TO %s", user)); + cloudOracleSysUser.runSQLUpdate( + String.format("GRANT SELECT ON SYS.V_$LOGMNR_CONTENTS TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT EXECUTE ON DBMS_LOGMNR TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT EXECUTE ON DBMS_LOGMNR_D TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT SELECT ANY TRANSACTION TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT SELECT ANY TABLE TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT SELECT ON DBA_EXTENTS TO %s", user)); + + cloudOracleSysUser.runSQLUpdate(String.format("GRANT CREATE ANY TABLE TO %s", user)); + cloudOracleSysUser.runSQLUpdate(String.format("ALTER USER %s QUOTA 50m ON SYSTEM", user)); + cloudOracleSysUser.runSQLUpdate(String.format("GRANT ALTER SYSTEM TO %s", user)); + } +}