Add CMEK support
Also remove the previous limitation where you could only insert data into a BQ table that already existed.
jphalip committed Apr 15, 2024
1 parent 7971a8a commit 15af4a1
Showing 17 changed files with 310 additions and 150 deletions.
81 changes: 68 additions & 13 deletions README.md
@@ -278,17 +278,18 @@ You can use the following properties in the `TBLPROPERTIES` clause when you crea

You can set the following Hive/Hadoop configuration properties in your environment:

| Property | Default value | Description |
|---------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `bq.read.data.format` | `arrow` | Data format used for reads from BigQuery. Possible values: `arrow`, `avro`. |
| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method |
| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). |
| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory |
| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. |
| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. |
| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. |
| `maxParallelism` | | Maximum initial number of read streams |
| `viewsEnabled` | `false` | Set it to `true` to enable reading views. |
| Property | Default value | Description |
|-------------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `bq.read.data.format` | `arrow` | Data format used for reads from BigQuery. Possible values: `arrow`, `avro`. |
| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method |
| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). |
| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory |
| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. |
| `bq.destination.table.kms.key.name` |                     | Cloud KMS encryption key used to protect the job's destination BigQuery table. Read more in the section on [customer-managed encryption keys](#customer-managed-encryption-key-cmek). |
| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. |
| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. |
| `maxParallelism` | | Maximum initial number of read streams |
| `viewsEnabled` | `false` | Set it to `true` to enable reading views. |
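
These properties can be set in `hive-site.xml`, passed with `--hiveconf` on the command line, or set per session. As a minimal sketch (the bucket and dataset names below are placeholders, not values from this repository), a Hive session using the `indirect` write method might look like this:

```sql
-- Stage writes through temporary Avro files on GCS instead of writing
-- directly to the BigQuery Storage API. The bucket path is a placeholder.
SET bq.write.method=indirect;
SET bq.temp.gcs.path=gs://example-temp-bucket/hive-bq-staging;

-- Allow reads from BigQuery views, materialized in a scratch dataset.
SET viewsEnabled=true;
SET materializationDataset=example_scratch_dataset;
```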

## Data Type Mapping

@@ -713,6 +714,24 @@ There are multiple options to override the default behavior and to provide custo
with the `bq.access.token` configuration property. You can generate an access token by running
`gcloud auth application-default print-access-token`.

## Customer-managed encryption key (CMEK)

You can provide a Cloud KMS key to encrypt the destination table, for example when you run a
`CREATE TABLE` statement for a managed table, or when you insert data into a BigQuery table that
doesn't exist yet. To do so, set the `bq.destination.table.kms.key.name` property to the
fully qualified name of the desired Cloud KMS key, in the form:
```
projects/<KMS_PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY>
```
The BigQuery service account associated with your project requires access to this encryption key.
The table is encrypted with the key only if it is created by the connector; a pre-existing
unencrypted table won't be encrypted just by setting this option.
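
As an illustration, here is a minimal sketch of the flow from a Hive session. The key resource name, project, dataset, and table names are placeholders, and the `STORED BY` clause and `bq.table` property are assumed to follow the table creation examples earlier in this README:

```sql
-- Placeholder Cloud KMS key resource name; replace with your own key.
SET bq.destination.table.kms.key.name=projects/my-kms-project/locations/us/keyRings/my-keyring/cryptoKeys/my-key;

-- Tables created by the connector from here on, whether through CREATE TABLE
-- or through an INSERT into a BigQuery table that doesn't exist yet, are
-- created with this key applied.
CREATE TABLE cmek_demo (
  id BIGINT,
  name STRING
)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES ('bq.table'='my-gcp-project.my_dataset.cmek_demo');
```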

For further information about using customer-managed encryption keys (CMEK) with BigQuery, see the [BigQuery CMEK documentation](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id).

## Known issues and limitations

* The `UPDATE`, `MERGE`, `DELETE`, and `ALTER TABLE` statements are currently not supported.
@@ -766,15 +785,16 @@ Enable the following APIs:
```sh
gcloud services enable \
bigquerystorage.googleapis.com \
bigqueryconnection.googleapis.com
bigqueryconnection.googleapis.com \
cloudkms.googleapis.com
```
#### BigLake setup
Define environment variables:
```sh
export PROJECT=my-gcp-project
export PROJECT=<my-gcp-project>
export BIGLAKE_LOCATION=us
export BIGLAKE_REGION=us-central1
export BIGLAKE_CONNECTION=hive-integration-tests
@@ -807,6 +827,41 @@ export BIGLAKE_SA=$(bq show --connection --format json "${PROJECT}.${BIGLAKE_LOC
gsutil iam ch serviceAccount:${BIGLAKE_SA}:objectViewer gs://${BIGLAKE_BUCKET}
```

#### KMS setup

Create a KMS keyring:

```sh
gcloud kms keyrings create \
integration_tests_keyring \
--location us
```

Then create a key in the keyring:

```sh
gcloud kms keys create integration_tests_key \
--keyring integration_tests_keyring \
--location us \
--purpose "encryption"
```

Obtain the BigQuery service account name:

```sh
BQ_SERVICE_ACCOUNT=$(bq show --encryption_service_account --format json | jq -r ".ServiceAccountID")
```

Assign the Encrypter/Decrypter role to the BigQuery service account:

```sh
gcloud kms keys add-iam-policy-binding \
--project=${PROJECT} \
--member serviceAccount:${BQ_SERVICE_ACCOUNT} \
--role roles/cloudkms.cryptoKeyEncrypterDecrypter \
--location=us \
--keyring=integration_tests_keyring \
integration_tests_key
```

#### Running the tests

You must use Java version 8, as it's the version that Hive itself uses. Make sure that `JAVA_HOME` points to the Java
@@ -70,7 +70,7 @@ public void run(HookContext hookContext) throws Exception {
context.setCmd(hookContext.getQueryPlan().getQueryString());
ParseDriver parseDriver = new ParseDriver();
ASTNode tree = parseDriver.parse(hookContext.getQueryPlan().getQueryString(), context);
HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
HiveConf hiveConf = new HiveConf(conf, this.getClass());
SemanticAnalyzer analyzer = new SemanticAnalyzer(hiveConf);
if (tree.getChildren().size() == 0 || tree.getChild(0).getType() != HiveParser.TOK_QUERY) {
return;
@@ -54,8 +54,8 @@ public Map<String, String> getBasicStatistics(Partish partish) {
Guice.createInjector(
new BigQueryClientModule(),
new HiveBigQueryConnectorModule(conf, hmsTable.getParameters()));
BigQueryClient bqClient = injector.getInstance(BigQueryClient.class);
HiveBigQueryConfig config = injector.getInstance(HiveBigQueryConfig.class);
return BigQueryUtils.getBasicStatistics(bqClient, config.getTableId());
return BigQueryUtils.getBasicStatistics(
injector.getInstance(BigQueryClient.class),
injector.getInstance(HiveBigQueryConfig.class).getTableId());
}
}
@@ -134,21 +134,32 @@ public static void assertDoesNotContainColumn(Table hmsTable, String columnName)
}
}

protected void createBigQueryTable(Table hmsTable, TableInfo bigQueryTableInfo) {
Injector injector =
Guice.createInjector(
new BigQueryClientModule(),
new HiveBigQueryConnectorModule(conf, hmsTable.getParameters()));
HiveBigQueryConfig opts = injector.getInstance(HiveBigQueryConfig.class);
protected void createBigQueryTable(
Injector injector,
TableId tableId,
StandardTableDefinition tableDefinition,
HiveBigQueryConfig opts,
Table hmsTable) {
// TODO: We currently can't use the `BigQueryClient.createTable()` because it doesn't have a way
// to pass a TableInfo. This forces us to duplicate some code below from the existing
// `BigQueryClient.createTable()`. One better long-term solution would be to add a
// `createTable(TableInfo)` method to BigQueryClient. See:
// https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/1213
TableInfo.Builder bigQueryTableInfo =
TableInfo.newBuilder(tableId, tableDefinition)
.setDescription(hmsTable.getParameters().get("comment"));
opts.getKmsKeyName()
.ifPresent(
keyName ->
bigQueryTableInfo.setEncryptionConfiguration(
EncryptionConfiguration.newBuilder().setKmsKeyName(keyName).build()));
BigQueryCredentialsSupplier credentialsSupplier =
injector.getInstance(BigQueryCredentialsSupplier.class);
HeaderProvider headerProvider = injector.getInstance(HeaderProvider.class);

// TODO: We cannot use the BigQueryClient class here because it doesn't have a
// `create(TableInfo)` method. We could add it to that class eventually.
BigQuery bigQueryService =
BigQueryUtils.getBigQueryService(opts, headerProvider, credentialsSupplier);
bigQueryService.create(bigQueryTableInfo);
bigQueryService.create(bigQueryTableInfo.build());
}

/**
@@ -247,12 +258,7 @@ public void preCreateTable(Table table) throws MetaException {
tableDefBuilder.setTimePartitioning(tpBuilder.build());
}

StandardTableDefinition tableDefinition = tableDefBuilder.build();
TableInfo bigQueryTableInfo =
TableInfo.newBuilder(tableId, tableDefinition)
.setDescription(table.getParameters().get("comment"))
.build();
createBigQueryTable(table, bigQueryTableInfo);
createBigQueryTable(injector, tableId, tableDefBuilder.build(), opts, table);

String hmsDbTableName = HiveUtils.getDbTableName(table);
LOG.info("Created BigQuery table {} for {}", tableId, hmsDbTableName);
@@ -15,10 +15,12 @@
*/
package com.google.cloud.hive.bigquery.connector;

import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
import com.google.cloud.bigquery.connector.common.BigQueryClient.CreateTableOptions;
import com.google.cloud.bigquery.connector.common.BigQueryClientModule;
import com.google.cloud.bigquery.connector.common.BigQueryConnectorException.InvalidSchemaException;
import com.google.cloud.bigquery.connector.common.BigQueryCredentialsSupplier;
import com.google.cloud.bigquery.connector.common.BigQueryUtil;
import com.google.cloud.hive.bigquery.connector.config.HiveBigQueryConfig;
@@ -29,16 +31,21 @@
import com.google.cloud.hive.bigquery.connector.output.FailureExecHook;
import com.google.cloud.hive.bigquery.connector.utils.JobUtils;
import com.google.cloud.hive.bigquery.connector.utils.avro.AvroUtils;
import com.google.cloud.hive.bigquery.connector.utils.bq.BigQuerySchemaConverter;
import com.google.cloud.hive.bigquery.connector.utils.hcatalog.HCatalogUtils;
import com.google.cloud.hive.bigquery.connector.utils.hive.HiveUtils;
import com.google.common.base.Preconditions;
import com.google.inject.Guice;
import com.google.inject.Injector;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
Expand All @@ -57,6 +64,7 @@
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
import org.apache.thrift.TException;

/** Main entrypoint for Hive/BigQuery interactions. */
@SuppressWarnings({"rawtypes", "deprecated"})
@@ -236,13 +244,16 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
Guice.createInjector(
new BigQueryClientModule(), new HiveBigQueryConnectorModule(conf, tableProperties));

// Retrieve some info from the BQ table
TableId tableId =
BigQueryUtil.parseTableId(tableProperties.getProperty(HiveBigQueryConfig.TABLE_KEY));
TableInfo bqTableInfo = injector.getInstance(BigQueryClient.class).getTable(tableId);
if (bqTableInfo == null) {
throw new RuntimeException("BigQuery table does not exist: " + tableId);
// Convert Hive schema to BigQuery schema
HiveMetaStoreClient metastoreClient = HiveUtils.createMetastoreClient(conf);
String[] dbAndTableNames = tableDesc.getTableName().split("\\.");
Table table;
try {
table = metastoreClient.getTable(dbAndTableNames[0], dbAndTableNames[1]);
} catch (TException e) {
throw new RuntimeException(e);
}
Schema bigQuerySchema = BigQuerySchemaConverter.toBigQuerySchema(table.getSd());

// More special treatment for HCatalog
if (HCatalogUtils.isHCatalogOutputJob(conf)) {
@@ -256,19 +267,55 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String
JobDetails jobDetails = new JobDetails();
HiveBigQueryConfig opts = HiveBigQueryConfig.from(conf, tableProperties);
jobDetails.setWriteMethod(opts.getWriteMethod());
jobDetails.setBigquerySchema(bqTableInfo.getDefinition().getSchema());
jobDetails.setBigquerySchema(bigQuerySchema);
jobDetails.setTableProperties(tableProperties);
jobDetails.setTableId(tableId);
jobDetails.setTableId(
BigQueryUtil.parseTableId(tableProperties.getProperty(HiveBigQueryConfig.TABLE_KEY)));

if (opts.getWriteMethod().equals(HiveBigQueryConfig.WRITE_METHOD_INDIRECT)) {
configureJobDetailsForIndirectWrite(
opts, jobDetails, injector.getInstance(BigQueryCredentialsSupplier.class));
}

createBigQueryTableIfDoesNotExist(jobDetails);

// Save the job details file to disk
jobDetails.writeFile(conf);
}

/**
* Determines whether the destination table exists and, if it doesn't, creates it.
*/
public void createBigQueryTableIfDoesNotExist(JobDetails jobDetails)
throws IllegalArgumentException {
Injector injector =
Guice.createInjector(new BigQueryClientModule(), new HiveBigQueryConnectorModule(conf));
BigQueryClient bqClient = injector.getInstance(BigQueryClient.class);
HiveBigQueryConfig opts = injector.getInstance(HiveBigQueryConfig.class);
if (bqClient.tableExists(jobDetails.getTableId())) {
// Check that the destination table's schema matches that of the insert query
TableInfo destinationTable = bqClient.getTable(jobDetails.getTableId());
Schema destinationTableSchema = destinationTable.getDefinition().getSchema();
Preconditions.checkArgument(
BigQueryUtil.schemaWritable(
jobDetails.getBigquerySchema(),
destinationTableSchema,
false, /* regardFieldOrder */
opts.getEnableModeCheckForSchemaFields()),
new InvalidSchemaException(
"Destination table's schema is not compatible with query's schema"));
jobDetails.setDeleteTableOnAbort(false);
} else {
jobDetails.setDeleteTableOnAbort(true);
bqClient
.createTable(
jobDetails.getTableId(),
jobDetails.getBigquerySchema(),
CreateTableOptions.of(opts.getKmsKeyName(), Collections.emptyMap()))
.getTableId();
}
}

@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
// Special case for HCatalog
@@ -48,6 +48,7 @@ public class JobDetails {
private transient org.apache.avro.Schema avroSchema; // Only used by the 'indirect' write method
private String avroSchemaJSON; // Only used by the 'indirect' write method
private String writeMethod;
private boolean deleteTableOnAbort;

public JobDetails() {}

@@ -181,4 +182,12 @@ public void writeFile(Configuration conf) {
throw new RuntimeException(e);
}
}

public boolean isDeleteTableOnAbort() {
return deleteTableOnAbort;
}

public void setDeleteTableOnAbort(boolean deleteTableOnAbort) {
this.deleteTableOnAbort = deleteTableOnAbort;
}
}