Break out credentials as a separate module (#1391)
* break out credentials into its own module
* fix imports
* update unit test mocks for new location of get_bigquery_defaults
mikealfare authored Nov 5, 2024
1 parent 23e8020 commit 35c32f1
Showing 13 changed files with 275 additions and 259 deletions.
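The third bullet above — updating unit-test mocks for the new location of get_bigquery_defaults — amounts to repointing patch targets from dbt.adapters.bigquery.connections to the new dbt.adapters.bigquery.credentials module. A minimal sketch of that kind of update in unittest.mock style (the test name, profile values, and assertions are illustrative, not copied from the repository's test suite):

from unittest import mock

from dbt.adapters.bigquery import BigQueryCredentials


# Before this commit the patch target was
# "dbt.adapters.bigquery.connections.get_bigquery_defaults".
@mock.patch("dbt.adapters.bigquery.credentials.get_bigquery_defaults")
def test_database_defaults_to_adc_project(mock_defaults):
    # get_bigquery_defaults() returns a (credentials, project_id) tuple.
    mock_defaults.return_value = (mock.MagicMock(), "adc-project")

    # No explicit database/project: __pre_deserialize__ falls back to the mocked default.
    creds = BigQueryCredentials.from_dict({"method": "oauth", "schema": "my_dataset"})

    assert creds.database == "adc-project"
    mock_defaults.assert_called_once()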
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20241104-173815.yaml
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Separate credentials functionality into its own module for reuse in retry and
  python submissions
time: 2024-11-04T17:38:15.940962-05:00
custom:
  Author: mikealfare
  Issue: "1391"
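The motivation stated in the changelog — reusing the credentials helpers from retry and Python-model submission code — is essentially about importing from the new module instead of from the much larger connections module. A rough sketch under that assumption (the resolve_execution_project helper is hypothetical and only illustrates the new import path):

from typing import Optional

from dbt.adapters.bigquery.credentials import BigQueryCredentials, get_bigquery_defaults


def resolve_execution_project(creds: BigQueryCredentials) -> Optional[str]:
    # Hypothetical helper: prefer the configured execution project, then the
    # configured database/project, then the application-default-credentials project.
    if creds.execution_project:
        return creds.execution_project
    if creds.database:
        return creds.database
    _, project_id = get_bigquery_defaults(scopes=creds.scopes)
    return project_id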
10 changes: 5 additions & 5 deletions dbt/adapters/bigquery/__init__.py
@@ -1,8 +1,8 @@
from dbt.adapters.bigquery.connections import BigQueryConnectionManager # noqa
from dbt.adapters.bigquery.connections import BigQueryCredentials
from dbt.adapters.bigquery.relation import BigQueryRelation # noqa
from dbt.adapters.bigquery.column import BigQueryColumn # noqa
from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig # noqa
from dbt.adapters.bigquery.column import BigQueryColumn
from dbt.adapters.bigquery.connections import BigQueryConnectionManager
from dbt.adapters.bigquery.credentials import BigQueryCredentials
from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig
from dbt.adapters.bigquery.relation import BigQueryRelation

from dbt.adapters.base import AdapterPlugin
from dbt.include import bigquery
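One practical note on the reordered re-exports above: the public import path for BigQueryCredentials is unchanged by this commit; only its defining module moves. A quick sketch:

from dbt.adapters.bigquery import BigQueryCredentials
from dbt.adapters.bigquery.credentials import BigQueryCredentials as CredentialsFromNewModule

# The package-level name and the new module-level name refer to the same class object.
assert BigQueryCredentials is CredentialsFromNewModule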
5 changes: 3 additions & 2 deletions dbt/adapters/bigquery/column.py
@@ -1,9 +1,10 @@
from dataclasses import dataclass
from typing import Optional, List, TypeVar, Iterable, Type, Any, Dict, Union
from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, Union

from google.cloud.bigquery import SchemaField

from dbt.adapters.base.column import Column

from google.cloud.bigquery import SchemaField

_PARENT_DATA_TYPE_KEY = "__parent_data_type"

197 changes: 22 additions & 175 deletions dbt/adapters/bigquery/connections.py
@@ -1,58 +1,54 @@
from collections import defaultdict
from concurrent.futures import TimeoutError
from contextlib import contextmanager
from dataclasses import dataclass
import json
from multiprocessing.context import SpawnContext
import re
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, Hashable, List, Optional, Tuple, TYPE_CHECKING
import uuid
from mashumaro.helper import pass_through

from functools import lru_cache
from requests.exceptions import ConnectionError

from multiprocessing.context import SpawnContext
from typing import Optional, Any, Dict, Tuple, Hashable, List, TYPE_CHECKING

from google.api_core import client_info, client_options, retry
import google.auth
from google.auth import impersonated_credentials
import google.auth.exceptions
import google.cloud.bigquery
import google.cloud.exceptions
from google.api_core import retry, client_info, client_options
from google.auth import impersonated_credentials
from google.oauth2 import (
    credentials as GoogleCredentials,
    service_account as GoogleServiceAccountCredentials,
)
from requests.exceptions import ConnectionError

from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import (
    DbtRuntimeError,
    DbtConfigError,
    DbtDatabaseError,
)
from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError
from dbt_common.invocation import get_invocation_id
from dbt.adapters.bigquery import gcloud
from dbt.adapters.base import BaseConnectionManager
from dbt.adapters.contracts.connection import (
    ConnectionState,
    AdapterResponse,
    Credentials,
    AdapterRequiredConfig,
    AdapterResponse,
    ConnectionState,
)
from dbt.adapters.exceptions.connection import FailedToConnectError
from dbt.adapters.base import BaseConnectionManager
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import SQLQuery
from dbt.adapters.bigquery import __version__ as dbt_version
from dbt.adapters.bigquery.utility import is_base64, base64_to_string
from dbt.adapters.exceptions.connection import FailedToConnectError

from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum
import dbt.adapters.bigquery.__version__ as dbt_version
from dbt.adapters.bigquery.credentials import (
    BigQueryConnectionMethod,
    Priority,
    get_bigquery_defaults,
    setup_default_credentials,
)
from dbt.adapters.bigquery.utility import is_base64, base64_to_string

if TYPE_CHECKING:
    # Indirectly imported via agate_helper, which is lazy loaded further downfile.
    # Used by mypy for earlier type hints.
    import agate


logger = AdapterLogger("BigQuery")

BQ_QUERY_JOB_SPLIT = "-----Query Job SQL Follows-----"
@@ -73,33 +69,6 @@
)


@lru_cache()
def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]:
    """
    Returns (credentials, project_id)
    project_id is returned available from the environment; otherwise None
    """
    # Cached, because the underlying implementation shells out, taking ~1s
    try:
        credentials, _ = google.auth.default(scopes=scopes)
        return credentials, _
    except google.auth.exceptions.DefaultCredentialsError as e:
        raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}")


class Priority(StrEnum):
    Interactive = "interactive"
    Batch = "batch"


class BigQueryConnectionMethod(StrEnum):
    OAUTH = "oauth"
    SERVICE_ACCOUNT = "service-account"
    SERVICE_ACCOUNT_JSON = "service-account-json"
    OAUTH_SECRETS = "oauth-secrets"


@dataclass
class BigQueryAdapterResponse(AdapterResponse):
    bytes_processed: Optional[int] = None
@@ -110,128 +79,6 @@ class BigQueryAdapterResponse(AdapterResponse):
    slot_ms: Optional[int] = None


@dataclass
class DataprocBatchConfig(ExtensibleDbtClassMixin):
    def __init__(self, batch_config):
        self.batch_config = batch_config


@dataclass
class BigQueryCredentials(Credentials):
    method: BigQueryConnectionMethod = None  # type: ignore

    # BigQuery allows an empty database / project, where it defers to the
    # environment for the project
    database: Optional[str] = None
    schema: Optional[str] = None
    execution_project: Optional[str] = None
    quota_project: Optional[str] = None
    location: Optional[str] = None
    priority: Optional[Priority] = None
    maximum_bytes_billed: Optional[int] = None
    impersonate_service_account: Optional[str] = None

    job_retry_deadline_seconds: Optional[int] = None
    job_retries: Optional[int] = 1
    job_creation_timeout_seconds: Optional[int] = None
    job_execution_timeout_seconds: Optional[int] = None

    # Keyfile json creds (unicode or base 64 encoded)
    keyfile: Optional[str] = None
    keyfile_json: Optional[Dict[str, Any]] = None

    # oauth-secrets
    token: Optional[str] = None
    refresh_token: Optional[str] = None
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
    token_uri: Optional[str] = None

    dataproc_region: Optional[str] = None
    dataproc_cluster_name: Optional[str] = None
    gcs_bucket: Optional[str] = None

    dataproc_batch: Optional[DataprocBatchConfig] = field(
        metadata={
            "serialization_strategy": pass_through,
        },
        default=None,
    )

    scopes: Optional[Tuple[str, ...]] = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/drive",
    )

    _ALIASES = {
        # 'legacy_name': 'current_name'
        "project": "database",
        "dataset": "schema",
        "target_project": "target_database",
        "target_dataset": "target_schema",
        "retries": "job_retries",
        "timeout_seconds": "job_execution_timeout_seconds",
    }

    def __post_init__(self):
        if self.keyfile_json and "private_key" in self.keyfile_json:
            self.keyfile_json["private_key"] = self.keyfile_json["private_key"].replace(
                "\\n", "\n"
            )
        if not self.method:
            raise DbtRuntimeError("Must specify authentication method")

        if not self.schema:
            raise DbtRuntimeError("Must specify schema")

    @property
    def type(self):
        return "bigquery"

    @property
    def unique_field(self):
        return self.database

    def _connection_keys(self):
        return (
            "method",
            "database",
            "execution_project",
            "schema",
            "location",
            "priority",
            "maximum_bytes_billed",
            "impersonate_service_account",
            "job_retry_deadline_seconds",
            "job_retries",
            "job_creation_timeout_seconds",
            "job_execution_timeout_seconds",
            "timeout_seconds",
            "client_id",
            "token_uri",
            "dataproc_region",
            "dataproc_cluster_name",
            "gcs_bucket",
            "dataproc_batch",
        )

    @classmethod
    def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]:
        # We need to inject the correct value of the database (aka project) at
        # this stage, ref
        # https://github.com/dbt-labs/dbt/pull/2908#discussion_r532927436.

        # `database` is an alias of `project` in BigQuery
        if "database" not in d:
            _, database = get_bigquery_defaults()
            d["database"] = database
        # `execution_project` default to dataset/project
        if "execution_project" not in d:
            d["execution_project"] = d["database"]
        return d


class BigQueryConnectionManager(BaseConnectionManager):
    TYPE = "bigquery"

@@ -433,7 +280,7 @@ def open(cls, connection):

        except google.auth.exceptions.DefaultCredentialsError:
            logger.info("Please log into GCP to continue")
            gcloud.setup_default_credentials()
            setup_default_credentials()

            handle = cls.get_bigquery_client(connection.credentials)

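The DataprocBatchConfig and BigQueryCredentials definitions removed above move to the new dbt/adapters/bigquery/credentials.py rather than change. A small sketch of how the class resolves its fields, assuming the from_dict/translate_aliases helpers inherited from the base Credentials contract (the profile values are illustrative):

from dbt.adapters.bigquery import BigQueryCredentials

profile = {
    "method": "service-account",
    "project": "analytics-prod",  # _ALIASES maps "project" -> "database"
    "dataset": "marts",  # _ALIASES maps "dataset" -> "schema"
    "keyfile": "/secrets/bq-keyfile.json",
    "priority": "interactive",
    "job_retries": 3,
}

creds = BigQueryCredentials.from_dict(BigQueryCredentials.translate_aliases(profile))

assert creds.database == "analytics-prod"
assert creds.schema == "marts"
# __pre_deserialize__ defaults execution_project to the database/project.
assert creds.execution_project == "analytics-prod"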