feature/decouple adapters from core #972

Merged · 112 commits · Jan 25, 2024
Changes below are shown from 95 of the 112 commits

Commits
fd6f6f0
Add Github action for integration test
JCZuurmond Sep 29, 2023
795e40a
Update tox
JCZuurmond Sep 29, 2023
ff39c5d
Fetch spark from https link
JCZuurmond Sep 29, 2023
1505fc6
Use Spark version 3.1.2
JCZuurmond Sep 29, 2023
44fe33f
Seperate running Spark session and thrift
JCZuurmond Sep 29, 2023
2655631
Use Spark 3.1.2 and Hadoop 3.2
JCZuurmond Sep 29, 2023
915f67e
Reset tox.ini
JCZuurmond Sep 29, 2023
f0ef215
Remove base pythons in tox.ini
JCZuurmond Sep 29, 2023
e8457df
Fix reference to Docker compose file
JCZuurmond Sep 29, 2023
842466a
Remove timeout
JCZuurmond Sep 29, 2023
0738f2d
Remove artifact steps
JCZuurmond Sep 29, 2023
277bef1
Bump Spark and Hadoop versions
JCZuurmond Sep 29, 2023
8d5853d
Reset Spark and Hadoop version
JCZuurmond Sep 29, 2023
919528a
Update comment
JCZuurmond Sep 29, 2023
15e48fd
Add changie
JCZuurmond Sep 29, 2023
ab90c4c
Merge branch 'main' into add-github-workflow-for-integration-tests
Fleid Oct 12, 2023
31cb05e
add databricks and PR execution protections
colin-rogers-dbt Oct 18, 2023
31eceb5
Merge branch 'main' into migrateOffCircleCI
colin-rogers-dbt Oct 18, 2023
fd54d7f
use single quotes
colin-rogers-dbt Oct 23, 2023
8de8339
remove `_target` suffix
colin-rogers-dbt Oct 23, 2023
e85232f
add comment to test
colin-rogers-dbt Oct 23, 2023
fe3300e
specify container user as root
colin-rogers-dbt Oct 23, 2023
b37e14b
formatting
colin-rogers-dbt Oct 23, 2023
51511ec
remove python setup for pre-existing container
colin-rogers-dbt Oct 23, 2023
98607b6
download simba
colin-rogers-dbt Oct 23, 2023
e6ec414
fix curl call
colin-rogers-dbt Oct 23, 2023
05a2c08
fix curl call
colin-rogers-dbt Oct 23, 2023
a89ec58
fix curl call
colin-rogers-dbt Oct 23, 2023
2a18fad
fix curl call
colin-rogers-dbt Oct 23, 2023
1481396
fix curl call
colin-rogers-dbt Oct 23, 2023
31b427c
fix curl call
colin-rogers-dbt Oct 23, 2023
15ba1da
fix db test naming
colin-rogers-dbt Oct 23, 2023
ca33a23
confirm ODBC driver installed
colin-rogers-dbt Oct 23, 2023
6274d77
add odbc driver env var
colin-rogers-dbt Oct 23, 2023
0ba91a2
add odbc driver env var
colin-rogers-dbt Oct 23, 2023
f092026
specify platform
colin-rogers-dbt Oct 23, 2023
b968985
check odbc driver integrity
colin-rogers-dbt Oct 23, 2023
8a49567
add dbt user env var
colin-rogers-dbt Oct 23, 2023
7723e8d
add dbt user env var
colin-rogers-dbt Oct 23, 2023
ea5ebfa
fix host_name env var
colin-rogers-dbt Oct 23, 2023
610e5e9
try removing architecture arg
colin-rogers-dbt Oct 24, 2023
b4411ab
swap back to pull_request_target
colin-rogers-dbt Oct 24, 2023
cae6c8a
try running on host instead of container
colin-rogers-dbt Oct 24, 2023
0c68972
Update .github/workflows/integration.yml
colin-rogers-dbt Oct 24, 2023
b2f63bd
try running odbcinst -j
colin-rogers-dbt Oct 24, 2023
80eb7e4
remove bash
colin-rogers-dbt Oct 24, 2023
4bbfa71
add sudo
colin-rogers-dbt Oct 24, 2023
b1d2020
add sudo
colin-rogers-dbt Oct 24, 2023
38fda3d
update odbc.ini
colin-rogers-dbt Oct 24, 2023
6b599a1
install libsasl2-modules-gssapi-mit
colin-rogers-dbt Oct 24, 2023
0976c4f
install libsasl2-modules-gssapi-mit
colin-rogers-dbt Oct 24, 2023
42f2784
set -e on odbc install
colin-rogers-dbt Oct 24, 2023
4f11291
set -e on odbc install
colin-rogers-dbt Oct 24, 2023
1384084
set -e on odbc install
colin-rogers-dbt Oct 24, 2023
543e321
sudo echo odbc.inst
colin-rogers-dbt Oct 24, 2023
307a9af
Merge branch 'main' into migrateOffCircleCI
mikealfare Oct 27, 2023
f380d46
remove postgres components
mikealfare Nov 2, 2023
c334f32
remove release related items
mikealfare Nov 2, 2023
19dcff3
remove irrelevant output
mikealfare Nov 2, 2023
01b0c0c
move long bash script into its own file
mikealfare Nov 2, 2023
d3d2844
update integration.yml to align with other adapters
mikealfare Nov 2, 2023
94af018
Merge branch 'main' into migrateOffCircleCI
mikealfare Nov 2, 2023
72daf90
revert name change
mikealfare Nov 2, 2023
4f63a3c
Merge remote-tracking branch 'origin/migrateOffCircleCI' into migrate…
mikealfare Nov 2, 2023
b43c9d1
revert name change
mikealfare Nov 2, 2023
91715d2
combine databricks and spark tests
mikealfare Nov 2, 2023
943a8dc
combine databricks and spark tests
mikealfare Nov 2, 2023
3d0dece
Add dagger
colin-rogers-dbt Nov 30, 2023
080b816
remove platform
colin-rogers-dbt Nov 30, 2023
c8477ce
add dagger setup
colin-rogers-dbt Jan 8, 2024
c0a37ae
add dagger setup
colin-rogers-dbt Jan 8, 2024
9b9dc79
Merge branch 'main' into migrateOffCircleCI
colin-rogers-dbt Jan 8, 2024
8c6a745
set env vars
colin-rogers-dbt Jan 8, 2024
6a6b4ce
Merge remote-tracking branch 'origin/migrateOffCircleCI' into migrate…
colin-rogers-dbt Jan 8, 2024
1ae321a
install requirements
colin-rogers-dbt Jan 8, 2024
6361429
install requirements
colin-rogers-dbt Jan 8, 2024
6bca5dc
add DEFAULT_ENV_VARS and test_path arg
colin-rogers-dbt Jan 8, 2024
f4293e0
remove circle ci
colin-rogers-dbt Jan 8, 2024
d398065
formatting
colin-rogers-dbt Jan 9, 2024
6108d44
update changie
colin-rogers-dbt Jan 9, 2024
d472f3b
Update .changes/unreleased/Under the Hood-20230929-161218.yaml
colin-rogers-dbt Jan 9, 2024
ce92bcf
formatting fixes and simplify env_var handling
colin-rogers-dbt Jan 9, 2024
0c4ed9e
Merge remote-tracking branch 'origin/migrateOffCircleCI' into migrate…
colin-rogers-dbt Jan 9, 2024
56b14bc
remove tox, update CONTRIBUTING.md and cleanup GHA workflows
colin-rogers-dbt Jan 9, 2024
9849c1c
remove tox, update CONTRIBUTING.md and cleanup GHA workflows
colin-rogers-dbt Jan 9, 2024
f9a4c58
install test reqs in main.yml
colin-rogers-dbt Jan 9, 2024
bbe17a8
install test reqs in main.yml
colin-rogers-dbt Jan 9, 2024
3f44e96
formatting
colin-rogers-dbt Jan 9, 2024
afd3866
remove tox from dev-requirements.txt and Makefile
colin-rogers-dbt Jan 10, 2024
259ebc7
clarify spark crt instantiation
colin-rogers-dbt Jan 10, 2024
a8a7010
add comments on python-version
colin-rogers-dbt Jan 10, 2024
fcf074b
initial migration changes
colin-rogers-dbt Jan 10, 2024
5da682a
Merge branch 'main' into feature/decouple-adapters-from-core
colin-rogers-dbt Jan 10, 2024
1b1fcec
unpin
colin-rogers-dbt Jan 10, 2024
0a2b73d
implement core / adapters decoupling
colin-rogers-dbt Jan 11, 2024
bd86ee1
fix list_relations
colin-rogers-dbt Jan 11, 2024
cb5e05c
fix typing and exception imports
colin-rogers-dbt Jan 11, 2024
fd7a22f
fix typing and exception imports
colin-rogers-dbt Jan 11, 2024
77df8b7
add changie
colin-rogers-dbt Jan 11, 2024
f216bb6
Merge branch 'main' into feature/decouple-adapters-from-core
colin-rogers-dbt Jan 11, 2024
dfd5885
replace dbt.common with dbt_common
colin-rogers-dbt Jan 12, 2024
3fc6d07
update setup.py
colin-rogers-dbt Jan 12, 2024
17607c1
add dbt-adapters
colin-rogers-dbt Jan 16, 2024
79d74aa
update setup.py
colin-rogers-dbt Jan 22, 2024
011c9b5
fix credentials import
colin-rogers-dbt Jan 22, 2024
a40b07c
fix dev-requirements.txt
colin-rogers-dbt Jan 22, 2024
8aac398
dagger improvements to caching and installing package under test
colin-rogers-dbt Jan 24, 2024
6edcdcf
update requirements
colin-rogers-dbt Jan 24, 2024
eeba17f
add cluster start fixture
colin-rogers-dbt Jan 24, 2024
f3a4c2d
update conftest.py
colin-rogers-dbt Jan 25, 2024
32c05bb
re-order dagger setup to reduce cache invalidation
colin-rogers-dbt Jan 25, 2024
e8e4543
renove dbt-core version dependency version check
colin-rogers-dbt Jan 25, 2024
3 changes: 2 additions & 1 deletion dagger/run_dbt_spark_tests.py
@@ -2,6 +2,7 @@

import argparse
import sys
from typing import Dict

import anyio as anyio
import dagger as dagger
@@ -19,7 +20,7 @@
TESTING_ENV_VARS.update({"ODBC_DRIVER": "Simba"})


def env_variables(envs: dict[str, str]):
def env_variables(envs: Dict[str, str]):
def env_variables_inner(ctr: dagger.Container):
for key, value in envs.items():
ctr = ctr.with_env_variable(key, value)
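The only functional change in this file is the annotation on `env_variables`: the builtin generic `dict[str, str]` becomes `typing.Dict[str, str]`, with the matching `from typing import Dict` import added above. A plausible reason (an assumption, not stated in the PR) is that builtin generics are only subscriptable at runtime on Python 3.9+, so the dagger script would fail to import on older interpreters. A minimal, self-contained sketch of the pattern; `FakeContainer` is a hypothetical stand-in for `dagger.Container`:

```python
# Sketch only: `typing.Dict` keeps the annotation importable on Python 3.8,
# where evaluating `dict[str, str]` in a signature raises TypeError.
from typing import Dict


def env_variables(envs: Dict[str, str]):
    # Returns a configurator that applies each variable to any object exposing
    # a chainable `with_env_variable(key, value)` method, as dagger.Container does.
    def env_variables_inner(ctr):
        for key, value in envs.items():
            ctr = ctr.with_env_variable(key, value)
        return ctr

    return env_variables_inner


class FakeContainer:  # hypothetical stand-in for dagger.Container
    def __init__(self) -> None:
        self.env: Dict[str, str] = {}

    def with_env_variable(self, key: str, value: str) -> "FakeContainer":
        self.env[key] = value
        return self


print(env_variables({"ODBC_DRIVER": "Simba"})(FakeContainer()).env)  # {'ODBC_DRIVER': 'Simba'}
```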
2 changes: 1 addition & 1 deletion dbt/adapters/spark/column.py
@@ -2,7 +2,7 @@
from typing import Any, Dict, Optional, TypeVar, Union

from dbt.adapters.base.column import Column
from dbt.dataclass_schema import dbtClassMixin
from dbt.common.dataclass_schema import dbtClassMixin

Self = TypeVar("Self", bound="SparkColumn")

17 changes: 9 additions & 8 deletions dbt/adapters/spark/connections.py
@@ -2,10 +2,12 @@

import dbt.exceptions
from dbt.adapters.base import Credentials
from dbt.adapters.contracts.connection import AdapterResponse, ConnectionState, Connection
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import ConnectionState, AdapterResponse
from dbt.events import AdapterLogger
from dbt.utils import DECIMALS
from dbt.common.exceptions import DbtConfigError

from dbt.common.utils.encoding import DECIMALS
from dbt.adapters.spark import __version__

try:
@@ -22,8 +24,7 @@
pyodbc = None
from datetime import datetime
import sqlparams
from dbt.contracts.connection import Connection
from dbt.dataclass_schema import StrEnum
from dbt.common.dataclass_schema import StrEnum
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union, Tuple, List, Generator, Iterable, Sequence

@@ -390,7 +391,7 @@ def validate_creds(cls, creds: Any, required: Iterable[str]) -> None:

for key in required:
if not hasattr(creds, key):
raise dbt.exceptions.DbtProfileError(
raise DbtConfigError(
"The config '{}' is required when using the {} method"
" to connect to Spark".format(key, method)
)
@@ -481,7 +482,7 @@ def open(cls, connection: Connection) -> Connection:
endpoint=creds.endpoint
)
else:
raise dbt.exceptions.DbtProfileError(
raise DbtConfigError(
"Either `cluster` or `endpoint` must set when"
" using the odbc method to connect to Spark"
)
@@ -525,7 +526,7 @@ def open(cls, connection: Connection) -> Connection:
Connection(server_side_parameters=creds.server_side_parameters)
)
else:
raise dbt.exceptions.DbtProfileError(
raise DbtConfigError(
f"invalid credential method: {creds.method}"
)
break
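The column.py and connections.py hunks above are the decoupling pattern in miniature: adapter-facing contracts move under `dbt.adapters.*`, shared utilities move under `dbt.common.*` (a later commit in this PR replaces `dbt.common` with `dbt_common`), and profile validation errors switch from `dbt.exceptions.DbtProfileError` to `DbtConfigError`. A hedged summary of the relocations visible so far, reflecting the intermediate state at commit 95 rather than the final released layout:

```python
# Old import path (dbt-core <= 1.7)      ->  new path in this diff
#   dbt.contracts.connection             ->  dbt.adapters.contracts.connection
#       (Connection, ConnectionState, AdapterResponse)
#   dbt.events.AdapterLogger             ->  dbt.adapters.events.logging.AdapterLogger
#   dbt.utils.DECIMALS                   ->  dbt.common.utils.encoding.DECIMALS
#   dbt.dataclass_schema                 ->  dbt.common.dataclass_schema
#       (dbtClassMixin, StrEnum)
#   dbt.exceptions.DbtProfileError       ->  dbt.common.exceptions.DbtConfigError
from dbt.adapters.contracts.connection import AdapterResponse, Connection, ConnectionState
from dbt.adapters.events.logging import AdapterLogger
from dbt.common.dataclass_schema import StrEnum, dbtClassMixin
from dbt.common.exceptions import DbtConfigError
from dbt.common.utils.encoding import DECIMALS
```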
22 changes: 11 additions & 11 deletions dbt/adapters/spark/impl.py
@@ -4,7 +4,9 @@
from typing import Any, Dict, Iterable, List, Optional, Union, Type, Tuple, Callable, Set

from dbt.adapters.base.relation import InformationSchema
from dbt.contracts.graph.manifest import Manifest
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.events.logging import AdapterLogger
from dbt.common.utils import AttrDict, executor

from typing_extensions import TypeAlias

@@ -13,6 +15,7 @@
import dbt
import dbt.exceptions


from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed, ConstraintSupport
from dbt.adapters.sql import SQLAdapter
@@ -24,12 +27,9 @@
AllPurposeClusterPythonJobHelper,
)
from dbt.adapters.base import BaseRelation
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.nodes import ConstraintType
from dbt.contracts.relation import RelationType
from dbt.events import AdapterLogger
from dbt.utils import executor, AttrDict
from dbt.adapters.contracts.relation import RelationType, RelationConfig
from dbt.common.clients.agate_helper import DEFAULT_TYPE_TESTER
from dbt.common.contracts.constraints import ConstraintType

logger = AdapterLogger("Spark")

@@ -352,9 +352,9 @@ def _get_columns_for_catalog(self, relation: BaseRelation) -> Iterable[Dict[str,
yield as_dict

def get_catalog(
self, manifest: Manifest, selected_nodes: Optional[Set] = None
self, relation_configs: Iterable[RelationConfig], selected_nodes: Optional[Set] = None
) -> Tuple[agate.Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)
schema_map = self._get_catalog_schemas(relation_configs)
if len(schema_map) > 1:
raise dbt.exceptions.CompilationError(
f"Expected only one database in get_catalog, found " f"{list(schema_map)}"
@@ -371,7 +371,7 @@ def get_catalog(
self._get_one_catalog,
info,
[schema],
manifest,
relation_configs,
)
)
catalogs, exceptions = catch_as_completed(futures)
@@ -381,7 +381,7 @@ def _get_one_catalog(
self,
information_schema: InformationSchema,
schemas: Set[str],
manifest: Manifest,
relation_configs: Iterable[RelationConfig],
) -> agate.Table:
if len(schemas) != 1:
raise dbt.exceptions.CompilationError(
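Beyond the import moves, impl.py also drops its dependency on dbt-core's `Manifest`: `get_catalog` and `_get_one_catalog` now take the narrower `Iterable[RelationConfig]` protocol from `dbt.adapters.contracts.relation`. A sketch of just the updated signatures shown in this hunk (bodies omitted, class reduced to a stand-in):

```python
from typing import Iterable, List, Optional, Set, Tuple

import agate
from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.contracts.relation import RelationConfig


class SparkAdapterSketch:  # stand-in for dbt.adapters.spark.impl.SparkAdapter
    def get_catalog(
        self,
        relation_configs: Iterable[RelationConfig],
        selected_nodes: Optional[Set] = None,
    ) -> Tuple[agate.Table, List[Exception]]:
        ...

    def _get_one_catalog(
        self,
        information_schema: InformationSchema,
        schemas: Set[str],
        relation_configs: Iterable[RelationConfig],
    ) -> agate.Table:
        ...
```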
4 changes: 2 additions & 2 deletions dbt/adapters/spark/relation.py
@@ -2,9 +2,9 @@
from dataclasses import dataclass, field

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.adapters.events.logging import AdapterLogger

from dbt.exceptions import DbtRuntimeError
from dbt.events import AdapterLogger
from dbt.common.exceptions import DbtRuntimeError

logger = AdapterLogger("Spark")

6 changes: 3 additions & 3 deletions dbt/adapters/spark/session.py
@@ -7,9 +7,9 @@
from typing import Any, Dict, List, Optional, Tuple, Union, Sequence

from dbt.adapters.spark.connections import SparkConnectionWrapper
from dbt.events import AdapterLogger
from dbt.utils import DECIMALS
from dbt.exceptions import DbtRuntimeError
from dbt.adapters.events.logging import AdapterLogger
from dbt.common.utils.encoding import DECIMALS
from dbt.common.exceptions import DbtRuntimeError
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.utils import AnalysisException

4 changes: 2 additions & 2 deletions dev-requirements.txt
@@ -1,7 +1,7 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
git+https://github.com/dbt-labs/dbt-core.git@c2bc2f009bbeeb46b3c69d082ab4d485597898af#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git@c2bc2f009bbeeb46b3c69d082ab4d485597898af#egg=dbt-tests-adapter&subdirectory=tests/adapter
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter

# if version 1.x or greater -> pin to major version
# if version 0.x -> pin to minor
27 changes: 14 additions & 13 deletions tests/unit/test_adapter.py
@@ -1,4 +1,5 @@
import unittest
from multiprocessing import get_context
from unittest import mock

import dbt.flags as flags
@@ -146,7 +147,7 @@ def _get_target_odbc_sql_endpoint(self, project):

def test_http_connection(self):
config = self._get_target_http(self.project_cfg)
adapter = SparkAdapter(config)
adapter = SparkAdapter(config, get_context("spawn"))

def hive_http_connect(thrift_transport, configuration):
self.assertEqual(thrift_transport.scheme, "https")
@@ -171,7 +172,7 @@ def hive_http_connect(thrift_transport, configuration):

def test_thrift_connection(self):
config = self._get_target_thrift(self.project_cfg)
adapter = SparkAdapter(config)
adapter = SparkAdapter(config, get_context("spawn"))

def hive_thrift_connect(
host, port, username, auth, kerberos_service_name, password, configuration
@@ -195,7 +196,7 @@ def hive_thrift_connect(

def test_thrift_ssl_connection(self):
config = self._get_target_use_ssl_thrift(self.project_cfg)
adapter = SparkAdapter(config)
adapter = SparkAdapter(config, get_context("spawn"))

def hive_thrift_connect(thrift_transport, configuration):
self.assertIsNotNone(thrift_transport)
@@ -215,7 +216,7 @@ def hive_thrift_connect(thrift_transport, configuration):

def test_thrift_connection_kerberos(self):
config = self._get_target_thrift_kerberos(self.project_cfg)
adapter = SparkAdapter(config)
adapter = SparkAdapter(config, get_context("spawn"))

def hive_thrift_connect(
host, port, username, auth, kerberos_service_name, password, configuration
@@ -239,7 +240,7 @@ def hive_thrift_connect(

def test_odbc_cluster_connection(self):
config = self._get_target_odbc_cluster(self.project_cfg)
adapter = SparkAdapter(config)
adapter = SparkAdapter(config, get_context("spawn"))

def pyodbc_connect(connection_str, autocommit):
self.assertTrue(autocommit)
@@ -266,7 +267,7 @@ def pyodbc_connect(connection_str, autocommit):

def test_odbc_endpoint_connection(self):
config = self._get_target_odbc_sql_endpoint(self.project_cfg)
adapter = SparkAdapter(config)
adapter = SparkAdapter(config, get_context("spawn"))

def pyodbc_connect(connection_str, autocommit):
self.assertTrue(autocommit)
@@ -329,7 +330,7 @@ def test_parse_relation(self):
input_cols = [Row(keys=["col_name", "data_type"], values=r) for r in plain_rows]

config = self._get_target_http(self.project_cfg)
rows = SparkAdapter(config).parse_describe_extended(relation, input_cols)
rows = SparkAdapter(config, get_context("spawn")).parse_describe_extended(relation, input_cols)
self.assertEqual(len(rows), 4)
self.assertEqual(
rows[0].to_column_dict(omit_none=False),
@@ -418,7 +419,7 @@ def test_parse_relation_with_integer_owner(self):
input_cols = [Row(keys=["col_name", "data_type"], values=r) for r in plain_rows]

config = self._get_target_http(self.project_cfg)
rows = SparkAdapter(config).parse_describe_extended(relation, input_cols)
rows = SparkAdapter(config, get_context("spawn")).parse_describe_extended(relation, input_cols)

self.assertEqual(rows[0].to_column_dict().get("table_owner"), "1234")

@@ -454,7 +455,7 @@ def test_parse_relation_with_statistics(self):
input_cols = [Row(keys=["col_name", "data_type"], values=r) for r in plain_rows]

config = self._get_target_http(self.project_cfg)
rows = SparkAdapter(config).parse_describe_extended(relation, input_cols)
rows = SparkAdapter(config, get_context("spawn")).parse_describe_extended(relation, input_cols)
self.assertEqual(len(rows), 1)
self.assertEqual(
rows[0].to_column_dict(omit_none=False),
@@ -483,7 +484,7 @@ def test_parse_relation_with_statistics(self):

def test_relation_with_database(self):
config = self._get_target_http(self.project_cfg)
adapter = SparkAdapter(config)
adapter = SparkAdapter(config, get_context("spawn"))
# fine
adapter.Relation.create(schema="different", identifier="table")
with self.assertRaises(DbtRuntimeError):
@@ -564,7 +565,7 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self)
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(relation)
columns = SparkAdapter(config, get_context("spawn")).parse_columns_from_information(relation)
self.assertEqual(len(columns), 4)
self.assertEqual(
columns[0].to_column_dict(omit_none=False),
@@ -649,7 +650,7 @@ def test_parse_columns_from_information_with_view_type(self):
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(relation)
columns = SparkAdapter(config, get_context("spawn")).parse_columns_from_information(relation)
self.assertEqual(len(columns), 4)
self.assertEqual(
columns[1].to_column_dict(omit_none=False),
@@ -715,7 +716,7 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
)

config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(relation)
columns = SparkAdapter(config, get_context("spawn")).parse_columns_from_information(relation)
self.assertEqual(len(columns), 4)

self.assertEqual(
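The repeated change in this test file is that `SparkAdapter` is now constructed with a second argument, `get_context("spawn")`. That matches the decoupled adapter base class taking an explicit multiprocessing context alongside the config (an inference from these test edits, not from the dbt-adapters source itself). A small helper showing the instantiation pattern the tests use, where `config` is whatever the `_get_target_*` fixtures build:

```python
from multiprocessing import get_context

from dbt.adapters.spark import SparkAdapter


def build_adapter(config) -> SparkAdapter:
    # `config` comes from the unit tests' _get_target_* helpers; the only new
    # requirement is passing a spawn-based multiprocessing context explicitly.
    return SparkAdapter(config, get_context("spawn"))
```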
2 changes: 1 addition & 1 deletion tests/unit/utils.py
@@ -9,7 +9,7 @@

import agate
import pytest
from dbt.dataclass_schema import ValidationError
from dbt.common.dataclass_schema import ValidationError
from dbt.config.project import PartialProject

