Skip to content

Commit 56a563b

Browse files
feldjay and gabe-lyons authored
feat(ingest/bigquery): Add query job retries for transient errors (datahub-project#11162)
Co-authored-by: Gabe Lyons <[email protected]> Co-authored-by: Gabe Lyons <[email protected]>
1 parent 414dc54 commit 56a563b

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py

+17-1
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66

77
from google.api_core import retry
88
from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
9+
from google.cloud.bigquery import retry as bq_retry
910
from google.cloud.bigquery.table import (
1011
RowIterator,
1112
TableListItem,
@@ -155,8 +156,23 @@ def __init__(
155156
self.datacatalog_client = datacatalog_client
156157

157158
def get_query_result(self, query: str) -> RowIterator:
159+
def _should_retry(exc: BaseException) -> bool:
160+
logger.debug(f"Exception occured for job query. Reason: {exc}")
161+
# Jobs sometimes fail with transient errors.
162+
# This is not currently handled by the python-bigquery client.
163+
# https://github.com/googleapis/python-bigquery/issues/23
164+
return "Retrying the job may solve the problem" in str(exc)
165+
158166
logger.debug(f"Query : {query}")
159-
resp = self.bq_client.query(query)
167+
resp = self.bq_client.query(
168+
query,
169+
job_retry=retry.Retry(
170+
predicate=lambda exc: (
171+
bq_retry.DEFAULT_JOB_RETRY._predicate(exc) or _should_retry(exc)
172+
),
173+
deadline=bq_retry.DEFAULT_JOB_RETRY._deadline,
174+
),
175+
)
160176
return resp.result()
161177

162178
def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]:

0 commit comments

Comments
 (0)