diff --git a/README.md b/README.md index 61e2c24c..54a3c3cc 100644 --- a/README.md +++ b/README.md @@ -278,17 +278,18 @@ You can use the following properties in the `TBLPROPERTIES` clause when you crea You can set the following Hive/Hadoop configuration properties in your environment: -| Property | Default value | Description | -|---------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `bq.read.data.format` | `arrow` | Data format used for reads from BigQuery. Possible values: `arrow`, `avro`. | -| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method | -| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). | -| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory | -| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. | -| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. | -| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. | -| `maxParallelism` | | Maximum initial number of read streams | -| `viewsEnabled` | `false` | Set it to `true` to enable reading views. | +| Property | Default value | Description | +|-------------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `bq.read.data.format` | `arrow` | Data format used for reads from BigQuery. Possible values: `arrow`, `avro`. | +| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method | +| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). | +| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory | +| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. | +| `bq.destination.table.kms.key.name` | | Cloud KMS encryption key used to protect the job's destination BigQuery table. Read more in the section on [customer-managed encryption keys](#customer-managed-encryption-keys) | +| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. | +| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. | +| `maxParallelism` | | Maximum initial number of read streams | +| `viewsEnabled` | `false` | Set it to `true` to enable reading views. | ## Data Type Mapping @@ -713,6 +714,24 @@ There are multiple options to override the default behavior and to provide custo with the `bq.access.token` configuration property. 
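For example, a minimal sketch of passing a token to a Hive CLI session through `--hiveconf` (here `my_bq_table` is a hypothetical table backed by the connector, and `ACCESS_TOKEN` is assumed to already hold a valid token):

```sh
# Assumes ACCESS_TOKEN holds a valid OAuth 2.0 access token (see below for one way
# to generate it) and that my_bq_table is defined with the BigQuery storage handler.
hive \
  --hiveconf bq.access.token="${ACCESS_TOKEN}" \
  -e "SELECT COUNT(*) FROM my_bq_table;"
```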
You can generate an access token by running `gcloud auth application-default print-access-token`. +## Customer-managed encryption key (CMEK) + +You can provide a Cloud KMS key to be used to encrypt the destination table, for example when you +run a `CREATE TABLE` statement for a managed table, or when you insert data into a table that +doesn't exist yet. To do so, set the `bq.destination.table.kms.key.name` property to the +fully-qualified name of the desired Cloud KMS key, in the form: + +``` +projects/<KMS_PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY> +``` + +The BigQuery service account associated with your project requires access to this encryption key. + +The table will be encrypted by the key only if it is created by the connector. A pre-existing +unencrypted table won't be encrypted just by setting this option. + +For further information about using customer-managed encryption keys (CMEK) with BigQuery, see [here](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id). + ## Known issues and limitations * The `UPDATE`, `MERGE`, `DELETE`, and `ALTER TABLE` statements are currently not supported. @@ -766,7 +785,8 @@ Enable the following APIs: ```sh gcloud services enable \ bigquerystorage.googleapis.com \ - bigqueryconnection.googleapis.com + bigqueryconnection.googleapis.com \ + cloudkms.googleapis.com ``` #### BigLake setup @@ -774,7 +794,7 @@ gcloud services enable \ Define environment variables: ```sh -export PROJECT=my-gcp-project +export PROJECT=<my-gcp-project> export BIGLAKE_LOCATION=us export BIGLAKE_REGION=us-central1 export BIGLAKE_CONNECTION=hive-integration-tests @@ -807,6 +827,41 @@ export BIGLAKE_SA=$(bq show --connection --format json "${PROJECT}.${BIGLAKE_LOC gsutil iam ch serviceAccount:${BIGLAKE_SA}:objectViewer gs://${BIGLAKE_BUCKET} ``` +#### KMS setup + +Create a KMS keyring: + +```sh +gcloud kms keyrings create \ + integration_tests_keyring \ + --location us +``` + +Create a KMS key: + +```sh +gcloud kms keys create integration_tests_key \ + --keyring integration_tests_keyring \ + --location us \ + --purpose "encryption" +``` + +Obtain the BigQuery service account name: + +```sh +BQ_SERVICE_ACCOUNT=$(bq show --encryption_service_account --format json | jq -r ".ServiceAccountID") +``` + +Assign the Encrypter/Decrypter role to the BigQuery service account: + +```sh +gcloud kms keys add-iam-policy-binding \ + --project=${PROJECT} \ + --member serviceAccount:${BQ_SERVICE_ACCOUNT} \ + --role roles/cloudkms.cryptoKeyEncrypterDecrypter \ + --location=us \ + --keyring=integration_tests_keyring \ + integration_tests_key +``` + #### Running the tests You must use Java version 8, as it's the version that Hive itself uses. Make sure that `JAVA_HOME` points to the Java diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml index 37385ade..dd6fd2c0 100644 --- a/cloudbuild/cloudbuild.yaml +++ b/cloudbuild/cloudbuild.yaml @@ -57,6 +57,7 @@ steps: args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive1'] env: - 'CODECOV_TOKEN=${_CODECOV_TOKEN}' + - 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}' # 7. Run integration tests for Hive 2 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit' @@ -66,6 +67,7 @@ args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive2'] env: - 'CODECOV_TOKEN=${_CODECOV_TOKEN}' + - 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}' # 8.
Run integration tests for Hive 3 - name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit' @@ -75,6 +77,7 @@ steps: args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive3'] env: - 'CODECOV_TOKEN=${_CODECOV_TOKEN}' + - 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}' # Tests should take under 120 mins timeout: 7200s diff --git a/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PreInsertHook.java b/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PreInsertHook.java index 64cd2e65..240098c1 100644 --- a/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PreInsertHook.java +++ b/hive-1-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/output/PreInsertHook.java @@ -62,7 +62,17 @@ public void run(HookContext hookContext) throws Exception { return; } - // Parse and analyze the semantics of the Hive query + // Parse and analyze the semantics of the Hive query. + // We have to do this because unfortunately the WriteEntity objects in hookContext.getOutputs() + // are systematically marked as being of type INSERT_OVERWRITE, regardless of whether it is + // an "INSERT OVERWRITE" query or a regular "INSERT" query. This is apparently caused by the + // fact that Hive 1.x.x treats all "non native" tables (i.e. by Hive 1.x.x's definition all + // tables that have a storage handler defined: + // https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java#L845) + // as INSERT_OVERWRITE: + // https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java#L12147 + // To get around this issue, we parse the query ourselves and try to determine the proper type + // for our purposes (insert or insert overwrite). 
QBParseInfo parseInfo; try { Configuration conf = hookContext.getConf(); @@ -70,9 +80,9 @@ public void run(HookContext hookContext) throws Exception { context.setCmd(hookContext.getQueryPlan().getQueryString()); ParseDriver parseDriver = new ParseDriver(); ASTNode tree = parseDriver.parse(hookContext.getQueryPlan().getQueryString(), context); - HiveConf hiveConf = new HiveConf(conf, HiveConf.class); + HiveConf hiveConf = new HiveConf(conf, this.getClass()); SemanticAnalyzer analyzer = new SemanticAnalyzer(hiveConf); - if (tree.getChildren().size() == 0 || tree.getChild(0).getType() != HiveParser.TOK_QUERY) { + if (tree.getChildren().isEmpty() || tree.getChild(0).getType() != HiveParser.TOK_QUERY) { return; } analyzer.analyze((ASTNode) tree.getChild(0), context); diff --git a/hive-3-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandler.java b/hive-3-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandler.java index dd37c43b..7b97cbed 100644 --- a/hive-3-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandler.java +++ b/hive-3-bigquery-connector/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandler.java @@ -54,8 +54,8 @@ public Map getBasicStatistics(Partish partish) { Guice.createInjector( new BigQueryClientModule(), new HiveBigQueryConnectorModule(conf, hmsTable.getParameters())); - BigQueryClient bqClient = injector.getInstance(BigQueryClient.class); - HiveBigQueryConfig config = injector.getInstance(HiveBigQueryConfig.class); - return BigQueryUtils.getBasicStatistics(bqClient, config.getTableId()); + return BigQueryUtils.getBasicStatistics( + injector.getInstance(BigQueryClient.class), + injector.getInstance(HiveBigQueryConfig.class).getTableId()); } } diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHook.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHook.java index 6d049e69..0bce2575 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHook.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryMetaHook.java @@ -134,21 +134,32 @@ public static void assertDoesNotContainColumn(Table hmsTable, String columnName) } } - protected void createBigQueryTable(Table hmsTable, TableInfo bigQueryTableInfo) { - Injector injector = - Guice.createInjector( - new BigQueryClientModule(), - new HiveBigQueryConnectorModule(conf, hmsTable.getParameters())); - HiveBigQueryConfig opts = injector.getInstance(HiveBigQueryConfig.class); + protected void createBigQueryTable( + Injector injector, + TableId tableId, + StandardTableDefinition tableDefinition, + HiveBigQueryConfig opts, + Table hmsTable) { + // TODO: We currently can't use the `BigQueryClient.createTable()` because it doesn't have a way + // to + // pass a TableInfo. This forces us to duplicate some code below from the existing + // `BigQueryClient.createTable()`. One better long-term solution would be to add a + // `createTable(TableInfo)` method to BigQueryClient. 
See: + // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/1213 + TableInfo.Builder bigQueryTableInfo = + TableInfo.newBuilder(tableId, tableDefinition) + .setDescription(hmsTable.getParameters().get("comment")); + opts.getKmsKeyName() + .ifPresent( + keyName -> + bigQueryTableInfo.setEncryptionConfiguration( + EncryptionConfiguration.newBuilder().setKmsKeyName(keyName).build())); BigQueryCredentialsSupplier credentialsSupplier = injector.getInstance(BigQueryCredentialsSupplier.class); HeaderProvider headerProvider = injector.getInstance(HeaderProvider.class); - - // TODO: We cannot use the BigQueryClient class here because it doesn't have a - // `create(TableInfo)` method. We could add it to that class eventually. BigQuery bigQueryService = BigQueryUtils.getBigQueryService(opts, headerProvider, credentialsSupplier); - bigQueryService.create(bigQueryTableInfo); + bigQueryService.create(bigQueryTableInfo.build()); } /** @@ -247,12 +258,7 @@ public void preCreateTable(Table table) throws MetaException { tableDefBuilder.setTimePartitioning(tpBuilder.build()); } - StandardTableDefinition tableDefinition = tableDefBuilder.build(); - TableInfo bigQueryTableInfo = - TableInfo.newBuilder(tableId, tableDefinition) - .setDescription(table.getParameters().get("comment")) - .build(); - createBigQueryTable(table, bigQueryTableInfo); + createBigQueryTable(injector, tableId, tableDefBuilder.build(), opts, table); String hmsDbTableName = HiveUtils.getDbTableName(table); LOG.info("Created BigQuery table {} for {}", tableId, hmsDbTableName); @@ -366,6 +372,11 @@ public void rollbackInsertTable(Table table, boolean overwrite) throws MetaExcep } public void commitDropTable(Table table, boolean deleteData) throws MetaException { + if (conf.getBoolean(HiveBigQueryConfig.CONNECTOR_IN_TEST, false) + && conf.getBoolean(HiveBigQueryConfig.FORCE_DROP_FAILURE, false)) { + // For integration testing only + throw new RuntimeException(HiveBigQueryConfig.FORCED_DROP_FAILURE_ERROR_MESSAGE); + } if (!HiveUtils.isExternalTable(table) && deleteData) { // This is a managed table, so let's delete the table in BigQuery Injector injector = diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java index db41f275..680c12e6 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/BigQueryStorageHandlerBase.java @@ -15,10 +15,12 @@ */ package com.google.cloud.hive.bigquery.connector; -import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.connector.common.BigQueryClient; +import com.google.cloud.bigquery.connector.common.BigQueryClient.CreateTableOptions; import com.google.cloud.bigquery.connector.common.BigQueryClientModule; +import com.google.cloud.bigquery.connector.common.BigQueryConnectorException.InvalidSchemaException; import com.google.cloud.bigquery.connector.common.BigQueryCredentialsSupplier; import com.google.cloud.bigquery.connector.common.BigQueryUtil; import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; @@ -29,11 +31,14 @@ import com.google.cloud.hive.bigquery.connector.output.FailureExecHook; import 
com.google.cloud.hive.bigquery.connector.utils.JobUtils; import com.google.cloud.hive.bigquery.connector.utils.avro.AvroUtils; +import com.google.cloud.hive.bigquery.connector.utils.bq.BigQuerySchemaConverter; import com.google.cloud.hive.bigquery.connector.utils.hcatalog.HCatalogUtils; import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils; +import com.google.common.base.Preconditions; import com.google.inject.Guice; import com.google.inject.Injector; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -233,17 +238,8 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map jobProperties) { // Special case for HCatalog diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/JobDetails.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/JobDetails.java index 0838ff4c..8dd85a7f 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/JobDetails.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/JobDetails.java @@ -48,6 +48,7 @@ public class JobDetails { private transient org.apache.avro.Schema avroSchema; // Only used by the 'indirect' write method private String avroSchemaJSON; // Only used by the 'indirect' write method private String writeMethod; + private boolean deleteTableOnAbort; public JobDetails() {} @@ -181,4 +182,12 @@ public void writeFile(Configuration conf) { throw new RuntimeException(e); } } + + public boolean isDeleteTableOnAbort() { + return deleteTableOnAbort; + } + + public void setDeleteTableOnAbort(boolean deleteTableOnAbort) { + this.deleteTableOnAbort = deleteTableOnAbort; + } } diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/config/HiveBigQueryConfig.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/config/HiveBigQueryConfig.java index 66bb2bf0..29713958 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/config/HiveBigQueryConfig.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/config/HiveBigQueryConfig.java @@ -70,6 +70,7 @@ public class HiveBigQueryConfig public static final String IMPERSONATE_FOR_GROUP_PREFIX = "bq.impersonation.service.account.for.group."; public static final String IMPERSONATE_SERVICE_ACCOUNT = "bq.impersonation.service.account"; + public static final String DESTINATION_TABLE_KMS_KEY_NAME = "bq.destination.table.kms.key.name"; public static final String CREATE_DISPOSITION_KEY = "bq.create.disposition"; public static final String VIEWS_ENABLED_KEY = "viewsEnabled"; public static final String FAIL_ON_UNSUPPORTED_UDFS = @@ -110,6 +111,13 @@ public class HiveBigQueryConfig public static final String JOB_DETAILS_FILE = "job-details.json"; public static final String QUERY_ID = "bq.connector.query.id"; + // For internal use only + public static final String CONNECTOR_IN_TEST = "hive.bq.connector.in.test"; + public static final String FORCE_DROP_FAILURE = "hive.bq.connector.test.force.drop.failure"; + public static final String FORCED_DROP_FAILURE_ERROR_MESSAGE = "Forced table drop failure"; + public static final String FORCE_COMMIT_FAILURE = "hive.bq.connector.test.force.commit.failure"; + public static final String FORCED_COMMIT_FAILURE_ERROR_MESSAGE = "Forced commit 
failure"; + TableId tableId; Optional traceId = empty(); @@ -125,6 +133,9 @@ public class HiveBigQueryConfig Optional> impersonationServiceAccountsForUsers; Optional> impersonationServiceAccountsForGroups; + // KMS + Optional destinationTableKmsKeyName = empty(); + // Reading parameters DataFormat readDataFormat; // ARROW or AVRO Optional createReadSessionTimeoutInSeconds; @@ -325,6 +336,10 @@ public static HiveBigQueryConfig from(Configuration conf, Map ta opts.tableId = BigQueryUtil.parseTableId(bqTable.get()); } + // KMS + opts.destinationTableKmsKeyName = + getOption(DESTINATION_TABLE_KMS_KEY_NAME, tableParameters, conf); + // Partitioning and clustering opts.partitionType = getOption(TIME_PARTITION_TYPE_KEY, tableParameters, conf) @@ -421,6 +436,10 @@ public List getLoadSchemaUpdateOptions() { return loadSchemaUpdateOptions; } + public void setLoadSchemaUpdateOptions(ImmutableList options) { + loadSchemaUpdateOptions = options; + } + @Override public boolean getEnableModeCheckForSchemaFields() { return enableModeCheckForSchemaFields; @@ -483,7 +502,7 @@ public String getParentProjectId() { @Override public java.util.Optional getKmsKeyName() { - return java.util.Optional.empty(); + return destinationTableKmsKeyName.toJavaUtil(); } @Override diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/FailureExecHook.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/FailureExecHook.java index 8dc0eee4..76b4035e 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/FailureExecHook.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/FailureExecHook.java @@ -20,10 +20,12 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType; /** - * Post execution hook used to commit the outputs. We only use this with Hive 1 in combination with - * Tez. + * This hook is only used with Tez because Tez does not call `OutputCommitter.abortJob()` when a job + * fails. So we use this hook to replicate the same behavior that `OutputCommitter.abortJob()` would + * do with the MapReduce engine. 
*/ public class FailureExecHook implements ExecuteWithHookContext { @@ -32,6 +34,8 @@ public void run(HookContext hookContext) throws Exception { for (WriteEntity entity : hookContext.getOutputs()) { if (entity.getType() == Type.TABLE && entity.getTable().getStorageHandler() != null + && (entity.getWriteType() == WriteType.INSERT + || entity.getWriteType() == WriteType.INSERT_OVERWRITE) && entity .getTable() .getStorageHandler() diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java index 434bfeeb..cb98f93c 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/OutputCommitterUtils.java @@ -28,6 +28,11 @@ public class OutputCommitterUtils { public static void commitJob(Configuration conf, JobDetails jobDetails) throws IOException { + if (conf.getBoolean(HiveBigQueryConfig.CONNECTOR_IN_TEST, false) + && conf.getBoolean(HiveBigQueryConfig.FORCE_COMMIT_FAILURE, false)) { + // For integration testing only + throw new RuntimeException(HiveBigQueryConfig.FORCED_COMMIT_FAILURE_ERROR_MESSAGE); + } if (jobDetails.getWriteMethod().equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) { DirectOutputCommitter.commitJob(conf, jobDetails); } else { diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectOutputCommitter.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectOutputCommitter.java index 655e079c..9db78ae6 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectOutputCommitter.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectOutputCommitter.java @@ -15,7 +15,6 @@ */ package com.google.cloud.hive.bigquery.connector.output.direct; -import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.connector.common.BigQueryClient; import com.google.cloud.bigquery.connector.common.BigQueryClientFactory; import com.google.cloud.bigquery.connector.common.BigQueryClientModule; @@ -79,11 +78,6 @@ public static void commitJob(Configuration conf, JobDetails jobDetails) throws I new HiveBigQueryConnectorModule(conf, jobDetails.getTableProperties())); BigQueryClient bqClient = injector.getInstance(BigQueryClient.class); BigQueryClientFactory bqClientFactory = injector.getInstance(BigQueryClientFactory.class); - HiveBigQueryConfig opts = injector.getInstance(HiveBigQueryConfig.class); - - // Retrieve the BigQuery schema - Schema bigQuerySchema = bqClient.getTable(jobDetails.getTableId()).getDefinition().getSchema(); - // Finally, make the new data available in the destination table by committing the streams DirectWriterContext writerContext = new DirectWriterContext( @@ -91,12 +85,11 @@ public static void commitJob(Configuration conf, JobDetails jobDetails) throws I bqClientFactory, jobDetails.getTableId(), jobDetails.getFinalTableId(), - bigQuerySchema, - opts.getEnableModeCheckForSchemaFields()); + jobDetails.isDeleteTableOnAbort()); try { writerContext.commit(streamNames); } finally { - writerContext.clean(); + writerContext.clean(false); } } @@ -107,18 +100,13 @@ public static void 
abortJob(Configuration conf, JobDetails jobDetails) { new HiveBigQueryConnectorModule(conf, jobDetails.getTableProperties())); BigQueryClient bqClient = injector.getInstance(BigQueryClient.class); BigQueryClientFactory bqClientFactory = injector.getInstance(BigQueryClientFactory.class); - HiveBigQueryConfig config = injector.getInstance(HiveBigQueryConfig.class); - - // Retrieve the BigQuery schema - Schema bigQuerySchema = bqClient.getTable(jobDetails.getTableId()).getDefinition().getSchema(); DirectWriterContext writerContext = new DirectWriterContext( bqClient, bqClientFactory, jobDetails.getTableId(), jobDetails.getFinalTableId(), - bigQuerySchema, - config.getEnableModeCheckForSchemaFields()); - writerContext.clean(); + jobDetails.isDeleteTableOnAbort()); + writerContext.clean(true); } } diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectWriterContext.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectWriterContext.java index 6a93c11e..1ddcae95 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectWriterContext.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/direct/DirectWriterContext.java @@ -16,20 +16,15 @@ package com.google.cloud.hive.bigquery.connector.output.direct; import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.connector.common.BigQueryClient; import com.google.cloud.bigquery.connector.common.BigQueryClientFactory; import com.google.cloud.bigquery.connector.common.BigQueryConnectorException; -import com.google.cloud.bigquery.connector.common.BigQueryUtil; import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest.Builder; import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; -import com.google.common.base.Preconditions; -import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +37,8 @@ public class DirectWriterContext { private final TableId tableIdToWrite; private final TableId destinationTableId; - private final boolean enableModeCheckForSchemaFields; - private final String tablePathForBigQueryStorage; - private boolean deleteTableOnAbort; + private final boolean deleteTableOnAbort; private final BigQueryWriteClient writeClient; @@ -54,55 +47,19 @@ public DirectWriterContext( BigQueryClientFactory bigQueryWriteClientFactory, TableId tableId, TableId destinationTableId, - Schema schema, - boolean enableModeCheckForSchemaFields) + boolean deleteTableOnAbort) throws IllegalArgumentException { this.bigQueryClient = bigQueryClient; - this.tableIdToWrite = getOrCreateTable(tableId, schema); + this.tableIdToWrite = tableId; this.destinationTableId = destinationTableId; this.tablePathForBigQueryStorage = bigQueryClient.createTablePathForBigQueryStorage(tableIdToWrite); this.writeClient = bigQueryWriteClientFactory.getBigQueryWriteClient(); - this.enableModeCheckForSchemaFields = enableModeCheckForSchemaFields; - } - - /** - * This function determines whether the destination table 
exists: if it doesn't, we will create a - * table and Hive will directly write to it. - * - * @param tableId the TableId, as was supplied by the user - * @param bigQuerySchema the bigQuery schema - * @return The TableId to which Hive will do the writing: whether that is the destination TableID - * or a temporary TableId. - */ - private TableId getOrCreateTable(TableId tableId, Schema bigQuerySchema) - throws IllegalArgumentException { - if (bigQueryClient.tableExists(tableId)) { - TableInfo destinationTable = bigQueryClient.getTable(tableId); - Schema tableSchema = destinationTable.getDefinition().getSchema(); - Preconditions.checkArgument( - BigQueryUtil.schemaWritable( - tableSchema, - bigQuerySchema, /* regardFieldOrder */ - false, - enableModeCheckForSchemaFields), - new BigQueryConnectorException.InvalidSchemaException( - "Destination table's schema is not compatible with query's" + " schema")); - deleteTableOnAbort = false; - return destinationTable.getTableId(); - } else { - deleteTableOnAbort = true; - return bigQueryClient - .createTable( - tableId, - bigQuerySchema, - BigQueryClient.CreateTableOptions.of(Optional.empty(), Collections.emptyMap())) - .getTableId(); - } + this.deleteTableOnAbort = deleteTableOnAbort; } public void commit(List streamNames) { - BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest = + Builder batchCommitWriteStreamsRequest = BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage); batchCommitWriteStreamsRequest.addAllWriteStreams(streamNames); BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse = @@ -136,9 +93,8 @@ public void commit(List streamNames) { } } - public void clean() { - // Special case for "INSERT OVERWRITE" statements: delete the temporary table. 
- if (deleteTableOnAbort + public void clean(boolean isAborting) { + if (isAborting && deleteTableOnAbort || (destinationTableId != null && !destinationTableId.equals(tableIdToWrite))) { LOG.info("Deleting BigQuery table {}", tableIdToWrite); try { diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectOutputCommitter.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectOutputCommitter.java index 5dd37090..65d957c7 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectOutputCommitter.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/output/indirect/IndirectOutputCommitter.java @@ -16,6 +16,8 @@ package com.google.cloud.hive.bigquery.connector.output.indirect; import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobInfo.SchemaUpdateOption; import com.google.cloud.bigquery.JobInfo.WriteDisposition; import com.google.cloud.bigquery.connector.common.BigQueryClient; import com.google.cloud.bigquery.connector.common.BigQueryClientModule; @@ -25,6 +27,7 @@ import com.google.cloud.hive.bigquery.connector.utils.FileSystemUtils; import com.google.cloud.hive.bigquery.connector.utils.JobUtils; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import com.google.inject.Guice; import com.google.inject.Injector; import java.io.IOException; @@ -63,10 +66,22 @@ public static void commitJob(Configuration conf, JobDetails jobDetails) throws I BigQueryClient bqClient = injector.getInstance(BigQueryClient.class); HiveBigQueryConfig opts = injector.getInstance(HiveBigQueryConfig.class); FormatOptions formatOptions = FormatOptions.avro(); - WriteDisposition writeDisposition = - jobDetails.isOverwrite() - ? WriteDisposition.WRITE_TRUNCATE - : WriteDisposition.WRITE_APPEND; + WriteDisposition writeDisposition; + if (jobDetails.isOverwrite()) { + // Truncate the table before inserting the new rows + writeDisposition = WriteDisposition.WRITE_TRUNCATE; + } else { + // Append the new rows + writeDisposition = WriteDisposition.WRITE_APPEND; + // Since Hive doesn't have a way of specifying required (NOT NULL) fields, we have to assume + // that all fields might be nullable, i.e. are of the Avro type UNION(NULL, ORIGINAL_TYPE). + // This is not needed in the overwrite case as WRITE_TRUNCATE would already take care of the + // schema change. 
+ ImmutableList.Builder loadSchemaUpdateOptionsBuilder = + ImmutableList.builder(); + loadSchemaUpdateOptionsBuilder.add(SchemaUpdateOption.ALLOW_FIELD_RELAXATION); + opts.setLoadSchemaUpdateOptions(loadSchemaUpdateOptionsBuilder.build()); + } LOG.info("Loading avroFiles [ " + Joiner.on(",").join(avroFiles) + "]"); // Load the Avro files into BigQuery bqClient.loadDataIntoTable( diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQuerySchemaConverter.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQuerySchemaConverter.java index b73e68f9..add4b27d 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQuerySchemaConverter.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQuerySchemaConverter.java @@ -17,19 +17,72 @@ import com.google.cloud.bigquery.*; import com.google.cloud.hive.bigquery.connector.HiveCompat; +import com.google.cloud.hive.bigquery.connector.utils.hcatalog.HCatalogUtils; +import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils; import com.google.cloud.hive.bigquery.connector.utils.hive.KeyValueObjectInspector; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.objectinspector.*; import org.apache.hadoop.hive.serde2.typeinfo.*; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; /** Converts Hive Schema to BigQuery schema. */ public class BigQuerySchemaConverter { + public static Schema toBigQuerySchema(Configuration conf, TableDesc tableDesc) { + if (HCatalogUtils.isHCatalogOutputJob(conf)) { + HCatSchema hcatSchema = HCatalogUtils.getHCatalogOutputJobInfo(conf).getOutputSchema(); + return toBigQuerySchema(hcatSchema); + } + // Fetch the Hive schema + Hive hive; + HiveConf hiveConf = + conf instanceof HiveConf ? (HiveConf) conf : new HiveConf(conf, HiveUtils.class); + try { + hive = Hive.get(hiveConf); + } catch (HiveException e) { + throw new RuntimeException(e); + } + String[] dbAndTableNames = tableDesc.getTableName().split("\\."); + if (dbAndTableNames.length != 2 + || dbAndTableNames[0].isEmpty() + || dbAndTableNames[1].isEmpty()) { + throw new IllegalArgumentException( + "Invalid table name format. Expected format 'dbName.tblName'. Received: " + + tableDesc.getTableName()); + } + Table table; + try { + table = hive.getTable(dbAndTableNames[0], dbAndTableNames[1]); + } catch (HiveException e) { + throw new RuntimeException(e); + } + // Convert the Hive schema to BigQuery schema + return toBigQuerySchema(table.getSd()); + } + + /** Converts the provided HCatalog schema to the corresponding BigQuery schema. 
*/ + public static Schema toBigQuerySchema(HCatSchema hcatSchema) { + List bigQueryFields = new ArrayList<>(); + for (HCatFieldSchema hiveField : hcatSchema.getFields()) { + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(hiveField.getTypeString()); + bigQueryFields.add(buildBigQueryField(hiveField.getName(), typeInfo, hiveField.getComment())); + } + return Schema.of(bigQueryFields); + } + + /** Converts the provided Hive schema to the corresponding BigQuery schema. */ public static Schema toBigQuerySchema(StorageDescriptor sd) { List bigQueryFields = new ArrayList<>(); for (FieldSchema hiveField : sd.getCols()) { diff --git a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQueryUtils.java b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQueryUtils.java index 6516613c..09b43aa0 100644 --- a/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQueryUtils.java +++ b/hive-bigquery-connector-common/src/main/java/com/google/cloud/hive/bigquery/connector/utils/bq/BigQueryUtils.java @@ -74,8 +74,9 @@ public static Schema loadSchemaFromJSON(String json) { /** * Returns a BigQuery service object. We need this instead of the BigQueryClient class from the - * bigquery-connector-common library because that class doesn't have a `create(TableInfo)` method. - * See more about this in {@link BigQueryMetaHook#commitCreateTable(Table)} + * bigquery-connector-common library because that class's `createTable()` method currently doesn't + * have a way to pass a table description. See more about this in {@link + * BigQueryMetaHook#commitCreateTable(Table)} */ public static BigQuery getBigQueryService( HiveBigQueryConfig config, diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java index 5589d083..eb6b6c51 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/TestUtils.java @@ -36,7 +36,6 @@ public class TestUtils { public static final String HIVECONF_SYSTEM_OVERRIDE_PREFIX = "hiveconf_"; public static final String LOCATION = "us"; public static final String TEST_TABLE_NAME = "test"; - public static final String SCHEMA_MISMATCH_TABLE_NAME = "schema_mismatch"; public static final String BIGLAKE_TABLE_NAME = "biglake"; public static final String TEST_VIEW_NAME = "test_view"; public static final String ANOTHER_TEST_TABLE_NAME = "another_test"; @@ -52,6 +51,8 @@ public class TestUtils { public static final String BIGLAKE_CONNECTION_ENV_VAR = "BIGLAKE_CONNECTION"; public static final String BIGLAKE_BUCKET_ENV_VAR = "BIGLAKE_BUCKET"; + public static final String KMS_KEY_NAME_ENV_VAR = "BIGQUERY_KMS_KEY_NAME"; + public static String BIGQUERY_TEST_TABLE_DDL = String.join( "\n", @@ -59,8 +60,6 @@ public class TestUtils { // insensitivity. 
See PR #98 "text STRING"); - public static String BIGQUERY_SCHEMA_MISMATCH_TABLE_DDL = String.join("\n", "number BYTES"); - public static String BIGQUERY_ANOTHER_TEST_TABLE_DDL = String.join("\n", "num INT64,", "str_val STRING"); @@ -106,8 +105,6 @@ public class TestUtils { public static String HIVE_TEST_TABLE_DDL = String.join("\n", "number BIGINT,", "text STRING"); - public static String HIVE_SCHEMA_MISMATCH_TABLE_DDL = String.join("\n", "number BIGINT"); - public static String HIVE_TEST_VIEW_DDL = String.join("\n", "number BIGINT,", "text STRING"); public static String HIVE_BIGLAKE_TABLE_DDL = @@ -215,6 +212,14 @@ public static String getBigLakeBucket() { return System.getenv().getOrDefault(BIGLAKE_BUCKET_ENV_VAR, getProject() + "-biglake-tests"); } + public static String getKmsKeyName() { + String kmsKeyName = System.getenv().get(KMS_KEY_NAME_ENV_VAR); + if (kmsKeyName == null) { + throw new RuntimeException(KMS_KEY_NAME_ENV_VAR + " env var is not set"); + } + return kmsKeyName; + } + /** * Returns the name of the bucket used to store temporary Avro files when testing the indirect * write method. This bucket is created automatically when running the tests. @@ -256,6 +261,12 @@ public static void dropBqTableIfExists(String dataset, String table) { TableId tableId = TableId.of(getProject(), dataset, table); if (getBigqueryClient().tableExists(tableId)) { getBigqueryClient().deleteTable(tableId); + try { + // Wait a bit to avoid rate limiting issues with the BQ backend for table operations + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java index 591bf155..f77b9dcf 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/IntegrationTestsBase.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.provider.Arguments; @@ -91,6 +90,8 @@ public static void setUpAll() { } catch (StorageException e) { if (e.getCode() == 409) { // The bucket already exists, which is okay. + } else { + throw new RuntimeException(e); } } @@ -313,13 +314,14 @@ public void initHive(String engine, String readDataFormat, String tempGcsPath) { System.getProperties().setProperty(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, engine); System.getProperties().setProperty(HiveBigQueryConfig.READ_DATA_FORMAT_KEY, readDataFormat); System.getProperties().setProperty(HiveBigQueryConfig.TEMP_GCS_PATH_KEY, tempGcsPath); + System.getProperties().setProperty(HiveBigQueryConfig.CONNECTOR_IN_TEST, "true"); System.getProperties() .setProperty( "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"); // GCS Connector // This is needed to avoid an odd exception when running the tests with Tez and Hadoop 3. 
// Similar issue to what's described in https://issues.apache.org/jira/browse/HIVE-24734 - System.getProperties().setProperty(MRJobConfig.MAP_MEMORY_MB, "1024"); + System.getProperties().setProperty("mapreduce.map.memory.mb", "1024"); // Apply system properties to the Hive conf Enumeration propertyNames = System.getProperties().propertyNames(); diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ManagedAndExternalHiveTableIntegrationTestsBase.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ManagedAndExternalHiveTableIntegrationTestsBase.java index 1700fe48..696af906 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ManagedAndExternalHiveTableIntegrationTestsBase.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/ManagedAndExternalHiveTableIntegrationTestsBase.java @@ -20,6 +20,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.hive.bigquery.connector.TestUtils; +import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; import org.junit.jupiter.api.Test; public abstract class ManagedAndExternalHiveTableIntegrationTestsBase extends IntegrationTestsBase { @@ -40,10 +42,10 @@ public void testCreateManagedTable() { // and that the two schemas are the same TableInfo managedTableInfo = getTableInfo(dataset, MANAGED_TEST_TABLE_NAME); TableInfo allTypesTableInfo = getTableInfo(dataset, ALL_TYPES_TABLE_NAME); - assertEquals(managedTableInfo.getDescription(), allTypesTableInfo.getDescription()); + assertEquals(allTypesTableInfo.getDescription(), managedTableInfo.getDescription()); assertEquals( - managedTableInfo.getDefinition().getSchema(), - allTypesTableInfo.getDefinition().getSchema()); + allTypesTableInfo.getDefinition().getSchema(), + managedTableInfo.getDefinition().getSchema()); } // --------------------------------------------------------------------------------------------------- @@ -110,6 +112,27 @@ public void testDropManagedTable() { // --------------------------------------------------------------------------------------------------- + @Test + public void testDropManagedTableFailure() { + System.getProperties().setProperty(HiveBigQueryConfig.FORCE_DROP_FAILURE, "true"); + initHive(); + // Make sure the managed table doesn't exist yet in BigQuery + dropBqTableIfExists(dataset, MANAGED_TEST_TABLE_NAME); + assertFalse(bQTableExists(dataset, MANAGED_TEST_TABLE_NAME)); + // Create the managed table using Hive + createManagedTable(MANAGED_TEST_TABLE_NAME, HIVE_ALL_TYPES_TABLE_DDL); + // Check that the table was created in BigQuery + assertTrue(bQTableExists(dataset, MANAGED_TEST_TABLE_NAME)); + // Attempt to drop the managed table using hive + Throwable exception = + assertThrows( + RuntimeException.class, () -> runHiveQuery("DROP TABLE " + MANAGED_TEST_TABLE_NAME)); + assertTrue( + exception.getMessage().contains(HiveBigQueryConfig.FORCED_DROP_FAILURE_ERROR_MESSAGE)); + } + + // --------------------------------------------------------------------------------------------------- + @Test public void testDropExternalTable() { initHive(); @@ -119,4 +142,15 @@ public void testDropExternalTable() { // Check that the table still exists in BigQuery assertTrue(bQTableExists(dataset, TEST_TABLE_NAME)); } + + @Test + void createCmekTable() { + System.getProperties() + 
.setProperty(HiveBigQueryConfig.DESTINATION_TABLE_KMS_KEY_NAME, TestUtils.getKmsKeyName()); + initHive(); + dropBqTableIfExists(dataset, MANAGED_TEST_TABLE_NAME); + createManagedTable(MANAGED_TEST_TABLE_NAME, HIVE_ALL_TYPES_TABLE_DDL); + TableInfo tableInfo = getTableInfo(dataset, MANAGED_TEST_TABLE_NAME); + assertEquals(TestUtils.getKmsKeyName(), tableInfo.getEncryptionConfiguration().getKmsKeyName()); + } } diff --git a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/WriteIntegrationTestsBase.java b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/WriteIntegrationTestsBase.java index f7e0af90..27a5fb0f 100644 --- a/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/WriteIntegrationTestsBase.java +++ b/hive-bigquery-connector-common/src/test/java/com/google/cloud/hive/bigquery/connector/integration/WriteIntegrationTestsBase.java @@ -20,7 +20,9 @@ import static org.junit.jupiter.api.Assertions.*; import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.TableResult; +import com.google.cloud.hive.bigquery.connector.TestUtils; import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig; import com.google.cloud.storage.Blob; import com.google.common.collect.Streams; @@ -533,43 +535,60 @@ public void testMultiReadWrite(String engine, String readDataFormat, String writ // --------------------------------------------------------------------------------------------------- - /** Check that we correctly clean things up in case of a query failure */ + /** Check that we correctly clean things up in case of a query failure. */ @ParameterizedTest @MethodSource(EXECUTION_ENGINE_WRITE_METHOD) - public void testWriteFailure(String engine, String writeMethod) { + public void testFailure(String engine, String writeMethod) { + // Set up the job for failure + System.getProperties().setProperty(HiveBigQueryConfig.FORCE_COMMIT_FAILURE, "true"); System.getProperties().setProperty(HiveBigQueryConfig.WRITE_METHOD_KEY, writeMethod); initHive(engine); - - // Create the two mismatched tables - createExternalTable(SCHEMA_MISMATCH_TABLE_NAME, HIVE_SCHEMA_MISMATCH_TABLE_DDL); - createBqTable(SCHEMA_MISMATCH_TABLE_NAME, BIGQUERY_SCHEMA_MISMATCH_TABLE_DDL); - - // Make sure the query fails + String tableName = String.format("failure_%s", writeMethod); + createExternalTable(tableName, HIVE_TEST_TABLE_DDL, BIGQUERY_TEST_TABLE_DDL); + // Make sure the insert query fails Throwable exception = assertThrows( RuntimeException.class, - () -> - runHiveQuery( - String.format("INSERT INTO %s VALUES (123)", SCHEMA_MISMATCH_TABLE_NAME))); - + () -> runHiveQuery("INSERT INTO " + tableName + " VALUES (123, 'hello')")); // Check for error messages, which are only available when using Tez if (hive.getHiveConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { - if (writeMethod.equals(HiveBigQueryConfig.WRITE_METHOD_DIRECT)) { - // Error message from the Storage Write API - assertTrue(exception.getMessage().contains("The proto field mismatched with BigQuery")); - } else { - // Error message from the Load Job API - assertTrue( - exception.getMessage().contains("Field number has changed type from BYTES to INTEGER")); - } + assertTrue( + exception.getMessage().contains(HiveBigQueryConfig.FORCED_COMMIT_FAILURE_ERROR_MESSAGE)); } - // Make sure no rows were inserted - TableResult result = - 
runBqQuery(String.format("SELECT * FROM `${dataset}.%s`", SCHEMA_MISMATCH_TABLE_NAME)); + TableResult result = runBqQuery(String.format("SELECT * FROM `${dataset}.%s`", tableName)); assertEquals(0, result.getTotalRows()); // Make sure things are correctly cleaned up checkThatWorkDirsHaveBeenCleaned(); } + + // --------------------------------------------------------------------------------------------------- + + /** Check that we can write to an encrypted table. */ + @ParameterizedTest + @MethodSource(EXECUTION_ENGINE_WRITE_METHOD) + public void testWriteToCmekTable(String engine, String writeMethod) { + System.getProperties() + .setProperty(HiveBigQueryConfig.DESTINATION_TABLE_KMS_KEY_NAME, TestUtils.getKmsKeyName()); + System.getProperties().setProperty(HiveBigQueryConfig.WRITE_METHOD_KEY, writeMethod); + initHive(engine); + String tableName = String.format("cmek_%s", writeMethod); + createExternalTable(tableName, HIVE_TEST_TABLE_DDL); + // Run an insert query using Hive + runHiveQuery("INSERT INTO " + tableName + " VALUES (123, 'hello')"); + // Check that the table was created in BQ with the correct key name + TableInfo tableInfo = getTableInfo(dataset, tableName); + assertEquals(TestUtils.getKmsKeyName(), tableInfo.getEncryptionConfiguration().getKmsKeyName()); + // Read the data using the BQ SDK + TableResult result = + runBqQuery(String.format("SELECT * FROM `${dataset}.%s` ORDER BY number", tableName)); + // Verify we get the expected values + assertEquals(1, result.getTotalRows()); + List rows = Streams.stream(result.iterateAll()).collect(Collectors.toList()); + assertEquals(123L, rows.get(0).get(0).getLongValue()); + assertEquals("hello", rows.get(0).get(1).getStringValue()); + // Make sure things are correctly cleaned up + checkThatWorkDirsHaveBeenCleaned(); + } }
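As an end-to-end illustration of the customer-managed encryption key support exercised by these tests, here is a minimal sketch, run from the Hive CLI, that creates a CMEK-protected managed table and then inspects the resulting BigQuery table. The project, dataset, database, and key names are placeholders; the `STORED BY` class is the connector's storage handler, and `bq.table` is assumed to point at the BigQuery destination in `project.dataset.table` form:

```sh
# Placeholder Cloud KMS key; the BigQuery service account must be able to use it.
KMS_KEY="projects/my-project/locations/us/keyRings/my-keyring/cryptoKeys/my-key"

# Create a managed table. Because the connector creates the BigQuery table itself,
# the new table is encrypted with the provided key.
hive \
  --hiveconf bq.destination.table.kms.key.name="${KMS_KEY}" \
  -e "CREATE TABLE my_db.cmek_demo (id BIGINT, name STRING)
      STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
      TBLPROPERTIES ('bq.table'='my-project.my_dataset.cmek_demo');"

# Confirm that the new BigQuery table carries the expected encryption configuration.
bq show --format=prettyjson my_dataset.cmek_demo | jq '.encryptionConfiguration'
```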