
feature/decouple adapters from core #972

Merged 112 commits on Jan 25, 2024

Commits
fd6f6f0
Add Github action for integration test
JCZuurmond Sep 29, 2023
795e40a
Update tox
JCZuurmond Sep 29, 2023
ff39c5d
Fetch spark from https link
JCZuurmond Sep 29, 2023
1505fc6
Use Spark version 3.1.2
JCZuurmond Sep 29, 2023
44fe33f
Separate running Spark session and thrift
JCZuurmond Sep 29, 2023
2655631
Use Spark 3.1.2 and Hadoop 3.2
JCZuurmond Sep 29, 2023
915f67e
Reset tox.ini
JCZuurmond Sep 29, 2023
f0ef215
Remove base pythons in tox.ini
JCZuurmond Sep 29, 2023
e8457df
Fix reference to Docker compose file
JCZuurmond Sep 29, 2023
842466a
Remove timeout
JCZuurmond Sep 29, 2023
0738f2d
Remove artifact steps
JCZuurmond Sep 29, 2023
277bef1
Bump Spark and Hadoop versions
JCZuurmond Sep 29, 2023
8d5853d
Reset Spark and Hadoop version
JCZuurmond Sep 29, 2023
919528a
Update comment
JCZuurmond Sep 29, 2023
15e48fd
Add changie
JCZuurmond Sep 29, 2023
ab90c4c
Merge branch 'main' into add-github-workflow-for-integration-tests
Fleid Oct 12, 2023
31cb05e
add databricks and PR execution protections
colin-rogers-dbt Oct 18, 2023
31eceb5
Merge branch 'main' into migrateOffCircleCI
colin-rogers-dbt Oct 18, 2023
fd54d7f
use single quotes
colin-rogers-dbt Oct 23, 2023
8de8339
remove `_target` suffix
colin-rogers-dbt Oct 23, 2023
e85232f
add comment to test
colin-rogers-dbt Oct 23, 2023
fe3300e
specify container user as root
colin-rogers-dbt Oct 23, 2023
b37e14b
formatting
colin-rogers-dbt Oct 23, 2023
51511ec
remove python setup for pre-existing container
colin-rogers-dbt Oct 23, 2023
98607b6
download simba
colin-rogers-dbt Oct 23, 2023
e6ec414
fix curl call
colin-rogers-dbt Oct 23, 2023
05a2c08
fix curl call
colin-rogers-dbt Oct 23, 2023
a89ec58
fix curl call
colin-rogers-dbt Oct 23, 2023
2a18fad
fix curl call
colin-rogers-dbt Oct 23, 2023
1481396
fix curl call
colin-rogers-dbt Oct 23, 2023
31b427c
fix curl call
colin-rogers-dbt Oct 23, 2023
15ba1da
fix db test naming
colin-rogers-dbt Oct 23, 2023
ca33a23
confirm ODBC driver installed
colin-rogers-dbt Oct 23, 2023
6274d77
add odbc driver env var
colin-rogers-dbt Oct 23, 2023
0ba91a2
add odbc driver env var
colin-rogers-dbt Oct 23, 2023
f092026
specify platform
colin-rogers-dbt Oct 23, 2023
b968985
check odbc driver integrity
colin-rogers-dbt Oct 23, 2023
8a49567
add dbt user env var
colin-rogers-dbt Oct 23, 2023
7723e8d
add dbt user env var
colin-rogers-dbt Oct 23, 2023
ea5ebfa
fix host_name env var
colin-rogers-dbt Oct 23, 2023
610e5e9
try removing architecture arg
colin-rogers-dbt Oct 24, 2023
b4411ab
swap back to pull_request_target
colin-rogers-dbt Oct 24, 2023
cae6c8a
try running on host instead of container
colin-rogers-dbt Oct 24, 2023
0c68972
Update .github/workflows/integration.yml
colin-rogers-dbt Oct 24, 2023
b2f63bd
try running odbcinst -j
colin-rogers-dbt Oct 24, 2023
80eb7e4
remove bash
colin-rogers-dbt Oct 24, 2023
4bbfa71
add sudo
colin-rogers-dbt Oct 24, 2023
b1d2020
add sudo
colin-rogers-dbt Oct 24, 2023
38fda3d
update odbc.ini
colin-rogers-dbt Oct 24, 2023
6b599a1
install libsasl2-modules-gssapi-mit
colin-rogers-dbt Oct 24, 2023
0976c4f
install libsasl2-modules-gssapi-mit
colin-rogers-dbt Oct 24, 2023
42f2784
set -e on odbc install
colin-rogers-dbt Oct 24, 2023
4f11291
set -e on odbc install
colin-rogers-dbt Oct 24, 2023
1384084
set -e on odbc install
colin-rogers-dbt Oct 24, 2023
543e321
sudo echo odbc.inst
colin-rogers-dbt Oct 24, 2023
307a9af
Merge branch 'main' into migrateOffCircleCI
mikealfare Oct 27, 2023
f380d46
remove postgres components
mikealfare Nov 2, 2023
c334f32
remove release related items
mikealfare Nov 2, 2023
19dcff3
remove irrelevant output
mikealfare Nov 2, 2023
01b0c0c
move long bash script into its own file
mikealfare Nov 2, 2023
d3d2844
update integration.yml to align with other adapters
mikealfare Nov 2, 2023
94af018
Merge branch 'main' into migrateOffCircleCI
mikealfare Nov 2, 2023
72daf90
revert name change
mikealfare Nov 2, 2023
4f63a3c
Merge remote-tracking branch 'origin/migrateOffCircleCI' into migrate…
mikealfare Nov 2, 2023
b43c9d1
revert name change
mikealfare Nov 2, 2023
91715d2
combine databricks and spark tests
mikealfare Nov 2, 2023
943a8dc
combine databricks and spark tests
mikealfare Nov 2, 2023
3d0dece
Add dagger
colin-rogers-dbt Nov 30, 2023
080b816
remove platform
colin-rogers-dbt Nov 30, 2023
c8477ce
add dagger setup
colin-rogers-dbt Jan 8, 2024
c0a37ae
add dagger setup
colin-rogers-dbt Jan 8, 2024
9b9dc79
Merge branch 'main' into migrateOffCircleCI
colin-rogers-dbt Jan 8, 2024
8c6a745
set env vars
colin-rogers-dbt Jan 8, 2024
6a6b4ce
Merge remote-tracking branch 'origin/migrateOffCircleCI' into migrate…
colin-rogers-dbt Jan 8, 2024
1ae321a
install requirements
colin-rogers-dbt Jan 8, 2024
6361429
install requirements
colin-rogers-dbt Jan 8, 2024
6bca5dc
add DEFAULT_ENV_VARS and test_path arg
colin-rogers-dbt Jan 8, 2024
f4293e0
remove circle ci
colin-rogers-dbt Jan 8, 2024
d398065
formatting
colin-rogers-dbt Jan 9, 2024
6108d44
update changie
colin-rogers-dbt Jan 9, 2024
d472f3b
Update .changes/unreleased/Under the Hood-20230929-161218.yaml
colin-rogers-dbt Jan 9, 2024
ce92bcf
formatting fixes and simplify env_var handling
colin-rogers-dbt Jan 9, 2024
0c4ed9e
Merge remote-tracking branch 'origin/migrateOffCircleCI' into migrate…
colin-rogers-dbt Jan 9, 2024
56b14bc
remove tox, update CONTRIBUTING.md and cleanup GHA workflows
colin-rogers-dbt Jan 9, 2024
9849c1c
remove tox, update CONTRIBUTING.md and cleanup GHA workflows
colin-rogers-dbt Jan 9, 2024
f9a4c58
install test reqs in main.yml
colin-rogers-dbt Jan 9, 2024
bbe17a8
install test reqs in main.yml
colin-rogers-dbt Jan 9, 2024
3f44e96
formatting
colin-rogers-dbt Jan 9, 2024
afd3866
remove tox from dev-requirements.txt and Makefile
colin-rogers-dbt Jan 10, 2024
259ebc7
clarify spark crt instantiation
colin-rogers-dbt Jan 10, 2024
a8a7010
add comments on python-version
colin-rogers-dbt Jan 10, 2024
fcf074b
initial migration changes
colin-rogers-dbt Jan 10, 2024
5da682a
Merge branch 'main' into feature/decouple-adapters-from-core
colin-rogers-dbt Jan 10, 2024
1b1fcec
unpin
colin-rogers-dbt Jan 10, 2024
0a2b73d
implement core / adapters decoupling
colin-rogers-dbt Jan 11, 2024
bd86ee1
fix list_relations
colin-rogers-dbt Jan 11, 2024
cb5e05c
fix typing and exception imports
colin-rogers-dbt Jan 11, 2024
fd7a22f
fix typing and exception imports
colin-rogers-dbt Jan 11, 2024
77df8b7
add changie
colin-rogers-dbt Jan 11, 2024
f216bb6
Merge branch 'main' into feature/decouple-adapters-from-core
colin-rogers-dbt Jan 11, 2024
dfd5885
replace dbt.common with dbt_common
colin-rogers-dbt Jan 12, 2024
3fc6d07
update setup.py
colin-rogers-dbt Jan 12, 2024
17607c1
add dbt-adapters
colin-rogers-dbt Jan 16, 2024
79d74aa
update setup.py
colin-rogers-dbt Jan 22, 2024
011c9b5
fix credentials import
colin-rogers-dbt Jan 22, 2024
a40b07c
fix dev-requirements.txt
colin-rogers-dbt Jan 22, 2024
8aac398
dagger improvements to caching and installing package under test
colin-rogers-dbt Jan 24, 2024
6edcdcf
update requirements
colin-rogers-dbt Jan 24, 2024
eeba17f
add cluster start fixture
colin-rogers-dbt Jan 24, 2024
f3a4c2d
update conftest.py
colin-rogers-dbt Jan 25, 2024
32c05bb
re-order dagger setup to reduce cache invalidation
colin-rogers-dbt Jan 25, 2024
e8e4543
remove dbt-core version dependency check
colin-rogers-dbt Jan 25, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240111-114806.yaml
@@ -0,0 +1,6 @@
+kind: Under the Hood
+body: Update import paths and list_relations to support decoupling adapters/core
+time: 2024-01-11T11:48:06.120111-08:00
+custom:
+  Author: colin-rogers-dbt
+  Issue: "972"
2 changes: 1 addition & 1 deletion dagger/requirements.txt
@@ -1,2 +1,2 @@
-dagger-io~=0.8.0
+dagger-io~=0.9.7
python-dotenv
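
For reference, ~=0.9.7 is a PEP 440 compatible-release pin, equivalent to >=0.9.7, <0.10.0; this bump off the 0.8 series is what drives the as_service() changes in the dagger script below. A quick way to sanity-check a specifier, assuming the packaging library is available (it is not a dependency of this PR):

    from packaging.specifiers import SpecifierSet

    # "compatible release": newer 0.9.x patch releases allowed, 0.10+ excluded
    spec = SpecifierSet("~=0.9.7")
    assert "0.9.12" in spec
    assert "0.10.0" not in spec
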
47 changes: 35 additions & 12 deletions dagger/run_dbt_spark_tests.py
@@ -2,6 +2,7 @@

import argparse
import sys
+from typing import Dict

import anyio as anyio
import dagger as dagger
@@ -19,7 +20,7 @@
TESTING_ENV_VARS.update({"ODBC_DRIVER": "/opt/simba/spark/lib/64/libsparkodbc_sb64.so"})


-def env_variables(envs: dict[str, str]):
+def env_variables(envs: Dict[str, str]):
def env_variables_inner(ctr: dagger.Container):
for key, value in envs.items():
ctr = ctr.with_env_variable(key, value)
@@ -28,18 +29,19 @@ def env_variables_inner(ctr: dagger.Container):
return env_variables_inner


-async def get_postgres_container(client: dagger.Client) -> (dagger.Container, str):
-ctr = await (
+def get_postgres_container(client: dagger.Client) -> (dagger.Container, str):
+ctr = (
client.container()
.from_("postgres:13")
.with_env_variable("POSTGRES_PASSWORD", "postgres")
.with_exposed_port(PG_PORT)
+.as_service()
)

return ctr, "postgres_db"


-async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
+def get_spark_container(client: dagger.Client) -> (dagger.Service, str):
spark_dir = client.host().directory("./dagger/spark-container")
spark_ctr_base = (
client.container()
@@ -63,7 +65,7 @@ async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
)

# postgres is the metastore here
-pg_ctr, pg_host = await get_postgres_container(client)
+pg_ctr, pg_host = get_postgres_container(client)

spark_ctr = (
spark_ctr_base.with_service_binding(alias=pg_host, service=pg_ctr)
@@ -77,6 +79,7 @@ async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
]
)
.with_exposed_port(10000)
+.as_service()
)

return spark_ctr, "spark_db"
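
The .as_service() calls added above follow the dagger 0.9 services API: a container that exposes ports must be converted to a dagger.Service before it can be bound into another container, which is also why the await on these lazy constructors goes away. A minimal sketch of the pattern, assuming dagger-io ~=0.9.7 and a running Docker engine (not code from this PR):

    import sys
    import anyio
    import dagger

    async def main() -> None:
        async with dagger.Connection(dagger.Config(log_output=sys.stderr)) as client:
            # container -> service, as done for postgres and spark above
            db = (
                client.container()
                .from_("postgres:13")
                .with_env_variable("POSTGRES_PASSWORD", "postgres")
                .with_exposed_port(5432)
                .as_service()
            )
            # the service is reachable from another container via its alias
            out = await (
                client.container()
                .from_("postgres:13")
                .with_service_binding(alias="db", service=db)
                .with_exec(["pg_isready", "-h", "db", "-p", "5432"])
                .stdout()
            )
            print(out)

    anyio.run(main)
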
@@ -85,29 +88,49 @@ async def get_spark_container(client: dagger.Client) -> (dagger.Container, str):
async def test_spark(test_args):
async with dagger.Connection(dagger.Config(log_output=sys.stderr)) as client:
test_profile = test_args.profile
-req_files = client.host().directory("./", include=["*.txt", "*.env", "*.ini"])

+# create cache volumes, these are persisted between runs saving time when developing locally
+os_reqs_cache = client.cache_volume("os_reqs")
+pip_cache = client.cache_volume("pip")
+
+# setup directories as we don't want to copy the whole repo into the container
+req_files = client.host().directory(
+"./", include=["*.txt", "*.env", "*.ini", "*.md", "setup.py"]
+)
dbt_spark_dir = client.host().directory("./dbt")
test_dir = client.host().directory("./tests")
scripts = client.host().directory("./dagger/scripts")

platform = dagger.Platform("linux/amd64")
tst_container = (
client.container(platform=platform)
.from_("python:3.8-slim")
.with_directory("/.", req_files)
.with_directory("/dbt", dbt_spark_dir)
.with_directory("/tests", test_dir)
.with_mounted_cache("/var/cache/apt/archives", os_reqs_cache)
.with_mounted_cache("/root/.cache/pip", pip_cache)
# install OS deps first so any local changes don't invalidate the cache
.with_directory("/scripts", scripts)
.with_exec("./scripts/install_os_reqs.sh")
.with_exec(["./scripts/install_os_reqs.sh"])
# install dbt-spark + python deps
.with_directory("/src", req_files)
.with_directory("src/dbt", dbt_spark_dir)
.with_directory("src/tests", test_dir)
.with_workdir("/src")
.with_exec(["pip", "install", "-U", "pip"])
.with_exec(["pip", "install", "-r", "requirements.txt"])
.with_exec(["pip", "install", "-r", "dev-requirements.txt"])
.with_exec(["pip", "install", "-e", "."])
)

if test_profile == "apache_spark":
-spark_ctr, spark_host = await get_spark_container(client)
+spark_ctr, spark_host = get_spark_container(client)
tst_container = tst_container.with_service_binding(alias=spark_host, service=spark_ctr)

elif test_profile in ["databricks_cluster", "databricks_sql_endpoint"]:
-tst_container = tst_container.with_exec("./scripts/configure_odbc.sh")
+tst_container = (
+tst_container.with_workdir("/")
+.with_exec(["./scripts/configure_odbc.sh"])
+.with_workdir("/src")
+)

elif test_profile == "spark_session":
tst_container = tst_container.with_exec(["pip", "install", "pyspark"])
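
The cache-volume hunk above is the heart of the "dagger improvements to caching" commit: apt and pip downloads live in named volumes that survive between local runs, and sources are copied in only after the cache mounts so edits to dbt/ or tests/ do not invalidate earlier layers. (The dict[str, str] to Dict[str, str] change earlier in the file keeps the script importable on Python 3.8, where subscripting builtin dict is not supported.) A trimmed, standalone sketch of the same build recipe, assuming dagger-io ~=0.9.7 and a requirements.txt plus setup.py in the working directory (hypothetical layout, not this PR's exact files):

    import sys
    import anyio
    import dagger

    async def build() -> None:
        async with dagger.Connection(dagger.Config(log_output=sys.stderr)) as client:
            pip_cache = client.cache_volume("pip")  # persisted across runs
            src = client.host().directory("./", include=["*.txt", "setup.py"])
            ctr = (
                client.container(platform=dagger.Platform("linux/amd64"))
                .from_("python:3.8-slim")
                # mount the cache before copying sources so code edits
                # do not invalidate the cached pip layer
                .with_mounted_cache("/root/.cache/pip", pip_cache)
                .with_directory("/src", src)
                .with_workdir("/src")
                .with_exec(["pip", "install", "-r", "requirements.txt"])
                .with_exec(["pip", "install", "-e", "."])
            )
            await ctr.sync()  # force evaluation of the lazy pipeline

    anyio.run(build)
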
2 changes: 1 addition & 1 deletion dbt/adapters/spark/column.py
@@ -2,7 +2,7 @@
from typing import Any, Dict, Optional, TypeVar, Union

from dbt.adapters.base.column import Column
-from dbt.dataclass_schema import dbtClassMixin
+from dbt_common.dataclass_schema import dbtClassMixin

Self = TypeVar("Self", bound="SparkColumn")

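
This one-line move is the whole decoupling story for column.py: dbtClassMixin now ships in the standalone dbt_common package rather than inside dbt-core. A hypothetical compatibility shim, not part of this PR, for code that must import across both pre- and post-split core versions:

    # hypothetical shim, not part of this PR
    try:
        # post-decoupling layout: dbt_common is its own package
        from dbt_common.dataclass_schema import dbtClassMixin
    except ImportError:
        # older dbt-core versions still vendor the module under dbt.*
        from dbt.dataclass_schema import dbtClassMixin
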
57 changes: 29 additions & 28 deletions dbt/adapters/spark/connections.py
@@ -1,11 +1,17 @@
from contextlib import contextmanager

-import dbt.exceptions
-from dbt.adapters.base import Credentials
+from dbt.adapters.contracts.connection import (
+AdapterResponse,
+ConnectionState,
+Connection,
+Credentials,
+)
+from dbt.adapters.events.logging import AdapterLogger
+from dbt.adapters.exceptions import FailedToConnectError
from dbt.adapters.sql import SQLConnectionManager
-from dbt.contracts.connection import ConnectionState, AdapterResponse
-from dbt.events import AdapterLogger
-from dbt.utils import DECIMALS
+from dbt_common.exceptions import DbtConfigError, DbtRuntimeError, DbtDatabaseError
+
+from dbt_common.utils.encoding import DECIMALS
from dbt.adapters.spark import __version__

try:
@@ -22,8 +28,7 @@
pyodbc = None
from datetime import datetime
import sqlparams
-from dbt.contracts.connection import Connection
-from dbt.dataclass_schema import StrEnum
+from dbt_common.dataclass_schema import StrEnum
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union, Tuple, List, Generator, Iterable, Sequence

@@ -92,15 +97,15 @@ def cluster_id(self) -> Optional[str]:

def __post_init__(self) -> None:
if self.method is None:
raise dbt.exceptions.DbtRuntimeError("Must specify `method` in profile")
raise DbtRuntimeError("Must specify `method` in profile")
if self.host is None:
raise dbt.exceptions.DbtRuntimeError("Must specify `host` in profile")
raise DbtRuntimeError("Must specify `host` in profile")
if self.schema is None:
raise dbt.exceptions.DbtRuntimeError("Must specify `schema` in profile")
raise DbtRuntimeError("Must specify `schema` in profile")

# spark classifies database and schema as the same thing
if self.database is not None and self.database != self.schema:
-raise dbt.exceptions.DbtRuntimeError(
+raise DbtRuntimeError(
f" schema: {self.schema} \n"
f" database: {self.database} \n"
f"On Spark, database must be omitted or have the same value as"
@@ -112,7 +117,7 @@ def __post_init__(self) -> None:
try:
import pyodbc # noqa: F401
except ImportError as e:
-raise dbt.exceptions.DbtRuntimeError(
+raise DbtRuntimeError(
f"{self.method} connection method requires "
"additional dependencies. \n"
"Install the additional required dependencies with "
@@ -121,7 +126,7 @@
) from e

if self.method == SparkConnectionMethod.ODBC and self.cluster and self.endpoint:
-raise dbt.exceptions.DbtRuntimeError(
+raise DbtRuntimeError(
"`cluster` and `endpoint` cannot both be set when"
f" using {self.method} method to connect to Spark"
)
@@ -130,7 +135,7 @@
self.method == SparkConnectionMethod.HTTP
or self.method == SparkConnectionMethod.THRIFT
) and not (ThriftState and THttpClient and hive):
-raise dbt.exceptions.DbtRuntimeError(
+raise DbtRuntimeError(
f"{self.method} connection method requires "
"additional dependencies. \n"
"Install the additional required dependencies with "
@@ -141,7 +146,7 @@
try:
import pyspark # noqa: F401
except ImportError as e:
-raise dbt.exceptions.DbtRuntimeError(
+raise DbtRuntimeError(
f"{self.method} connection method requires "
"additional dependencies. \n"
"Install the additional required dependencies with "
@@ -291,13 +296,11 @@ def execute(self, sql: str, bindings: Optional[List[Any]] = None) -> None:
if poll_state.errorMessage:
logger.debug("Poll response: {}".format(poll_state))
logger.debug("Poll status: {}".format(state))
-raise dbt.exceptions.DbtDatabaseError(poll_state.errorMessage)
+raise DbtDatabaseError(poll_state.errorMessage)

elif state not in STATE_SUCCESS:
status_type = ThriftState._VALUES_TO_NAMES.get(state, "Unknown<{!r}>".format(state))
-raise dbt.exceptions.DbtDatabaseError(
-"Query failed with status: {}".format(status_type)
-)
+raise DbtDatabaseError("Query failed with status: {}".format(status_type))

logger.debug("Poll status: {}, query complete".format(state))

@@ -358,9 +361,9 @@ def exception_handler(self, sql: str) -> Generator[None, None, None]:
thrift_resp = exc.args[0]
if hasattr(thrift_resp, "status"):
msg = thrift_resp.status.errorMessage
-raise dbt.exceptions.DbtRuntimeError(msg)
+raise DbtRuntimeError(msg)
else:
-raise dbt.exceptions.DbtRuntimeError(str(exc))
+raise DbtRuntimeError(str(exc))

def cancel(self, connection: Connection) -> None:
connection.handle.cancel()
@@ -390,7 +393,7 @@ def validate_creds(cls, creds: Any, required: Iterable[str]) -> None:

for key in required:
if not hasattr(creds, key):
-raise dbt.exceptions.DbtProfileError(
+raise DbtConfigError(
"The config '{}' is required when using the {} method"
" to connect to Spark".format(key, method)
)
@@ -481,7 +484,7 @@ def open(cls, connection: Connection) -> Connection:
endpoint=creds.endpoint
)
else:
-raise dbt.exceptions.DbtProfileError(
+raise DbtConfigError(
"Either `cluster` or `endpoint` must set when"
" using the odbc method to connect to Spark"
)
@@ -525,9 +528,7 @@ def open(cls, connection: Connection) -> Connection:
Connection(server_side_parameters=creds.server_side_parameters)
)
else:
-raise dbt.exceptions.DbtProfileError(
-f"invalid credential method: {creds.method}"
-)
+raise DbtConfigError(f"invalid credential method: {creds.method}")
break
except Exception as e:
exc = e
@@ -537,7 +538,7 @@ def open(cls, connection: Connection) -> Connection:
msg = "Failed to connect"
if creds.token is not None:
msg += ", is your token valid?"
-raise dbt.exceptions.FailedToConnectError(msg) from e
+raise FailedToConnectError(msg) from e
retryable_message = _is_retryable_error(e)
if retryable_message and creds.connect_retries > 0:
msg = (
@@ -558,7 +559,7 @@ def open(cls, connection: Connection) -> Connection:
logger.warning(msg)
time.sleep(creds.connect_timeout)
else:
raise dbt.exceptions.FailedToConnectError("failed to connect") from e
raise FailedToConnectError("failed to connect") from e
else:
raise exc # type: ignore

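
Nearly all of this file's diff is the same mechanical migration: exception classes move from the dbt.exceptions module to flat imports from dbt_common.exceptions, FailedToConnectError relocates to dbt.adapters.exceptions, and DbtProfileError is replaced by DbtConfigError. A minimal sketch of the post-decoupling style, assuming the dbt_common package is installed (a trimmed, hypothetical stand-in for SparkCredentials, not this PR's code):

    from dataclasses import dataclass
    from typing import List, Optional

    from dbt_common.exceptions import DbtConfigError, DbtRuntimeError

    @dataclass
    class MiniCredentials:
        # hypothetical, trimmed stand-in for SparkCredentials
        host: Optional[str] = None
        schema: Optional[str] = None
        method: Optional[str] = None

        def __post_init__(self) -> None:
            # flat exception names replace the old dbt.exceptions.* paths
            if self.method is None:
                raise DbtRuntimeError("Must specify `method` in profile")
            if self.host is None:
                raise DbtRuntimeError("Must specify `host` in profile")

    def validate_creds(creds: MiniCredentials, required: List[str]) -> None:
        # DbtConfigError replaces the old DbtProfileError
        for key in required:
            if getattr(creds, key, None) is None:
                raise DbtConfigError(f"The config '{key}' is required")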