Sample optimized tables (#715)
* sample optimized table with tests
kbtian authored Mar 29, 2022
1 parent 4090580 commit fd3fd1f
Showing 3 changed files with 186 additions and 44 deletions.
51 changes: 51 additions & 0 deletions gcp_variant_transforms/libs/bigquery_util.py
@@ -365,6 +365,57 @@ def compose_table_name(base_name, suffix):
  # type: (str, str) -> str
  return TABLE_SUFFIX_SEPARATOR.join([base_name, suffix])


def compose_temp_table_base(base_name, prefix):
  # type: (str, str) -> str
  """Inserts `prefix` before the table ID of a `project:dataset.table` name."""
  table_re_match = re.match(
      r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', base_name)
  project_id = table_re_match.group('project')
  dataset_id = table_re_match.group('dataset')
  table_id = table_re_match.group('table')
  temp_table_base_name = '{}:{}.{}{}'.format(project_id, dataset_id, prefix,
                                             table_id)
  return temp_table_base_name


def get_non_temp_table_name(temp_table, prefix):
  # type: (str, str) -> str
  """Strips `prefix` from the table ID of a temp `project:dataset.table` name."""
  table_re_match = re.match(
      rf'^((?P<project>.+):)(?P<dataset>\w+)\.'
      rf'(?P<tmpname>{re.escape(prefix)})(?P<table>[\w\$]+)$',
      temp_table)
  project_id = table_re_match.group('project')
  dataset_id = table_re_match.group('dataset')
  table_id = table_re_match.group('table')
  non_temp_table_name = '{}:{}.{}'.format(project_id, dataset_id, table_id)
  return non_temp_table_name


def copy_table(source, destination):
  # type: (str, str) -> None
  """Appends all rows of the `source` table to the `destination` table."""
  source_table_re_match = re.match(
      r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', source)
  source_project_id = source_table_re_match.group('project')
  source_dataset_id = source_table_re_match.group('dataset')
  source_table_id = source_table_re_match.group('table')
  destination_table_re_match = re.match(
      r'^((?P<project>.+):)(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', destination)
  destination_project_id = destination_table_re_match.group('project')
  destination_dataset_id = destination_table_re_match.group('dataset')
  destination_table_id = destination_table_re_match.group('table')
  client = bigquery.Client(project=source_project_id)
  source_dataset = client.dataset(
      dataset_id=source_dataset_id, project=source_project_id)
  source_table = source_dataset.table(source_table_id)
  destination_dataset = client.dataset(
      dataset_id=destination_dataset_id, project=destination_project_id)
  destination_table = destination_dataset.table(destination_table_id)
  job_config = bigquery.job.CopyJobConfig(write_disposition='WRITE_APPEND')
  copy_job = client.copy_table(
      source_table, destination_table, job_config=job_config)
  try:
    # result() blocks until the copy job finishes or `timeout` seconds pass;
    # on timeout it raises concurrent.futures.TimeoutError rather than the
    # builtin TimeoutError (requires `import concurrent.futures` at the top).
    copy_job.result(timeout=600)
  except concurrent.futures.TimeoutError:
    logging.warning('Timed out copying from temp table: %s', source_table)


def get_table_base_name(table_name):
  return table_name.split(TABLE_SUFFIX_SEPARATOR)[0]
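
A quick round-trip of the naming helpers above (a minimal sketch, not part of the commit; the project, dataset, and table IDs are hypothetical, and TABLE_SUFFIX_SEPARATOR is the module-level constant used by compose_table_name):

  base = 'my-project:my_dataset.variants'
  tmp_base = compose_temp_table_base(base, '_tmp_')
  # tmp_base == 'my-project:my_dataset._tmp_variants'
  assert get_non_temp_table_name(tmp_base, '_tmp_') == base
  sharded = compose_table_name(base, 'chr1')
  assert get_table_base_name(sharded) == base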

2 changes: 1 addition & 1 deletion gcp_variant_transforms/libs/partitioning.py
@@ -82,7 +82,7 @@ def __init__(self, base_table_id, suffixes, append):
    self._sub_fields = []

    job_config = bigquery.job.QueryJobConfig(
-        write_disposition='WRITE_TRUNCATE' if append else 'WRITE_EMPTY')
+        write_disposition='WRITE_APPEND' if append else 'WRITE_EMPTY')
    self._client = bigquery.Client(project=self._project_id,
                                   default_query_job_config=job_config)
    self._find_one_non_empty_table()
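
This one-word change is the crux of the append fix: for BigQuery jobs, WRITE_TRUNCATE replaces every row in the destination table, WRITE_APPEND adds rows to it, and WRITE_EMPTY fails unless the destination is empty. A standalone sketch of the same configuration (assumes the google-cloud-bigquery client; the project, dataset, and table names are hypothetical):

  from google.cloud import bigquery

  client = bigquery.Client(project='my-project')
  dest = bigquery.TableReference.from_string('my-project.my_dataset.flat')
  job_config = bigquery.job.QueryJobConfig(
      destination=dest,
      write_disposition='WRITE_APPEND')  # add rows instead of replacing them
  client.query('SELECT 1 AS x', job_config=job_config).result()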
177 changes: 134 additions & 43 deletions gcp_variant_transforms/vcf_to_bq.py
@@ -99,6 +99,10 @@ def _record_newly_created_table(full_table_id):
  global _newly_created_tables  # pylint: disable=global-statement
  _newly_created_tables.append(full_table_id)

_new_temp_tables = []
def _record_new_temp_table(full_table_id):
  global _new_temp_tables  # pylint: disable=global-statement
  _new_temp_tables.append(full_table_id)

def _read_variants(all_patterns,  # type: List[str]
                   pipeline,  # type: beam.Pipeline
@@ -569,20 +573,137 @@ def run(argv=None):
        known_args.output_table))

    suffixes.append(sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
-    load_avro = avro_util.LoadAvro(
-        avro_root_path, known_args.output_table, suffixes, False)
-    not_empty_variant_suffixes = load_avro.start_loading()
-    logging.info('Following tables were loaded with at least 1 row:')
-    for suffix in not_empty_variant_suffixes:
-      logging.info(bigquery_util.compose_table_name(known_args.output_table,
-                                                    suffix))
-    # Remove sample_info table from both lists to avoid duplicating it when
-    # --sample_lookup_optimized_output_table flag is set
-    suffixes.remove(sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
-    if sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX in\
-        not_empty_variant_suffixes:
-      not_empty_variant_suffixes.remove(
-          sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)

    # If creating sample optimized tables and running in append mode, create
    # temp tables first, to be able to copy just new records to sample
    # optimized tables.
    if (known_args.append and known_args.sample_lookup_optimized_output_table):
      tmp_prefix = '_tmp_'
      temp_table_base_name = bigquery_util.compose_temp_table_base(
          known_args.output_table, tmp_prefix)

      temp_suffixes = []
      for i in range(num_shards):
        temp_suffixes.append(sharding.get_output_table_suffix(i))
        temp_partition_range_end = (
            sharding.get_output_table_partition_range_end(i))
        temp_table_name = bigquery_util.compose_table_name(
            temp_table_base_name, temp_suffixes[i])
        partitioning.create_bq_table(
            temp_table_name, schema_file,
            bigquery_util.ColumnKeyConstants.START_POSITION,
            temp_partition_range_end)
        _record_newly_created_table(temp_table_name)
        logging.info('Integer range partitioned table %s was created.',
                     temp_table_name)
        _record_new_temp_table(temp_table_name)
      temp_sample_table_id = (
          sample_info_table_schema_generator.create_sample_info_table(
              temp_table_base_name))
      _record_newly_created_table(temp_sample_table_id)
      _record_new_temp_table(temp_sample_table_id)
      temp_suffixes.append(
          sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
      temp_load_avro = avro_util.LoadAvro(
          avro_root_path, temp_table_base_name, temp_suffixes, False)
      temp_not_empty_variant_suffixes = temp_load_avro.start_loading()

      # Copy the temp tables into the output tables.
      for temp_t in _new_temp_tables:
        output_table = bigquery_util.get_non_temp_table_name(
            temp_t, tmp_prefix)
        try:
          bigquery_util.copy_table(temp_t, output_table)
        except Exception as e:
          logging.error(
              'Something unexpected happened while copying the temp table %s '
              'to the target table %s: %s', temp_t, output_table, str(e))

      # Remove sample_info table from both lists to avoid duplicating it when
      # --sample_lookup_optimized_output_table flag is set.
      temp_suffixes.remove(
          sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
      if (sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX in
          temp_not_empty_variant_suffixes):
        temp_not_empty_variant_suffixes.remove(
            sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)

      # Copy to the sample optimized tables.
      temp_flatten_call_column = partitioning.FlattenCallColumn(
          temp_table_base_name, temp_not_empty_variant_suffixes,
          known_args.append)
      try:
        temp_flatten_schema_file = tempfile.mkstemp(
            suffix=_BQ_SCHEMA_FILE_SUFFIX)[1]
        if not temp_flatten_call_column.get_flatten_table_schema(
            temp_flatten_schema_file):
          raise ValueError('Failed to extract schema of flatten table')
        # Copy from the temp variant lookup tables to the flatten sample
        # lookup tables.
        temp_flatten_call_column.copy_to_flatten_table(
            known_args.sample_lookup_optimized_output_table)
        logging.info('All sample lookup optimized tables are fully loaded.')
      except Exception as e:
        logging.error(
            'Something unexpected happened while loading rows into the '
            'sample optimized tables. Since this copy failed, the temporary '
            'tables were not deleted. To avoid extra storage charges, delete '
            'the temporary tables in your dataset that begin with %s. '
            'Error: %s', tmp_prefix, str(e))
        raise
      else:
        for temp_t in _new_temp_tables:
          if bigquery_util.delete_table(temp_t) != 0:
            logging.error('Deletion of temporary table "%s" failed.', temp_t)

    else:
      load_avro = avro_util.LoadAvro(avro_root_path, known_args.output_table,
                                     suffixes, False)
      not_empty_variant_suffixes = load_avro.start_loading()
      logging.info('The following tables were loaded with at least 1 row:')
      for suffix in not_empty_variant_suffixes:
        logging.info(
            bigquery_util.compose_table_name(known_args.output_table, suffix))
      # Remove sample_info table from both lists to avoid duplicating it when
      # --sample_lookup_optimized_output_table flag is set.
      suffixes.remove(
          sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)
      if (sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX in
          not_empty_variant_suffixes):
        not_empty_variant_suffixes.remove(
            sample_info_table_schema_generator.SAMPLE_INFO_TABLE_SUFFIX)

      if known_args.sample_lookup_optimized_output_table:
        flatten_call_column = partitioning.FlattenCallColumn(
            known_args.output_table, not_empty_variant_suffixes,
            known_args.append)
        try:
          flatten_schema_file = tempfile.mkstemp(
              suffix=_BQ_SCHEMA_FILE_SUFFIX)[1]
          if not flatten_call_column.get_flatten_table_schema(
              flatten_schema_file):
            raise ValueError('Failed to extract schema of flatten table')

          # Create all sample optimized tables, including those that will be
          # empty.
          for suffix in suffixes:
            output_table_id = bigquery_util.compose_table_name(
                known_args.sample_lookup_optimized_output_table, suffix)
            partitioning.create_bq_table(
                output_table_id, flatten_schema_file,
                bigquery_util.ColumnKeyConstants.CALLS_SAMPLE_ID,
                partitioning.MAX_RANGE_END)
            _record_newly_created_table(output_table_id)
            logging.info('Sample lookup optimized table %s was created.',
                         output_table_id)
          # Copy from the variant lookup tables to the flatten sample lookup
          # tables.
          flatten_call_column.copy_to_flatten_table(
              known_args.sample_lookup_optimized_output_table)
          logging.info('All sample lookup optimized tables are fully loaded.')
        except Exception as e:
          logging.error(
              'Something unexpected happened while loading rows into the '
              'sample optimized tables: %s', str(e))
          raise

  except Exception as e:
    logging.error('Something unexpected happened during the loading of AVRO '
                  'files to BigQuery: %s', str(e))
@@ -603,36 +724,6 @@ def run(argv=None):
                    'failed.', avro_root_path)


-  if known_args.sample_lookup_optimized_output_table:
-    flatten_call_column = partitioning.FlattenCallColumn(
-        known_args.output_table, not_empty_variant_suffixes, known_args.append)
-    try:
-      flatten_schema_file = tempfile.mkstemp(suffix=_BQ_SCHEMA_FILE_SUFFIX)[1]
-      if not flatten_call_column.get_flatten_table_schema(flatten_schema_file):
-        raise ValueError('Failed to extract schema of flatten table')
-      # Create output flatten tables if needed
-      if not known_args.append:
-        # Create all sample optimized tables including those that will be empty.
-        for suffix in suffixes:
-          output_table_id = bigquery_util.compose_table_name(
-              known_args.sample_lookup_optimized_output_table, suffix)
-          partitioning.create_bq_table(
-              output_table_id, flatten_schema_file,
-              bigquery_util.ColumnKeyConstants.CALLS_SAMPLE_ID,
-              partitioning.MAX_RANGE_END)
-          _record_newly_created_table(output_table_id)
-          logging.info('Sample lookup optimized table %s was created.',
-                       output_table_id)
-      # Copy to flatten sample lookup tables from the variant lookup tables.
-      # Note: uses WRITE_TRUNCATE to overwrite the existing tables (issue #607).
-      flatten_call_column.copy_to_flatten_table(
-          known_args.sample_lookup_optimized_output_table)
-      logging.info('All sample lookup optimized tables are fully loaded.')
-    except Exception as e:
-      logging.error('Something unexpected happened during the loading rows to '
-                    'sample optimized table stage: %s', str(e))
-      raise e

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  try:
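Taken together, the append path added to run() boils down to the flow below (a condensed, hypothetical sketch rather than code from the commit; it reuses the repo's bigquery_util, avro_util, and partitioning helpers with the call signatures seen above, and omits the sample_info bookkeeping):

  def _append_via_temp_tables(avro_root_path, output_table, temp_suffixes,
                              lookup_table, tmp_prefix='_tmp_'):
    # 1. Load the new Avro records into freshly created _tmp_ tables.
    temp_base = bigquery_util.compose_temp_table_base(output_table, tmp_prefix)
    loaded = avro_util.LoadAvro(
        avro_root_path, temp_base, temp_suffixes, False).start_loading()
    # 2. Append each temp table's rows onto the matching output table.
    for temp_t in _new_temp_tables:
      bigquery_util.copy_table(
          temp_t, bigquery_util.get_non_temp_table_name(temp_t, tmp_prefix))
    # 3. Flatten only the temp tables into the sample lookup tables, so the
    #    existing lookup rows are appended to rather than rewritten.
    partitioning.FlattenCallColumn(
        temp_base, loaded, append=True).copy_to_flatten_table(lookup_table)
    # 4. Drop the temp tables only after every copy has succeeded.
    for temp_t in _new_temp_tables:
      bigquery_util.delete_table(temp_t)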
