🔧 change logic to see if ds or table exists (#91)
* 🔧 change logic to see if ds or table exists

* 🔥 refactoring a lil bit

* 🔥 undoing some linting

* 🔧 re-thinking try except blocks

* 🔧 more work on exc block

---------

Co-authored-by: Alejandro Martinez <[email protected]>
AlejandroUPC and Alejandro Martinez authored Jul 1, 2024
1 parent f8f74be commit f9384ca
Showing 1 changed file with 35 additions and 34 deletions.
target_bigquery/core.py: 69 changes (35 additions & 34 deletions)
@@ -39,7 +39,7 @@
 from textwrap import dedent, indent
 from typing import IO, TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type, Union
 
-from google.api_core.exceptions import Conflict, Forbidden
+from google.api_core.exceptions import Conflict, Forbidden, NotFound
 from google.cloud import bigquery, bigquery_storage_v1, storage
 from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery.table import TimePartitioning, TimePartitioningType
@@ -166,33 +166,30 @@ def create_table(
         This is a convenience method that wraps the creation of a dataset and
         table in a single method call. It is idempotent and will not create
         a new table if one already exists."""
-        if not hasattr(self, "_dataset"):
-            try:
-                self._dataset = client.create_dataset(
-                    self.as_dataset(**kwargs["dataset"]), exists_ok=False
-                )
-            except (Conflict, Forbidden):
-                dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
-                if dataset.location != kwargs["dataset"]["location"]:
-                    raise Exception(
-                        f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
-                        f"does not match specified location: {kwargs['dataset']['location']}"
-                    )
-                else:
-                    self._dataset = dataset
-        if not hasattr(self, "_table"):
-            try:
-                self._table = client.create_table(
-                    self.as_table(
-                        apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
-                        **kwargs["table"],
-                    )
-                )
-            except Conflict:
-                self._table = client.get_table(self.as_ref())
-            else:
-                # Wait for eventual consistency
-                time.sleep(5)
+        try:
+            dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
+        except NotFound:
+            dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
+        except (Conflict, Forbidden):
+            if dataset.location != kwargs["dataset"]["location"]:
+                raise Exception(
+                    f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
+                    f"does not match specified location: {kwargs['dataset']['location']}"
+                )
+        finally:
+            self._dataset = dataset
+        try:
+            self._table = client.get_table(self.as_ref())
+        except NotFound:
+            self._table = client.create_table(
+                self.as_table(
+                    apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
+                    **kwargs["table"],
+                )
+            )
+        else:
+            # Wait for eventual consistency
+            time.sleep(5)
         return self._dataset, self._table
 
     def default_table_options(self) -> Dict[str, Any]:
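
The rewritten block above is a look-before-create flow: call get_dataset / get_table first, and create only when BigQuery raises NotFound, instead of creating eagerly and catching Conflict. As a standalone sketch of that pattern with the google-cloud-bigquery client (the helper names, dataset and table IDs, location, and schema below are invented for illustration and are not from this repository):

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

client = bigquery.Client()  # assumes application default credentials and a default project

def get_or_create_dataset(dataset_id: str, location: str) -> bigquery.Dataset:
    # Look the dataset up first; create it only when BigQuery reports it does not exist.
    try:
        return client.get_dataset(dataset_id)
    except NotFound:
        dataset = bigquery.Dataset(f"{client.project}.{dataset_id}")
        dataset.location = location
        return client.create_dataset(dataset)

def get_or_create_table(table_id: str, schema: list) -> bigquery.Table:
    # Same idea for the table: get first, create on NotFound.
    try:
        return client.get_table(table_id)
    except NotFound:
        return client.create_table(bigquery.Table(table_id, schema=schema))

ds = get_or_create_dataset("raw_singer", "US")
tbl = get_or_create_table(
    f"{ds.project}.{ds.dataset_id}.orders",
    [bigquery.SchemaField("id", "STRING")],
)

Compared with create-then-catch-Conflict, the common path where the objects already exist never enters an exception handler, and no exists_ok flag is needed.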
@@ -515,7 +512,7 @@ def clean_up(self) -> None:
         tmp = f"{self.merge_target.name}__tmp"
         dedupe_query = (
             f"SELECT * FROM {self.table.get_escaped_name()} "
-            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
+            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
             f"ORDER BY COALESCE({', '.join(date_columns)}) DESC) = 1"
         )
         ctas_tmp = f"CREATE OR REPLACE TEMP TABLE `{tmp}` AS {dedupe_query}"
@@ -807,15 +804,15 @@ def _translate_record_to_bigquery_schema(
     ) -> SchemaField:
         """Translate a JSON schema record into a BigQuery schema."""
         properties = list(schema_property.get("properties", {}).items())
 
         # If no properties defined, store as JSON instead of RECORD
         if len(properties) == 0:
             return SchemaField(name, "JSON", mode)
 
         fields = [
-            self._jsonschema_property_to_bigquery_column(col, t)
-            for col, t in properties
-        ]
+            self._jsonschema_property_to_bigquery_column(col, t)
+            for col, t in properties
+        ]
         return SchemaField(name, "RECORD", mode, fields=fields)
 
     def _bigquery_field_to_projection(
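
This hunk sits in the JSON-schema translation where an object property with no declared sub-properties becomes a JSON column and anything else becomes a RECORD. A rough, self-contained sketch of that decision; unlike the real _jsonschema_property_to_bigquery_column translation, sub-fields are naively typed as STRING here just to keep the example short:

from google.cloud.bigquery import SchemaField

def object_to_field(name: str, schema_property: dict, mode: str = "NULLABLE") -> SchemaField:
    properties = list(schema_property.get("properties", {}).items())
    # No declared properties: fall back to a JSON column.
    if len(properties) == 0:
        return SchemaField(name, "JSON", mode)
    # Otherwise build a RECORD; sub-fields are typed as STRING only for this sketch.
    fields = [SchemaField(col, "STRING", "NULLABLE") for col, _ in properties]
    return SchemaField(name, "RECORD", mode, fields=fields)

print(object_to_field("payload", {"type": "object"}))                                # JSON column
print(object_to_field("user", {"properties": {"id": {"type": "string"}, "email": {"type": "string"}}}))  # RECORD column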
@@ -892,14 +889,14 @@ def _wrap_json_array(
         )
         v = _v.as_sql().rstrip(", \n")
         return (" " * depth * 2) + indent(
-            dedent(f"""
+            dedent(
+                f"""
            ARRAY(
                SELECT {v}
                FROM UNNEST(
                    JSON_QUERY_ARRAY({base}, '{path}.{field.name}')
                ) AS {field.name}__rows
                WHERE {_v.projection} IS NOT NULL
-            """ + (" " * depth * 2) + f") AS {field.name},\n").lstrip(),
+            """
+                + (" " * depth * 2)
+                + f") AS {field.name},\n"
+            ).lstrip(),
             " " * depth * 2,
         )
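
The final hunk only reflows how the ARRAY(...) fragment is assembled with textwrap.dedent and indent; the emitted SQL should be unchanged. A toy rendering of that fragment, with invented stand-ins for base, path, field.name, and the per-element projection:

from textwrap import dedent, indent

# Invented stand-ins for base, path, field.name, and the per-element projection (_v.projection).
depth, base, path, name = 1, "data", "$", "items"
projection = f"STRING({name}__rows)"

fragment = (" " * depth * 2) + indent(
    dedent(
        f"""
    ARRAY(
        SELECT {projection}
        FROM UNNEST(
            JSON_QUERY_ARRAY({base}, '{path}.{name}')
        ) AS {name}__rows
        WHERE {projection} IS NOT NULL
    """
        + (" " * depth * 2)
        + f") AS {name},\n"
    ).lstrip(),
    " " * depth * 2,
)
print(fragment)  # an ARRAY(...) SELECT fragment spliced into a larger projection list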
