Fixes for Pig and HCatalog #118

Merged
merged 4 commits on May 15, 2024
Changes from 1 commit
@@ -41,7 +41,14 @@ public void run(HookContext hookContext) throws Exception {
.equals("com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler")) {
String tableName = HiveUtils.getDbTableName(entity.getTable());
BigQueryMetaHook metahook = new BigQueryMetaHook(hookContext.getConf());
metahook.commitInsertTable(tableName);
try {
metahook.commitInsertTable(tableName);
} catch (Exception e) {
// Clean up the failed job
OutputCommitterUtils.abortJob(hookContext.getConf(), tableName);
// Wrap and re-throw the original exception
throw new RuntimeException(e);
}
}
}
}
@@ -28,15 +28,12 @@
public class OutputCommitterUtils {

public static void commitJob(Configuration conf, JobDetails jobDetails) throws IOException {
try {
if (jobDetails.getWriteMethod().equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) {
DirectOutputCommitter.commitJob(conf, jobDetails);
} else {
IndirectOutputCommitter.commitJob(conf, jobDetails);
}
} finally {
jobDetails.cleanUp(conf);
if (jobDetails.getWriteMethod().equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) {
DirectOutputCommitter.commitJob(conf, jobDetails);
} else {
IndirectOutputCommitter.commitJob(conf, jobDetails);
}
jobDetails.cleanUp(conf);
}

public static void commitJob(Configuration conf) throws IOException {
@@ -65,11 +65,9 @@ public static String getQueryId(Configuration conf) {
// In this case, the user is running a plain Hive query directly from Hive itself.
return "hive-query-id-" + hiveQueryId;
}
if (conf.get("pig.script.id") != null) {
// The user is running a Hive query from Pig. Use the job's timestamp as a pig script might
// run multiple jobs.
return String.format(
"pig-%s-%s", conf.get("pig.script.id"), conf.get("pig.job.submitted.timestamp"));
if (conf.get("mapreduce.workflow.id") != null) {
// MapReduce job, possibly submitted from Pig
return String.format("mapreduce-%s", conf.get("mapreduce.workflow.id"));
}
throw new RuntimeException("No query id found in Hadoop conf");
}
@@ -24,7 +24,6 @@ public class AcceptanceTestContext {
final String connectorInitActionUri;
final String bqProject;
final String bqDataset;
final String bqTable;

public AcceptanceTestContext(
String testId,
@@ -34,8 +33,7 @@ public AcceptanceTestContext(
String connectorJarUri,
String connectorInitActionUri,
String bqProject,
String bqDataset,
String bqTable) {
String bqDataset) {
this.testId = testId;
this.dataprocImageVersion = dataprocImageVersion;
this.clusterId = clusterId;
@@ -44,7 +42,6 @@
this.connectorInitActionUri = connectorInitActionUri;
this.bqProject = bqProject;
this.bqDataset = bqDataset;
this.bqTable = bqTable;
}

public String getFileUri(String testName, String filename) {
@@ -57,16 +54,15 @@ public String getOutputDirUri(String testName) {

@Override
public String toString() {
return new StringBuilder()
.append("testId: " + testId + "\n")
.append("dataprocImageVersion: " + dataprocImageVersion + "\n")
.append("clusterId: " + clusterId + "\n")
.append("testBaseGcsDir: " + testBaseGcsDir + "\n")
.append("connectorJarUri: " + connectorJarUri + "\n")
.append("connectorInitActionUri: " + connectorInitActionUri + "\n")
.append("projectId: " + bqProject + "\n")
.append("bqDataset: " + bqDataset + "\n")
.append("bqTable: " + bqTable + "\n")
.toString();
return String.join(
"\n",
"testId: " + testId,
"dataprocImageVersion: " + dataprocImageVersion,
"clusterId: " + clusterId,
"testBaseGcsDir: " + testBaseGcsDir,
"connectorJarUri: " + connectorJarUri,
"connectorInitActionUri: " + connectorInitActionUri,
"projectId: " + bqProject,
"bqDataset: " + bqDataset);
}
}
@@ -32,12 +32,18 @@
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.uploadConnectorInitAction;
import static com.google.cloud.hive.bigquery.connector.acceptance.AcceptanceTestUtils.uploadConnectorJar;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.hive.bigquery.connector.TestUtils;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import com.google.common.collect.Streams;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.Test;
import test.hivebqcon.com.google.cloud.dataproc.v1.Cluster;
import test.hivebqcon.com.google.cloud.dataproc.v1.ClusterConfig;
@@ -53,6 +59,7 @@
import test.hivebqcon.com.google.cloud.dataproc.v1.JobPlacement;
import test.hivebqcon.com.google.cloud.dataproc.v1.JobStatus;
import test.hivebqcon.com.google.cloud.dataproc.v1.NodeInitializationAction;
import test.hivebqcon.com.google.cloud.dataproc.v1.PigJob;
import test.hivebqcon.com.google.cloud.dataproc.v1.SoftwareConfig;

public abstract class DataprocAcceptanceTestBase {
@@ -70,8 +77,7 @@ protected static AcceptanceTestContext setup(String dataprocImageVersion) throws
String connectorJarUri = testBaseGcsDir + "/connector.jar";
String connectorInitActionUri = testBaseGcsDir + "/connectors.sh";
String bqProject = TestUtils.getProject();
String bqDataset = "hivebq_test_dataset_" + testId.replace("-", "_");
String bqTable = "hivebq_test_table_" + testId.replace("-", "_");
String bqDataset = "acceptance_dataset_" + testId.replace("-", "_");

uploadConnectorJar(CONNECTOR_JAR_DIRECTORY, CONNECTOR_JAR_PREFIX, connectorJarUri);

@@ -91,8 +97,7 @@ protected static AcceptanceTestContext setup(String dataprocImageVersion) throws
connectorJarUri,
connectorInitActionUri,
bqProject,
bqDataset,
bqTable);
bqDataset);
System.out.print(testContext);

return testContext;
@@ -196,47 +201,74 @@ private static Cluster createClusterSpec(
.build();
}

private Job createAndRunHiveJob(
String testName, String queryFile, String outputDirUri, Duration timeout) throws Exception {
private Job createAndRunHiveJob(String testName, String queryFile, String outputDirUri)
throws Exception {
Job job = createHiveJob(testName, queryFile, outputDirUri);
System.out.print("Running job:\n" + job);
return runAndWait(testName, job, timeout);
System.out.print("Running hive job:\n" + job);
return runAndWait(testName, job);
}

private Job createHiveJob(String testName, String queryFile, String outputTableDirUri)
private Job createAndRunPigJob(String testName, String queryFile, String outputDirUri)
throws Exception {
Job job = createPigJob(testName, queryFile, outputDirUri);
System.out.print("Running pig job:\n" + job);
return runAndWait(testName, job);
}

private Map<String, String> getTestScriptVariables(String testName, String outputTableDirUri) {
return ImmutableMap.<String, String>builder()
.put("BQ_PROJECT", context.bqProject)
.put("BQ_DATASET", context.bqDataset)
.put("HIVE_TEST_TABLE", testName.replaceAll("-", "_") + "_table")
.put("HIVE_OUTPUT_TABLE", testName.replaceAll("-", "_") + "_output")
.put("HIVE_OUTPUT_DIR_URI", outputTableDirUri)
.build();
}

private String uploadQueryFile(String testName, String queryFile) throws Exception {
String queryFileUri = context.getFileUri(testName, queryFile);
AcceptanceTestUtils.uploadResourceToGcs("/acceptance/" + queryFile, queryFileUri, "text/x-sql");
ImmutableMap<String, String> scriptVariables =
ImmutableMap.<String, String>builder()
.put("BQ_PROJECT", context.bqProject)
.put("BQ_DATASET", context.bqDataset)
.put("BQ_TABLE", context.bqTable)
.put("HIVE_TEST_TABLE", testName.replaceAll("-", "_") + "_table")
.put("HIVE_OUTPUT_TABLE", testName.replaceAll("-", "_") + "_output")
.put("HIVE_OUTPUT_DIR_URI", outputTableDirUri)
.build();
return queryFileUri;
}

private Job createHiveJob(String testName, String queryFile, String outputTableDirUri)
throws Exception {
String queryFileUri = uploadQueryFile(testName, queryFile);
HiveJob.Builder hiveJobBuilder =
HiveJob.newBuilder().setQueryFileUri(queryFileUri).putAllScriptVariables(scriptVariables);
HiveJob.newBuilder()
.setQueryFileUri(queryFileUri)
.putAllScriptVariables(getTestScriptVariables(testName, outputTableDirUri));
return Job.newBuilder()
.setPlacement(JobPlacement.newBuilder().setClusterName(context.clusterId))
.setHiveJob(hiveJobBuilder)
.build();
}

private Job runAndWait(String testName, Job job, Duration timeout) throws Exception {
private Job createPigJob(String testName, String queryFile, String outputTableDirUri)
throws Exception {
String queryFileUri = uploadQueryFile(testName, queryFile);
PigJob.Builder pigJobBuilder =
PigJob.newBuilder()
.setQueryFileUri(queryFileUri)
.putAllScriptVariables(getTestScriptVariables(testName, outputTableDirUri));
return Job.newBuilder()
.setPlacement(JobPlacement.newBuilder().setClusterName(context.clusterId))
.setPigJob(pigJobBuilder)
.build();
}

private Job runAndWait(String testName, Job job) throws Exception {
try (JobControllerClient jobControllerClient =
JobControllerClient.create(
JobControllerSettings.newBuilder().setEndpoint(DATAPROC_ENDPOINT).build())) {
Job request = jobControllerClient.submitJob(TestUtils.getProject(), REGION, job);
String jobId = request.getReference().getJobId();
System.err.println(String.format("%s job ID: %s", testName, jobId));
System.err.printf("%s job ID: %s%n", testName, jobId);
CompletableFuture<Job> finishedJobFuture =
CompletableFuture.supplyAsync(
() ->
waitForJobCompletion(jobControllerClient, TestUtils.getProject(), REGION, jobId));
Job jobInfo = finishedJobFuture.get(timeout.getSeconds(), TimeUnit.SECONDS);
return jobInfo;
return finishedJobFuture.get(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
}
}

@@ -271,40 +303,72 @@ void verifyJobSucceeded(Job job) throws Exception {
}
}

void verifyJobOutput(String outputDirUri, String expectedOutput) throws Exception {
void verifyGCSJobOutput(String outputDirUri, String expectedOutput) throws Exception {
String output = readGcsFile(outputDirUri, "_0");
assertThat(output.trim()).isEqualTo(expectedOutput);
}

@Test
public void testHiveBq_managedTable_createWriteReadDrop_success() throws Exception {
public void testManagedTableCreateWriteReadDrop() throws Exception {
String testName = "test-hivebq-managed";
String outputDirUri = context.getOutputDirUri(testName);

Job result =
createAndRunHiveJob(
testName,
"create_write_read_drop_managed_table.sql",
outputDirUri,
Duration.ofSeconds(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS));

verifyJobSucceeded(result);
verifyJobOutput(outputDirUri, "345,world");
Job job =
createAndRunHiveJob(testName, "create_write_read_drop_managed_table.sql", outputDirUri);
verifyJobSucceeded(job);
verifyGCSJobOutput(outputDirUri, "345,world");
}

@Test
public void testHiveBq_externalTable_createReadDrop_success() throws Exception {
public void testExternalTable_CreateReadDrop() throws Exception {
String testName = "test-hivebq-external";
String outputDirUri = context.getOutputDirUri(testName);
Job job = createAndRunHiveJob(testName, "create_read_drop_external_table.sql", outputDirUri);
verifyJobSucceeded(job);
verifyGCSJobOutput(outputDirUri, "king,1191");
}

Job result =
createAndRunHiveJob(
testName,
"create_read_drop_external_table.sql",
outputDirUri,
Duration.ofSeconds(ACCEPTANCE_TEST_TIMEOUT_IN_SECONDS));

verifyJobSucceeded(result);
verifyJobOutput(outputDirUri, "king,1191");
@Test
public void testPig() throws Exception {
String testName = "test-pig";
String outputDirUri = context.getOutputDirUri(testName);
String sourceTable = testName.replaceAll("-", "_") + "_table";
String destTable = testName.replaceAll("-", "_") + "_output";
// Create the BQ tables
TestUtils.getBigqueryClient()
.query(
String.format(
"CREATE OR REPLACE TABLE `%s.%s` (%s)",
context.bqDataset, sourceTable, TestUtils.BIGQUERY_TEST_TABLE_DDL));
TestUtils.getBigqueryClient()
.query(
String.format(
"CREATE OR REPLACE TABLE `%s.%s` (%s)",
context.bqDataset, destTable, TestUtils.BIGQUERY_TEST_TABLE_DDL));
TestUtils.getBigqueryClient()
.query(
String.format(
"INSERT `%s.%s` VALUES (123, 'hello'), (789, 'abcd')",
context.bqDataset, sourceTable));
// Create the external test Hive tables
Job hiveJob = createAndRunHiveJob(testName, "create_two_external_tables.sql", outputDirUri);
verifyJobSucceeded(hiveJob);
// Run a pig Job
Job pigJob = createAndRunPigJob(testName, "read_write.pig", outputDirUri);
verifyJobSucceeded(pigJob);
// Wait a bit to give a chance for the committed writes to become readable
Thread.sleep(60000); // 1 minute
// Read the data using the BQ SDK
TableResult result =
TestUtils.getBigqueryClient()
.query(
String.format(
"SELECT * FROM `%s.%s` ORDER BY number", context.bqDataset, destTable));
// Verify we get the expected values
assertEquals(2, result.getTotalRows());
List<FieldValueList> rows = Streams.stream(result.iterateAll()).collect(Collectors.toList());
assertEquals(123L, rows.get(0).get(0).getLongValue());
assertEquals("hello", rows.get(0).get(1).getStringValue());
assertEquals(789L, rows.get(1).get(0).getLongValue());
assertEquals("abcd", rows.get(1).get(1).getStringValue());
}
}
@@ -316,7 +316,6 @@ public void initHive(String engine, String readDataFormat, String tempGcsPath) {
System.getProperties()
.setProperty(
"fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"); // GCS Connector
System.getProperties().setProperty("datanucleus.autoStartMechanismMode", "ignored");

// This is needed to avoid an odd exception when running the tests with Tez and Hadoop 3.
// Similar issue to what's described in https://issues.apache.org/jira/browse/HIVE-24734
@@ -34,8 +34,7 @@ public void testHiveQueryId() {
@Test
public void testPigQueryId() {
Configuration conf = new Configuration();
conf.set("pig.script.id", "abcd");
conf.set("pig.job.submitted.timestamp", "123456789");
assertEquals("pig-abcd-123456789", HiveUtils.getQueryId(conf));
conf.set("mapreduce.workflow.id", "abcd");
assertEquals("mapreduce-abcd", HiveUtils.getQueryId(conf));
}
}
@@ -0,0 +1,17 @@
DROP TABLE IF EXISTS ${HIVE_TEST_TABLE};
CREATE EXTERNAL TABLE ${HIVE_TEST_TABLE} (
number INT,
text STRING)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_TEST_TABLE}'
);

DROP TABLE IF EXISTS ${HIVE_OUTPUT_TABLE};
CREATE EXTERNAL TABLE ${HIVE_OUTPUT_TABLE} (
number INT,
text STRING)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_OUTPUT_TABLE}'
);
@@ -5,7 +5,7 @@ CREATE TABLE ${HIVE_TEST_TABLE} (
name STRING)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}'
'bq.table'='${BQ_PROJECT}.${BQ_DATASET}.${HIVE_TEST_TABLE}'
);

-- Write data to Hive BQ table.
@@ -0,0 +1,2 @@
data = LOAD '${HIVE_TEST_TABLE}' USING org.apache.hive.hcatalog.pig.HCatLoader();
STORE data INTO '${HIVE_OUTPUT_TABLE}' USING org.apache.hive.hcatalog.pig.HCatStorer();
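
Outside the Dataproc test harness, the same ${...} placeholders can be bound on the Pig command line. A minimal sketch, assuming the HCatalog jars are made available via the -useHCatalog flag, the connector jar is on the cluster classpath, and the two Hive tables from create_two_external_tables.sql already exist (the table names below are hypothetical):

pig -useHCatalog \
    -param HIVE_TEST_TABLE=test_pig_table \
    -param HIVE_OUTPUT_TABLE=test_pig_output \
    -f read_write.pig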