From 762ce0db7c03514e1d2db3290cc1799992b24a10 Mon Sep 17 00:00:00 2001 From: Julien Phalip Date: Thu, 25 Apr 2024 17:32:59 -0700 Subject: [PATCH 1/4] Fixes for Pig and HCatalog --- .../connector/output/PostInsertHook.java | 9 +- .../output/OutputCommitterUtils.java | 13 +- .../connector/utils/hive/HiveUtils.java | 8 +- .../acceptance/AcceptanceTestContext.java | 26 ++- .../DataprocAcceptanceTestBase.java | 156 ++++++++++++------ .../integration/IntegrationTestsBase.java | 1 - .../connector/utils/HiveUtilsTest.java | 5 +- .../acceptance/create_two_external_tables.sql | 17 ++ .../create_write_read_drop_managed_table.sql | 2 +- .../test/resources/acceptance/read_write.pig | 2 + 10 files changed, 159 insertions(+), 80 deletions(-) create mode 100644 hive-bigquery-connector-common/src/test/resources/acceptance/create_two_external_tables.sql create mode 100644 hive-bigquery-connector-common/src/test/resources/acceptance/read_write.pig diff --git a/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PostInsertHook.java b/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PostInsertHook.java index ed0e98a4..00f53a90 100644 --- a/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PostInsertHook.java +++ b/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PostInsertHook.java @@ -41,7 +41,14 @@ public void run(HookContext hookContext) throws Exception { .equals("com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler")) { String tableName = HiveUtils.getDbTableName(entity.getTable()); BigQueryMetaHook metahook = new BigQueryMetaHook(hookContext.getConf()); - metahook.commitInsertTable(tableName); + try { + metahook.commitInsertTable(tableName); + } catch (Exception e) { + // Clean things up + OutputCommitterUtils.abortJob(hookContext.getConf(), tableName); + // Re-throw the original exception + throw new RuntimeException(e); + } } } } diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java index 27c0b7e7..434bfeeb 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java @@ -28,15 +28,12 @@ public class OutputCommitterUtils { public static void commitJob(Configuration conf, JobDetails jobDetails) throws IOException { - try { - if (jobDetails.getWriteMethod().equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) { - DirectOutputCommitter.commitJob(conf, jobDetails); - } else { - IndirectOutputCommitter.commitJob(conf, jobDetails); - } - } finally { - jobDetails.cleanUp(conf); + if (jobDetails.getWriteMethod().equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) { + DirectOutputCommitter.commitJob(conf, jobDetails); + } else { + IndirectOutputCommitter.commitJob(conf, jobDetails); } + jobDetails.cleanUp(conf); } public static void commitJob(Configuration conf) throws IOException { diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java index fa3fdf7b..57f34107 100644 --- 
a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java @@ -65,11 +65,9 @@ public static String getQueryId(Configuration conf) { // In this case, the user is running a plain Hive query directly from Hive itself. return "hive-query-id-" + hiveQueryId; } - if (conf.get("pig.script.id") != null) { - // The user is running a Hive query from Pig. Use the job's timestamp as a pig script might - // run multiple jobs. - return String.format( - "pig-%s-%s", conf.get("pig.script.id"), conf.get("pig.job.submitted.timestamp")); + if (conf.get("mapreduce.workflow.id") != null) { + // Map reduce job, possibly from Pig + return String.format("mapreduce-%s", conf.get("mapreduce.workflow.id")); } throw new RuntimeException("No query id found in Hadoop conf"); } diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/AcceptanceTestContext.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/AcceptanceTestContext.java index 844a7cb7..886f7c4e 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/AcceptanceTestContext.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/AcceptanceTestContext.java @@ -24,7 +24,6 @@ public class AcceptanceTestContext { final String connectorInitActionUri; final String bqProject; final String bqDataset; - final String bqTable; public AcceptanceTestContext( String testId, @@ -34,8 +33,7 @@ public AcceptanceTestContext( String connectorJarUri, String connectorInitActionUri, String bqProject, - String bqDataset, - String bqTable) { + String bqDataset) { this.testId = testId; this.dataprocImageVersion = dataprocImageVersion; this.clusterId = clusterId; @@ -44,7 +42,6 @@ public AcceptanceTestContext( this.connectorInitActionUri = connectorInitActionUri; this.bqProject = bqProject; this.bqDataset = bqDataset; - this.bqTable = bqTable; } public String getFileUri(String testName, String filename) { @@ -57,16 +54,15 @@ public String getOutputDirUri(String testName) { @Override public String toString() { - return new StringBuilder() - .append("testId: " + testId + "\n") - .append("dataprocImageVersion: " + dataprocImageVersion + "\n") - .append("clusterId: " + clusterId + "\n") - .append("testBaseGcsDir: " + testBaseGcsDir + "\n") - .append("connectorJarUri: " + connectorJarUri + "\n") - .append("connectorInitActionUri: " + connectorInitActionUri + "\n") - .append("projectId: " + bqProject + "\n") - .append("bqDataset: " + bqDataset + "\n") - .append("bqTable: " + bqTable + "\n") - .toString(); + return String.join( + "\n", + "testId: " + testId, + "dataprocImageVersion: " + dataprocImageVersion, + "clusterId: " + clusterId, + "testBaseGcsDir: " + testBaseGcsDir, + "connectorJarUri: " + connectorJarUri, + "connectorInitActionUri: " + connectorInitActionUri, + "projectId: " + bqProject, + "bqDataset: " + bqDataset); } } diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java index 8c8eeb59..c870e791 100644 --- 
a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java
+++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java
@@ -32,12 +32,18 @@
 import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.uploadConnectorInitAction;
 import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.uploadConnectorJar;
 import static com.google.common.truth.Truth.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import com.google.cloud.bigquery.FieldValueList;
+import com.google.cloud.bigquery.TableResult;
 import com.google.cloud.hive.bigquery.connector.TestUtils;
 import com.google.common.collect.ImmutableMap;
-import java.time.Duration;
+import com.google.common.collect.Streams;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.junit.Test;
 import test.hivebqcon.com.google.cloud.dataproc.v1.Cluster;
 import test.hivebqcon.com.google.cloud.dataproc.v1.ClusterConfig;
@@ -53,6 +59,7 @@
 import test.hivebqcon.com.google.cloud.dataproc.v1.JobPlacement;
 import test.hivebqcon.com.google.cloud.dataproc.v1.JobStatus;
 import test.hivebqcon.com.google.cloud.dataproc.v1.NodeInitializationAction;
+import test.hivebqcon.com.google.cloud.dataproc.v1.PigJob;
 import test.hivebqcon.com.google.cloud.dataproc.v1.SoftwareConfig;

 public abstract class DataprocAcceptanceTestBase {
@@ -70,8 +77,7 @@ protected static AcceptanceTestContext setup(String dataprocImageVersion) throws
     String connectorJarUri = testBaseGcsDir + "/connector.jar";
     String connectorInitActionUri = testBaseGcsDir + "/connectors.sh";
     String bqProject = TestUtils.getProject();
-    String bqDataset = "hivebq_test_dataset_" + testId.replace("-", "_");
-    String bqTable = "hivebq_test_table_" + testId.replace("-", "_");
+    String bqDataset = "acceptance_dataset_" + testId.replace("-", "_");

     uploadConnectorJar(CONNECTOR_JAR_DIRECTORY, CONNECTOR_JAR_PREFIX, connectorJarUri);
@@ -91,8 +97,7 @@ protected static AcceptanceTestContext setup(String dataprocImageVersion) throws
         connectorJarUri,
         connectorInitActionUri,
         bqProject,
-        bqDataset,
-        bqTable);
+        bqDataset);

     System.out.print(testContext);
     return testContext;
@@ -196,47 +201,74 @@ private static Cluster createClusterSpec(
         .build();
   }

-  private Job createAndRunHiveJob(
-      String testName, String queryFile, String outputDirUri, Duration timeout) throws Exception {
+  private Job createAndRunHiveJob(String testName, String queryFile, String outputDirUri)
+      throws Exception {
     Job job = createHiveJob(testName, queryFile, outputDirUri);
-    System.out.print("Running job:\n" + job);
-    return runAndWait(testName, job, timeout);
+    System.out.print("Running hive job:\n" + job);
+    return runAndWait(testName, job);
   }

-  private Job createHiveJob(String testName, String queryFile, String outputTableDirUri)
+  private Job createAndRunPigJob(String testName, String queryFile, String outputDirUri)
       throws Exception {
+    Job job = createPigJob(testName, queryFile, outputDirUri);
+    System.out.print("Running pig job:\n" + job);
+    return runAndWait(testName, job);
+  }
+
+  private Map<String, String> getTestScriptVariables(String testName, String outputTableDirUri) {
+    return ImmutableMap.<String, String>builder()
+        .put("BQ_PROJECT", context.bqProject)
+        .put("BQ_DATASET", context.bqDataset)
+        .put("HIVE_TEST_TABLE", testName.replaceAll("-", "_") + "_table")
+        .put("HIVE_OUTPUT_TABLE", testName.replaceAll("-", "_") + "_output")
+        .put("HIVE_OUTPUT_DIR_URI", outputTableDirUri)
+        .build();
+  }
+
+  private String uploadQueryFile(String testName, String queryFile) throws Exception {
     String queryFileUri = context.getFileUri(testName, queryFile);
     AcceptanceTestUtils.uploadResourceToGcs("/acceptance/" + queryFile, queryFileUri, "text/x-sql");
-    ImmutableMap<String, String> scriptVariables =
-        ImmutableMap.<String, String>builder()
-            .put("BQ_PROJECT", context.bqProject)
-            .put("BQ_DATASET", context.bqDataset)
-            .put("BQ_TABLE", context.bqTable)
-            .put("HIVE_TEST_TABLE", testName.replaceAll("-", "_") + "_table")
-            .put("HIVE_OUTPUT_TABLE", testName.replaceAll("-", "_") + "_output")
-            .put("HIVE_OUTPUT_DIR_URI", outputTableDirUri)
-            .build();
+    return queryFileUri;
+  }
+
+  private Job createHiveJob(String testName, String queryFile, String outputTableDirUri)
+      throws Exception {
+    String queryFileUri = uploadQueryFile(testName, queryFile);
     HiveJob.Builder hiveJobBuilder =
-        HiveJob.newBuilder().setQueryFileUri(queryFileUri).putAllScriptVariables(scriptVariables);
+        HiveJob.newBuilder()
+            .setQueryFileUri(queryFileUri)
+            .putAllScriptVariables(getTestScriptVariables(testName, outputTableDirUri));
     return Job.newBuilder()
         .setPlacement(JobPlacement.newBuilder().setClusterName(context.clusterId))
         .setHiveJob(hiveJobBuilder)
         .build();
   }

-  private Job runAndWait(String testName, Job job, Duration timeout) throws Exception {
+  private Job createPigJob(String testName, String queryFile, String outputTableDirUri)
+      throws Exception {
+    String queryFileUri = uploadQueryFile(testName, queryFile);
+    PigJob.Builder pigJobBuilder =
+        PigJob.newBuilder()
+            .setQueryFileUri(queryFileUri)
+            .putAllScriptVariables(getTestScriptVariables(testName, outputTableDirUri));
+    return Job.newBuilder()
+        .setPlacement(JobPlacement.newBuilder().setClusterName(context.clusterId))
+        .setPigJob(pigJobBuilder)
+        .build();
+  }
+
+  private Job runAndWait(String testName, Job job) throws Exception {
     try (JobControllerClient jobControllerClient =
         JobControllerClient.create(
             JobControllerSettings.newBuilder().setEndpoint(DATAPROC_ENDPOINT).build())) {
       Job request = jobControllerClient.submitJob(TestUtils.getProject(), REGION, job);
       String jobId = request.getReference().getJobId();
-      System.err.println(String.format("%s job ID: %s", testName, jobId));
+      System.err.printf("%s job ID: %s%n", testName, jobId);
       CompletableFuture<Job> finishedJobFuture =
           CompletableFuture.supplyAsync(
               () -> waitForJobCompletion(jobControllerClient, TestUtils.getProject(), REGION, jobId));
-      Job jobInfo = finishedJobFuture.get(timeout.getSeconds(), TimeUnit.SECONDS);
-      return jobInfo;
+      return finishedJobFuture.get(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
     }
   }
@@ -271,40 +303,72 @@ void verifyJobSucceeded(Job job) throws Exception {
     }
   }

-  void verifyJobOutput(String outputDirUri, String expectedOutput) throws Exception {
+  void verifyGCSJobOutput(String outputDirUri, String expectedOutput) throws Exception {
     String output = readGcsFile(outputDirUri, "_0");
     assertThat(output.trim()).isEqualTo(expectedOutput);
   }

   @Test
-  public void testHiveBq_managedTable_createWriteReadDrop_success() throws Exception {
+  public void testManagedTableCreateWriteReadDrop() throws Exception {
     String testName = "test-hivebq-managed";
     String outputDirUri = context.getOutputDirUri(testName);
-
-    Job result =
-        createAndRunHiveJob(
-            testName,
-            "create_write_read_drop_managed_table.sql",
-            outputDirUri,
-            Duration.ofSeconds(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS));
-
-    verifyJobSucceeded(result);
-    verifyJobOutput(outputDirUri, "345,world");
+    Job job =
+        createAndRunHiveJob(testName, "create_write_read_drop_managed_table.sql", outputDirUri);
+    verifyJobSucceeded(job);
+    verifyGCSJobOutput(outputDirUri, "345,world");
   }

   @Test
-  public void testHiveBq_externalTable_createReadDrop_success() throws Exception {
+  public void testExternalTable_CreateReadDrop() throws Exception {
     String testName = "test-hivebq-external";
     String outputDirUri = context.getOutputDirUri(testName);
+    Job job = createAndRunHiveJob(testName, "create_read_drop_external_table.sql", outputDirUri);
+    verifyJobSucceeded(job);
+    verifyGCSJobOutput(outputDirUri, "king,1191");
+  }

-    Job result =
-        createAndRunHiveJob(
-            testName,
-            "create_read_drop_external_table.sql",
-            outputDirUri,
-            Duration.ofSeconds(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS));
-
-    verifyJobSucceeded(result);
-    verifyJobOutput(outputDirUri, "king,1191");
+  @Test
+  public void testPig() throws Exception {
+    String testName = "test-pig";
+    String outputDirUri = context.getOutputDirUri(testName);
+    String sourceTable = testName.replaceAll("-", "_") + "_table";
+    String destTable = testName.replaceAll("-", "_") + "_output";
+    // Create the BQ tables
+    TestUtils.getBigqueryClient()
+        .query(
+            String.format(
+                "CREATE OR REPLACE TABLE `%s.%s` (%s)",
+                context.bqDataset, sourceTable, TestUtils.BIGQUERY_TEST_TABLE_DDL));
+    TestUtils.getBigqueryClient()
+        .query(
+            String.format(
+                "CREATE OR REPLACE TABLE `%s.%s` (%s)",
+                context.bqDataset, destTable, TestUtils.BIGQUERY_TEST_TABLE_DDL));
+    TestUtils.getBigqueryClient()
+        .query(
+            String.format(
+                "INSERT `%s.%s` VALUES (123, 'hello'), (789, 'abcd')",
+                context.bqDataset, sourceTable));
+    // Create the external test Hive tables
+    Job hiveJob = createAndRunHiveJob(testName, "create_two_external_tables.sql", outputDirUri);
+    verifyJobSucceeded(hiveJob);
+    // Run a pig Job
+    Job pigJob = createAndRunPigJob(testName, "read_write.pig", outputDirUri);
+    verifyJobSucceeded(pigJob);
+    // Wait a bit to give a chance for the committed writes to become readable
+    Thread.sleep(60000); // 1 minute
+    // Read the data using the BQ SDK
+    TableResult result =
+        TestUtils.getBigqueryClient()
+            .query(
+                String.format(
+                    "SELECT * FROM `%s.%s` ORDER BY number", context.bqDataset, destTable));
+    // Verify we get the expected values
+    assertEquals(2, result.getTotalRows());
+    List<FieldValueList> rows = Streams.stream(result.iterateAll()).collect(Collectors.toList());
+    assertEquals(123L, rows.get(0).get(0).getLongValue());
+    assertEquals("hello", rows.get(0).get(1).getStringValue());
+    assertEquals(789L, rows.get(1).get(0).getLongValue());
+    assertEquals("abcd", rows.get(1).get(1).getStringValue());
   }
 }
diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java
index d2decdca..591bf155 100644
--- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java
+++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java
@@ -316,7 +316,6 @@ public void initHive(String engine, String readDataFormat, String tempGcsPath) {
     System.getProperties()
         .setProperty(
             "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"); //
GCS Connector - System.getProperties().setProperty("datanucleus.autoStartMechanismMode", "ignored"); // This is needed to avoid an odd exception when running the tests with Tez and Hadoop 3. // Similar issue to what's described in https://issues.apache.org/jira/browse/HIVE-24734 diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java index 506d7a42..043d76ce 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java @@ -34,8 +34,7 @@ public void testHiveQueryId() { @Test public void testPigQueryId() { Configuration conf = new Configuration(); - conf.set("pig.script.id", "abcd"); - conf.set("pig.job.submitted.timestamp", "123456789"); - assertEquals("pig-abcd-123456789", HiveUtils.getQueryId(conf)); + conf.set("mapreduce.workflow.id", "abcd"); + assertEquals("mapreduce-abcd", HiveUtils.getQueryId(conf)); } } diff --git a/hive-bigquery-connector-common/src/test/resources/acceptance/create_two_external_tables.sql b/hive-bigquery-connector-common/src/test/resources/acceptance/create_two_external_tables.sql new file mode 100644 index 00000000..a200b437 --- /dev/null +++ b/hive-bigquery-connector-common/src/test/resources/acceptance/create_two_external_tables.sql @@ -0,0 +1,17 @@ +DROP TABLE IF EXISTS ${HIVE_TEST_TABLE}; +CREATE EXTERNAL TABLE ${HIVE_TEST_TABLE} ( + number INT, + text STRING) +STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler' +TBLPROPERTIES ( + 'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_TEST_TABLE}' +); + +DROP TABLE IF EXISTS ${HIVE_OUTPUT_TABLE}; +CREATE EXTERNAL TABLE ${HIVE_OUTPUT_TABLE} ( + number INT, + text STRING) +STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler' +TBLPROPERTIES ( + 'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_OUTPUT_TABLE}' +); \ No newline at end of file diff --git a/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql b/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql index 27a2eefa..e1601799 100644 --- a/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql +++ b/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql @@ -5,7 +5,7 @@ CREATE TABLE ${HIVE_TEST_TABLE} ( name STRING) STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler' TBLPROPERTIES ( - 'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}' + 'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_TEST_TABLE}' ); -- Write data to Hive BQ table. 
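Note on the query-id change in this patch: after patch 1, HiveUtils.getQueryId resolves the connector's per-query work-directory id by checking the Hive query id ("hive.query.id") first, then falling back to the MapReduce workflow id ("mapreduce.workflow.id") that Pig-submitted jobs carry, and throwing when neither is set. A minimal JUnit 5 sketch of that precedence, assuming only the classes shown in this patch (the test class name is illustrative, not part of the patch):

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.junit.jupiter.api.Test;

    public class QueryIdPrecedenceSketch {
      @Test
      public void hiveIdWinsOverWorkflowId() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.workflow.id", "job_1");
        conf.set("hive.query.id", "q1");
        // The Hive query id is checked first, so it wins when both are present.
        assertEquals("hive-query-id-q1", HiveUtils.getQueryId(conf));
      }

      @Test
      public void workflowIdCoversPigSubmittedMapReduceJobs() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.workflow.id", "job_1");
        // Pig work arrives as plain MapReduce jobs, so the workflow id is the fallback handle.
        assertEquals("mapreduce-job_1", HiveUtils.getQueryId(conf));
      }
    }
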
diff --git a/hive-bigquery-connector-common/src/test/resources/acceptance/read_write.pig b/hive-bigquery-connector-common/src/test/resources/acceptance/read_write.pig new file mode 100644 index 00000000..1a772d1b --- /dev/null +++ b/hive-bigquery-connector-common/src/test/resources/acceptance/read_write.pig @@ -0,0 +1,2 @@ +data = LOAD '${HIVE_TEST_TABLE}' USING org.apache.hive.hcatalog.pig.HCatLoader(); +STORE data INTO '${HIVE_OUTPUT_TABLE}' USING org.apache.hive.hcatalog.pig.HCatStorer(); \ No newline at end of file From b17526bebe709e7074bd3d64ae40a3919684480a Mon Sep 17 00:00:00 2001 From: Julien Phalip Date: Fri, 26 Apr 2024 17:32:40 -0700 Subject: [PATCH 2/4] More fixes for Pig and HCatalog --- .../connector/BigQueryStorageHandlerBase.java | 5 +- .../connector/config/HiveBigQueryConfig.java | 1 + .../utils/hcatalog/HCatalogUtils.java | 2 + .../connector/utils/hive/HiveUtils.java | 30 ++++++--- .../hive/bigquery/connector/TestUtils.java | 10 +-- .../DataprocAcceptanceTestBase.java | 62 +++++++++++-------- .../connector/utils/JobUtilsTest.java | 7 +-- .../acceptance/create_write_managed_table.sql | 12 ++++ .../create_write_read_drop_managed_table.sql | 22 ------- 9 files changed, 81 insertions(+), 70 deletions(-) create mode 100644 hive-bigquery-connector-common/src/test/resources/acceptance/create_write_managed_table.sql delete mode 100644 hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java index 05bc8e06..db41f275 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java @@ -108,7 +108,7 @@ public DecomposedPredicate decomposePredicate( @Override public void setConf(Configuration configuration) { - this.conf = configuration; + conf = configuration; String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).toLowerCase(); if (engine.equals("tez")) { // Tez does not use OutputCommitter. So we set up a failure hook to @@ -214,7 +214,8 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map 0; diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java index 57f34107..aa0f0dac 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/hive/HiveUtils.java @@ -15,8 +15,11 @@ */ package com.google.cloud.hive.bigquery.connector.utils.hive; +import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; +import com.google.cloud.hive.bigquery.connector.utils.hcatalog.HCatalogUtils; import java.util.Map; import java.util.Objects; +import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -60,16 +63,29 @@ public static String[] getReadColumnNames(Configuration conf) { /** Returns the query's unique id. 
*/
   public static String getQueryId(Configuration conf) {
-    String hiveQueryId = conf.get(ConfVars.HIVEQUERYID.varname);
-    if (hiveQueryId != null) {
-      // In this case, the user is running a plain Hive query directly from Hive itself.
-      return "hive-query-id-" + hiveQueryId;
+    if (!conf.get(HiveBigQueryConfig.QUERY_ID, "").isEmpty()) {
+      return conf.get(HiveBigQueryConfig.QUERY_ID);
     }
-    if (conf.get("mapreduce.workflow.id") != null) {
-      // Map reduce job, possibly from Pig
+    if (!conf.get(ConfVars.HIVEQUERYID.varname, "").isEmpty()) {
+      // In this case, the user is running a plain Hive query directly from Hive itself.
+      return "hive-query-id-" + conf.get(ConfVars.HIVEQUERYID.varname);
+    } else if (!conf.get("pig.script.id", "").isEmpty()
+        && !conf.get("pig.job.submitted.timestamp", "").isEmpty()) {
+      // The user is running a Hive query from Pig. Use the job's timestamp as a pig script might
+      // run multiple jobs.
+      return String.format(
+          "pig-%s-%s", conf.get("pig.script.id"), conf.get("pig.job.submitted.timestamp"));
+    } else if (!conf.get(HCatalogUtils.HCAT_OUTPUT_ID_HASH, "").isEmpty()) {
+      // Possibly from Pig in Tez mode in some environments (e.g. Dataproc)
+      return String.format("hcat-output-%s", conf.get(HCatalogUtils.HCAT_OUTPUT_ID_HASH));
+    } else if (!conf.get("mapreduce.workflow.id", "").isEmpty()) {
+      // Map reduce job, possibly from Pig in MR mode in some environments (e.g. Dataproc)
       return String.format("mapreduce-%s", conf.get("mapreduce.workflow.id"));
     }
-    throw new RuntimeException("No query id found in Hadoop conf");
+    // Fall back: generate our own ID
+    String queryId = String.format("custom-query-id-%s", UUID.randomUUID());
+    conf.set(HiveBigQueryConfig.QUERY_ID, queryId);
+    return queryId;
   }

   public static String getDbTableName(Table table) {
diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java
index 942fdb25..7db9c7cb 100644
--- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java
+++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java
@@ -158,12 +158,6 @@ public class TestUtils {
   public static String HIVE_INGESTION_TIME_PARTITIONED_PROPS = "'bq.time.partition.type'='DAY'";

-  public static Configuration getMockHadoopConf() {
-    Configuration conf = new Configuration();
-    conf.set("hive.query.id", "query123");
-    return conf;
-  }
-
   /** Return Hive config values passed from system properties */
   public static Map<String, String> getHiveConfSystemOverrides() {
     Map<String, String> overrides = new HashMap<>();
@@ -178,7 +172,7 @@ public static Map<String, String> getHiveConfSystemOverrides() {
   }

   private static com.google.auth.Credentials getCredentials() {
-    Configuration conf = getMockHadoopConf();
+    Configuration conf = new Configuration();
     Map<String, String> hiveConfSystemOverrides = getHiveConfSystemOverrides();
     for (String key : hiveConfSystemOverrides.keySet()) {
       conf.set(key, hiveConfSystemOverrides.get(key));
     }
@@ -191,7 +185,7 @@ private static com.google.auth.Credentials getCredentials() {
   }

   public static BigQueryClient getBigqueryClient() {
-    Configuration conf = getMockHadoopConf();
+    Configuration conf = new Configuration();
     Map<String, String> hiveConfSystemOverrides = getHiveConfSystemOverrides();
     for (String key : hiveConfSystemOverrides.keySet()) {
       conf.set(key, hiveConfSystemOverrides.get(key));
     }
diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java
index c870e791..7d086a8b 100644
--- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java
+++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/acceptance/DataprocAcceptanceTestBase.java
@@ -201,27 +201,25 @@ private static Cluster createClusterSpec(
         .build();
   }

-  private Job createAndRunHiveJob(String testName, String queryFile, String outputDirUri)
-      throws Exception {
-    Job job = createHiveJob(testName, queryFile, outputDirUri);
+  private Job createAndRunHiveJob(String testName, String queryFile) throws Exception {
+    Job job = createHiveJob(testName, queryFile);
     System.out.print("Running hive job:\n" + job);
     return runAndWait(testName, job);
   }

-  private Job createAndRunPigJob(String testName, String queryFile, String outputDirUri)
-      throws Exception {
-    Job job = createPigJob(testName, queryFile, outputDirUri);
+  private Job createAndRunPigJob(String testName, String queryFile) throws Exception {
+    Job job = createPigJob(testName, queryFile);
     System.out.print("Running pig job:\n" + job);
     return runAndWait(testName, job);
   }

-  private Map<String, String> getTestScriptVariables(String testName, String outputTableDirUri) {
+  private Map<String, String> getTestScriptVariables(String testName) {
     return ImmutableMap.<String, String>builder()
         .put("BQ_PROJECT", context.bqProject)
         .put("BQ_DATASET", context.bqDataset)
         .put("HIVE_TEST_TABLE", testName.replaceAll("-", "_") + "_table")
         .put("HIVE_OUTPUT_TABLE", testName.replaceAll("-", "_") + "_output")
-        .put("HIVE_OUTPUT_DIR_URI", outputTableDirUri)
+        .put("HIVE_OUTPUT_DIR_URI", context.getOutputDirUri(testName))
         .build();
   }
@@ -231,26 +229,25 @@ private String uploadQueryFile(String testName, String queryFile) throws Excepti
     return queryFileUri;
   }

-  private Job createHiveJob(String testName, String queryFile, String outputTableDirUri)
-      throws Exception {
+  private Job createHiveJob(String testName, String queryFile) throws Exception {
     String queryFileUri = uploadQueryFile(testName, queryFile);
     HiveJob.Builder hiveJobBuilder =
         HiveJob.newBuilder()
             .setQueryFileUri(queryFileUri)
-            .putAllScriptVariables(getTestScriptVariables(testName, outputTableDirUri));
+            .putAllScriptVariables(getTestScriptVariables(testName));
     return Job.newBuilder()
         .setPlacement(JobPlacement.newBuilder().setClusterName(context.clusterId))
         .setHiveJob(hiveJobBuilder)
         .build();
   }

-  private Job createPigJob(String testName, String queryFile, String outputTableDirUri)
-      throws Exception {
+  private Job createPigJob(String testName, String queryFile) throws Exception {
     String queryFileUri = uploadQueryFile(testName, queryFile);
     PigJob.Builder pigJobBuilder =
         PigJob.newBuilder()
+            .addJarFileUris("file:///usr/lib/hive/lib/datanucleus-core-5.2.2.jar")
             .setQueryFileUri(queryFileUri)
-            .putAllScriptVariables(getTestScriptVariables(testName, outputTableDirUri));
+            .putAllScriptVariables(getTestScriptVariables(testName));
     return Job.newBuilder()
         .setPlacement(JobPlacement.newBuilder().setClusterName(context.clusterId))
         .setPigJob(pigJobBuilder)
         .build();
   }
@@ -309,28 +306,36 @@ void verifyGCSJobOutput(String outputDirUri, String expectedOutput) throws Excep
   }

   @Test
-  public void testManagedTableCreateWriteReadDrop() throws Exception {
+  public void testWriteManagedTable() throws Exception {
     String testName = "test-hivebq-managed";
-    String outputDirUri = context.getOutputDirUri(testName);
-    Job job =
-        createAndRunHiveJob(testName, "create_write_read_drop_managed_table.sql", outputDirUri);
+    String table = testName.replaceAll("-", "_") + "_table";
+    Job job = createAndRunHiveJob(testName, "create_write_managed_table.sql");
     verifyJobSucceeded(job);
-    verifyGCSJobOutput(outputDirUri, "345,world");
+    // Wait a bit to give a chance for the committed writes to become readable
+    Thread.sleep(60000); // 1 minute
+    // Read the data using the BQ SDK
+    TableResult result =
+        TestUtils.getBigqueryClient()
+            .query(
+                String.format("SELECT * FROM `%s.%s` ORDER BY number", context.bqDataset, table));
+    // Verify we get the expected values
+    assertEquals(1, result.getTotalRows());
+    List<FieldValueList> rows = Streams.stream(result.iterateAll()).collect(Collectors.toList());
+    assertEquals(123L, rows.get(0).get(0).getLongValue());
+    assertEquals("hello", rows.get(0).get(1).getStringValue());
   }

   @Test
-  public void testExternalTable_CreateReadDrop() throws Exception {
+  public void testCreateReadDropExternalTable() throws Exception {
     String testName = "test-hivebq-external";
-    String outputDirUri = context.getOutputDirUri(testName);
-    Job job = createAndRunHiveJob(testName, "create_read_drop_external_table.sql", outputDirUri);
+    Job job = createAndRunHiveJob(testName, "create_read_drop_external_table.sql");
     verifyJobSucceeded(job);
-    verifyGCSJobOutput(outputDirUri, "king,1191");
+    verifyGCSJobOutput(context.getOutputDirUri(testName), "king,1191");
   }

   @Test
-  public void testPig() throws Exception {
+  public void testPigMRMode() throws Exception {
     String testName = "test-pig";
-    String outputDirUri = context.getOutputDirUri(testName);
     String sourceTable = testName.replaceAll("-", "_") + "_table";
     String destTable = testName.replaceAll("-", "_") + "_output";
     // Create the BQ tables
@@ -350,10 +355,10 @@ public void testPig() throws Exception {
                 "INSERT `%s.%s` VALUES (123, 'hello'), (789, 'abcd')",
                 context.bqDataset, sourceTable));
     // Create the external test Hive tables
-    Job hiveJob = createAndRunHiveJob(testName, "create_two_external_tables.sql", outputDirUri);
+    Job hiveJob = createAndRunHiveJob(testName, "create_two_external_tables.sql");
     verifyJobSucceeded(hiveJob);
     // Run a pig Job
-    Job pigJob = createAndRunPigJob(testName, "read_write.pig", outputDirUri);
+    Job pigJob = createAndRunPigJob(testName, "read_write.pig");
     verifyJobSucceeded(pigJob);
     // Wait a bit to give a chance for the committed writes to become readable
     Thread.sleep(60000); // 1 minute
@@ -371,4 +376,7 @@ public void testPig() throws Exception {
     assertEquals(789L, rows.get(1).get(0).getLongValue());
     assertEquals("abcd", rows.get(1).get(1).getStringValue());
   }
+
+  // TODO: Test Pig in Tez mode. Need a way to specify Tez as the Pig execution engine when
+  // submitting the Dataproc Pig job.
} diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java index d823c0e1..fcedc4c4 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java @@ -21,7 +21,6 @@ import com.google.cloud.bigquery.TableId; import com.google.cloud.hive.bigquery.connector.JobDetails; -import com.google.cloud.hive.bigquery.connector.TestUtils; import com.google.cloud.hive.bigquery.connector.output.WriterRegistry; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -47,7 +46,7 @@ public void testTableIdPrefix() { @Test public void testGetWorkDir() { - Configuration conf = TestUtils.getMockHadoopConf(); + Configuration conf = new Configuration(); conf.set("hadoop.tmp.dir", "/tmp"); Path path = JobUtils.getQueryWorkDir(conf); assertEquals("/tmp/hive-bq-hive-query-id-query123", path.toString()); @@ -58,7 +57,7 @@ public void testGetWorkDir() { @Test public void testGetJobDetailsFilePath() { - Configuration conf = TestUtils.getMockHadoopConf(); + Configuration conf = new Configuration(); conf.set("hadoop.tmp.dir", "/tmp"); String hmsDbTable = "default.mytable"; Path jobDetailsFilePath = JobUtils.getJobDetailsFilePath(conf, hmsDbTable); @@ -69,7 +68,7 @@ public void testGetJobDetailsFilePath() { @Test public void testGetTaskWriterOutputFile() { - Configuration conf = TestUtils.getMockHadoopConf(); + Configuration conf = new Configuration(); conf.set("hadoop.tmp.dir", "/hadoop-tmp/"); JobDetails jobDetails = new JobDetails(); jobDetails.setTableProperties(new Properties()); diff --git a/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_managed_table.sql b/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_managed_table.sql new file mode 100644 index 00000000..c06e2b31 --- /dev/null +++ b/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_managed_table.sql @@ -0,0 +1,12 @@ +-- Create a managed Hive BQ table. +DROP TABLE IF EXISTS ${HIVE_TEST_TABLE}; +CREATE TABLE ${HIVE_TEST_TABLE} ( + number INT, + text STRING) +STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler' +TBLPROPERTIES ( + 'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_TEST_TABLE}' +); + +-- Write data to Hive BQ table. +INSERT INTO ${HIVE_TEST_TABLE} VALUES(123, 'hello'); \ No newline at end of file diff --git a/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql b/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql deleted file mode 100644 index e1601799..00000000 --- a/hive-bigquery-connector-common/src/test/resources/acceptance/create_write_read_drop_managed_table.sql +++ /dev/null @@ -1,22 +0,0 @@ --- Create a managed Hive BQ table. -DROP TABLE IF EXISTS ${HIVE_TEST_TABLE}; -CREATE TABLE ${HIVE_TEST_TABLE} ( - id INT, - name STRING) -STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler' -TBLPROPERTIES ( - 'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_TEST_TABLE}' -); - --- Write data to Hive BQ table. 
-INSERT INTO ${HIVE_TEST_TABLE} VALUES(123, 'hello'); -INSERT INTO ${HIVE_TEST_TABLE} VALUES(345, 'world'); - --- Read Hive BQ table, write the result into an output table backed by GCS. -CREATE TABLE ${HIVE_OUTPUT_TABLE} - ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' - LOCATION '${HIVE_OUTPUT_DIR_URI}' -AS SELECT * from ${HIVE_TEST_TABLE} WHERE id = 345; - --- Drop the managed table -DROP TABLE ${HIVE_TEST_TABLE}; From a7ad62463996ceab05630d23eacf2b670ca7f215 Mon Sep 17 00:00:00 2001 From: Julien Phalip Date: Fri, 26 Apr 2024 18:23:27 -0700 Subject: [PATCH 3/4] Fix unit tests --- .../bigquery/connector/utils/HiveUtilsTest.java | 17 +++++++++++++++-- .../bigquery/connector/utils/JobUtilsTest.java | 17 ++++++++--------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java index 043d76ce..65dfda87 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/HiveUtilsTest.java @@ -25,16 +25,29 @@ public class HiveUtilsTest { @Test public void testHiveQueryId() { Configuration conf = new Configuration(); - Throwable exception = assertThrows(RuntimeException.class, () -> HiveUtils.getQueryId(conf)); - assertEquals("No query id found in Hadoop conf", exception.getMessage()); conf.set("hive.query.id", "abcd"); assertEquals("hive-query-id-abcd", HiveUtils.getQueryId(conf)); } @Test public void testPigQueryId() { + Configuration conf = new Configuration(); + conf.set("pig.script.id", "abcd"); + conf.set("pig.job.submitted.timestamp", "999"); + assertEquals("pig-abcd-999", HiveUtils.getQueryId(conf)); + } + + @Test + public void testMapreduceQueryId() { Configuration conf = new Configuration(); conf.set("mapreduce.workflow.id", "abcd"); assertEquals("mapreduce-abcd", HiveUtils.getQueryId(conf)); } + + @Test + public void testHCatQueryId() { + Configuration conf = new Configuration(); + conf.set("mapreduce.lib.hcatoutput.id", "abcd"); + assertEquals("hcat-output-abcd", HiveUtils.getQueryId(conf)); + } } diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java index fcedc4c4..8c75c139 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/utils/JobUtilsTest.java @@ -15,13 +15,13 @@ */ package com.google.cloud.hive.bigquery.connector.utils; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.text.MatchesPattern.matchesPattern; +import static com.google.common.truth.Truth.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import com.google.cloud.bigquery.TableId; import com.google.cloud.hive.bigquery.connector.JobDetails; import com.google.cloud.hive.bigquery.connector.output.WriterRegistry; +import com.google.common.truth.Truth; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -49,10 +49,10 @@ public void testGetWorkDir() { Configuration conf = new Configuration(); 
conf.set("hadoop.tmp.dir", "/tmp"); Path path = JobUtils.getQueryWorkDir(conf); - assertEquals("/tmp/hive-bq-hive-query-id-query123", path.toString()); + assertThat(path.toString()).matches("/tmp/hive-bq-custom-query-id-.*"); conf.set("bq.work.dir.parent.path", "/my/workdir"); path = JobUtils.getQueryWorkDir(conf); - assertEquals("/my/workdir/hive-bq-hive-query-id-query123", path.toString()); + assertThat(path.toString()).matches("/my/workdir/hive-bq-custom-query-id-.*"); } @Test @@ -61,9 +61,8 @@ public void testGetJobDetailsFilePath() { conf.set("hadoop.tmp.dir", "/tmp"); String hmsDbTable = "default.mytable"; Path jobDetailsFilePath = JobUtils.getJobDetailsFilePath(conf, hmsDbTable); - assertEquals( - "/tmp/hive-bq-hive-query-id-query123/default.mytable/job-details.json", - jobDetailsFilePath.toString()); + Truth.assertThat(jobDetailsFilePath.toString()) + .matches("/tmp/hive-bq-custom-query-id-.*/default\\.mytable/job-details\\.json"); } @Test @@ -79,7 +78,7 @@ public void testGetTaskWriterOutputFile() { String writerId = WriterRegistry.getWriterId(); Path path = JobUtils.getTaskWriterOutputFile(conf, jobDetails, taskAttemptID, writerId, "jpeg"); String pattern = - "^/hadoop-tmp/hive-bq-hive-query-id-query123/default.mytable/myproject_mydataset_mytable_abcd1234_w\\d+\\.jpeg"; - assertThat(path.toString(), matchesPattern(pattern)); + "^/hadoop-tmp/hive-bq-custom-query-id-.*/default.mytable/myproject_mydataset_mytable_abcd1234_w\\d+\\.jpeg"; + assertThat(path.toString()).matches(pattern); } } From b31ed76532e6869230690e61dd105b2e6201fa24 Mon Sep 17 00:00:00 2001 From: Julien Phalip Date: Mon, 29 Apr 2024 09:18:38 -0700 Subject: [PATCH 4/4] Small test improvement --- .../com/google/cloud/hive/bigquery/connector/TestUtils.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java index 7db9c7cb..5589d083 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java @@ -253,8 +253,10 @@ public static void createMaterializedView(String dataset, String table, String v } public static void dropBqTableIfExists(String dataset, String table) { - TableId tableId = TableId.of(dataset, table); - getBigqueryClient().deleteTable(tableId); + TableId tableId = TableId.of(getProject(), dataset, table); + if (getBigqueryClient().tableExists(tableId)) { + getBigqueryClient().deleteTable(tableId); + } } public static boolean bQTableExists(String dataset, String tableName) {