Skip to content

Commit

Permalink
(fix) Add support for attaching a MotherDuck database with token in s…
Browse files Browse the repository at this point in the history
…ettings or config (#444)

* swap attach and plugin init, set motherduck_token if md: path in attach list, add UT
* support getting token from settings and path, add UT
* always add token to config and make path unique (because duckdb 1.1.0 cares)
  • Loading branch information
guenp authored Sep 24, 2024
1 parent b9c46e4 commit acf622d
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 35 deletions.
20 changes: 16 additions & 4 deletions dbt/adapters/duckdb/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ class Attachment(dbtClassMixin):
read_only: bool = False

def to_sql(self) -> str:
base = f"ATTACH '{self.path}'"
# remove query parameters (not supported in ATTACH)
parsed = urlparse(self.path)
path = self.path.replace(f"?{parsed.query}", "")
base = f"ATTACH '{path}'"
if self.alias:
base += f" AS {self.alias}"
options = []
Expand Down Expand Up @@ -188,14 +191,23 @@ def secrets_sql(self) -> List[str]:
return [secret.to_sql() for secret in self._secrets]

@property
def motherduck_attach(self):
    """Return the attached databases whose path uses a MotherDuck scheme.

    ``self.attach`` is treated as empty when it is unset. Entries are kept
    in their original order; an entry is kept when ``self._is_motherduck``
    accepts the URL scheme parsed from its ``path``.
    """
    candidates = self.attach or []
    return [
        db for db in candidates if self._is_motherduck(urlparse(db.path).scheme)
    ]

@property
def is_motherduck_attach(self):
    """Tell whether at least one attached database is a MotherDuck database."""
    return bool(self.motherduck_attach)

@property
def is_motherduck(self):
    """Tell whether MotherDuck is involved via the main path or an attachment.

    The scheme of ``self.path`` is checked first; only when it is not a
    MotherDuck scheme is ``self.is_motherduck_attach`` consulted.
    """
    path_is_md = self._is_motherduck(urlparse(self.path).scheme)
    return path_is_md if path_is_md else self.is_motherduck_attach

@staticmethod
def _is_motherduck(scheme: str) -> bool:
Expand Down
10 changes: 5 additions & 5 deletions dbt/adapters/duckdb/environments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,17 @@ def initialize_db(
fs = fsspec.filesystem(fsimpl, **curr)
conn.register_filesystem(fs)

# attach any databases that we will be using
if creds.attach:
for attachment in creds.attach:
conn.execute(attachment.to_sql())

# let the plugins do any configuration on the
# connection that they need to do
if plugins:
for plugin in plugins.values():
plugin.configure_connection(conn)

# attach any databases that we will be using
if creds.attach:
for attachment in creds.attach:
conn.execute(attachment.to_sql())

return conn

@classmethod
Expand Down
72 changes: 53 additions & 19 deletions dbt/adapters/duckdb/plugins/motherduck.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Any
from typing import Dict
from urllib.parse import parse_qs
from urllib.parse import urlparse

from duckdb import DuckDBPyConnection

Expand All @@ -8,40 +10,72 @@
from dbt.adapters.duckdb.credentials import DuckDBCredentials
from dbt.version import __version__

# Short key under which the token may appear in the plugin config.
TOKEN = "token"
# Long key for the token; also the name used in the `SET` statements and in
# the connection config dict.
MOTHERDUCK_TOKEN = "motherduck_token"
# Key of the user-agent entry read from (and written to) the connection config.
CUSTOM_USER_AGENT = "custom_user_agent"
# Name of the DuckDB extension loaded when a connection is configured.
MOTHERDUCK_EXT = "motherduck"
# Option names looked up in the query string of attached MotherDuck paths.
MOTHERDUCK_CONFIG_OPTIONS = [MOTHERDUCK_TOKEN]


class Plugin(BasePlugin):
    """MotherDuck plugin: loads the extension and passes the token to DuckDB."""

    def initialize(self, plugin_config: Dict[str, Any]):
        """Keep the plugin config and cache the token found in it.

        :param plugin_config: The ``config`` mapping of the plugin entry.
        """
        self._config = plugin_config
        self._token = self.token_from_config(plugin_config)

    def configure_connection(self, conn: DuckDBPyConnection):
        """Load the MotherDuck extension and set its options on ``conn``.

        Options are only set here when the credentials list a MotherDuck
        database under ``attach``; they have to be in place *before* the
        ATTACH statements run. Token sources are applied in this order:

        1. ``motherduck_token`` in the query string of each attached path,
        2. the token from the plugin config,
        3. ``motherduck_token`` in ``creds.settings`` (popped from the
           settings mapping), used only when the plugin config has no token.

        :param conn: The DuckDB connection being configured.
        """
        conn.load_extension(MOTHERDUCK_EXT)
        if self.creds is None or not self.creds.is_motherduck_attach:
            return

        # Options given directly in the attachment path, e.g.
        # "md:db?motherduck_token=...".
        for attachment in self.creds.motherduck_attach:
            query = parse_qs(urlparse(attachment.path).query)
            for key in MOTHERDUCK_CONFIG_OPTIONS:
                values = query.get(key)
                if values:
                    conn.execute(f"SET {key} = '{values[0]}'")

        # Test the token itself rather than the config mapping: a plugin
        # config without a token must not blank the setting, and must not
        # hide a token supplied through the settings.
        if self._token:
            conn.execute(f"SET {MOTHERDUCK_TOKEN} = '{self._token}'")
        elif self.creds.settings and MOTHERDUCK_TOKEN in self.creds.settings:
            token = self.creds.settings.pop(MOTHERDUCK_TOKEN)
            conn.execute(f"SET {MOTHERDUCK_TOKEN} = '{token}'")

    @staticmethod
    def token_from_config(config: Dict[str, Any]) -> str:
        """Load the token from the MotherDuck plugin config.

        The keys ``token``, ``TOKEN``, ``motherduck_token`` and
        ``MOTHERDUCK_TOKEN`` are tried in that order; the first non-empty
        value wins.

        :param config: The plugin config mapping (may be empty or ``None``).
        :return: The token as a string, or ``""`` when no non-empty token is
            configured (including a key that is present but set to ``None``).
        """
        if not config:
            return ""
        for key in (TOKEN, TOKEN.upper(), MOTHERDUCK_TOKEN, MOTHERDUCK_TOKEN.upper()):
            value = config.get(key)
            if value:
                return str(value)
        return ""

    def update_connection_config(self, creds: DuckDBCredentials, config: Dict[str, Any]):
        """Add the user agent, and possibly the token, to the connect config.

        The user agent always starts with the dbt and dbt-duckdb versions.
        A ``custom_user_agent`` already in ``config`` and one in
        ``creds.settings`` are appended in that order; the settings entry is
        popped from the settings mapping.

        :param creds: The adapter credentials.
        :param config: The config mapping for the DuckDB connection; mutated
            in place.
        """
        user_agent = f"dbt/{__version__} dbt-duckdb/{__plugin_version__}"
        if CUSTOM_USER_AGENT in config:
            user_agent = f"{user_agent} {config[CUSTOM_USER_AGENT]}"
        settings: Dict[str, Any] = creds.settings or {}
        if CUSTOM_USER_AGENT in settings:
            user_agent = f"{user_agent} {settings.pop(CUSTOM_USER_AGENT)}"

        config[CUSTOM_USER_AGENT] = user_agent

        # If a user specified MotherDuck config options via the plugin config,
        # pass it to the config kwarg in duckdb.connect. When a MotherDuck
        # database is attached instead, configure_connection sets the token.
        if not creds.is_motherduck_attach and self._token:
            config[MOTHERDUCK_TOKEN] = self._token
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def dbt_profile_target(profile_type, bv_server_process, tmp_path_factory):
environment variable to run tests against MotherDuck"
)
profile["token"] = os.environ.get(TEST_MOTHERDUCK_TOKEN)
else:
profile["token"] = os.environ.get(MOTHERDUCK_TOKEN, os.environ.get(MOTHERDUCK_TOKEN.lower()))
profile["disable_transactions"] = True
profile["path"] = "md:test"
elif profile_type == "memory":
Expand Down
16 changes: 9 additions & 7 deletions tests/functional/plugins/test_motherduck.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def profiles_config_update(self, dbt_profile_target):
"outputs": {
"dev": {
"type": "duckdb",
"path": dbt_profile_target.get("path", ":memory:"),
# make path unique from other tests that don't pass the token via config
# to avoid duckdb 1.1.0 caching error (see https://github.com/duckdb/duckdb/pull/13129)
"path": dbt_profile_target.get("path", ":memory:") + "?user=test_motherduck",
"plugins": plugins,
}
},
Expand Down Expand Up @@ -130,23 +132,23 @@ def test_incremental_temp_table_exists(self, project):


@pytest.fixture
def mock_plugin_config():
    """Plugin config carrying a dummy token."""
    return dict(token="quack")


@pytest.fixture
def mock_creds(dbt_profile_target, mock_plugin_config):
    """Credentials for the target path; ``md:`` paths also get the plugin."""
    md_plugin = PluginConfig(module="motherduck", config=mock_plugin_config)
    target_path = dbt_profile_target["path"]
    if "md:" not in target_path:
        return DuckDBCredentials(path=dbt_profile_target["path"])
    return DuckDBCredentials(path=dbt_profile_target["path"], plugins=[md_plugin])


@pytest.fixture
def mock_plugins(mock_creds, mock_plugin_config):
    """Plugin mapping: the MotherDuck plugin for MotherDuck creds, else empty."""
    if not mock_creds.is_motherduck:
        return {}
    return {"motherduck": Plugin.create("motherduck", config=mock_plugin_config)}


Expand Down
155 changes: 155 additions & 0 deletions tests/functional/plugins/test_motherduck_attach.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from urllib.parse import urlparse
import pytest
from dbt.tests.util import (
run_dbt,
)

# Source of the `random_logs_test` model: a table materialization that
# generates 10,000 rows with a uuid, a timestamp offset from 2023-10-01 and
# a small integer user id.
random_logs_sql = """
{{ config(materialized='table', meta=dict(temp_schema_name='dbt_temp_test')) }}
select
uuid()::varchar as log_id,
'2023-10-01'::timestamp + interval 1 minute * (random() * 20000)::int as dt ,
(random() * 4)::int64 as user_id
from generate_series(1, 10000) g(x)
"""

# Source of the `summary_of_logs_test` model: an incremental materialization
# that counts rows of `random_logs_test` per date and user id.
summary_of_logs_sql = """
{{
config(
materialized='incremental',
meta=dict(temp_schema_name='dbt_temp_test'),
)
}}
select dt::date as dt, user_id, count(1) as c
from {{ ref('random_logs_test') }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
-- (uses > to include records whose timestamp occurred since the last run of this model)
where dt > '2023-10-08'::timestamp
{% endif %}
group by all
"""

# Source of the `python_pyarrow_table_model` Python model: returns a pyarrow
# table with a single column `a`.
python_pyarrow_table_model = """
import pyarrow as pa
def model(dbt, con):
return pa.Table.from_pydict({"a": [1,2,3]})
"""

@pytest.mark.skip_profile("buenavista", "file", "memory")
class TestMDPluginAttach:
    """Run the project on an in-memory DuckDB with a MotherDuck database attached.

    In this variant the token is supplied through the ``motherduck`` plugin
    config.
    """

    @pytest.fixture(scope="class")
    def profiles_config_update(self, dbt_profile_target):
        """Profile using ``:memory:`` as path, the plugin and one attachment."""
        motherduck_plugin = {
            "module": "motherduck",
            "config": {"token": dbt_profile_target.get("token")},
        }
        attachment = {
            "path": dbt_profile_target.get("path", ":memory:"),
            "type": "motherduck",
        }
        dev_output = {
            "type": "duckdb",
            "path": ":memory:",
            "plugins": [motherduck_plugin],
            "attach": [attachment],
        }
        return {"test": {"outputs": {"dev": dev_output}, "target": "dev"}}

    @pytest.fixture(scope="class")
    def models(self, md_sql):
        """Model files of the test project, keyed by file name."""
        return dict(
            [
                ("md_table.sql", md_sql),
                ("random_logs_test.sql", random_logs_sql),
                ("summary_of_logs_test.sql", summary_of_logs_sql),
                ("python_pyarrow_table_model.py", python_pyarrow_table_model),
            ]
        )

    @pytest.fixture(scope="class")
    def database_name(self, dbt_profile_target):
        """Path component of the profile target's ``path`` URL."""
        target_url = urlparse(dbt_profile_target["path"])
        return target_url.path

    @pytest.fixture(scope="class")
    def md_sql(self, database_name):
        """SQL of the model reading ``plugin_table`` from the named database."""
        # Reads from a MD database in my test account in the cloud
        return f"\nselect * FROM {database_name}.main.plugin_table\n"

    @pytest.fixture(autouse=True)
    def run_dbt_scope(self, project, database_name):
        """Create and fill ``plugin_table`` before each test; drop relations after."""
        # CREATE DATABASE does not work with SaaS mode on duckdb 1.0.0
        # This will be fixed in duckdb 1.1.0
        # project.run_sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        setup_statements = (
            "CREATE OR REPLACE TABLE plugin_table (i integer, j string)",
            "INSERT INTO plugin_table (i, j) VALUES (1, 'foo')",
        )
        for statement in setup_statements:
            project.run_sql(statement)
        yield
        teardown_statements = (
            "DROP VIEW md_table",
            "DROP TABLE random_logs_test",
            "DROP TABLE summary_of_logs_test",
            "DROP TABLE plugin_table",
            "DROP TABLE python_pyarrow_table_model",
        )
        for statement in teardown_statements:
            project.run_sql(statement)

    def test_motherduck(self, project):
        """Invoke dbt with ``expect_pass=False``."""
        run_dbt(expect_pass=False)


@pytest.mark.skip_profile("buenavista", "file", "memory")
class TestMDPluginAttachWithSettings(TestMDPluginAttach):
    """Same scenario, with the token given as ``motherduck_token`` in ``settings``."""

    @pytest.fixture(scope="class")
    def profiles_config_update(self, dbt_profile_target):
        """Profile with one attachment and the token in ``settings``; no plugin."""
        token_settings = {"motherduck_token": dbt_profile_target.get("token")}
        attachment = {
            "path": dbt_profile_target.get("path", ":memory:"),
            "type": "motherduck",
        }
        dev_output = {
            "type": "duckdb",
            "path": ":memory:",
            "attach": [attachment],
            "settings": token_settings,
        }
        return {"test": {"outputs": {"dev": dev_output}, "target": "dev"}}


@pytest.mark.skip_profile("buenavista", "file", "memory")
class TestMDPluginAttachWithTokenInPath(TestMDPluginAttach):
    """Same scenario, with the token passed in the attached path's query string."""

    @pytest.fixture(scope="class")
    def profiles_config_update(self, dbt_profile_target):
        """Profile whose single attachment path carries ``motherduck_token``."""
        token = dbt_profile_target.get("token")
        base_path = dbt_profile_target.get("path", ":memory:")
        attachment = {
            "path": base_path + f"?motherduck_token={token}&user=1",
            "type": "motherduck",
        }
        dev_output = {
            "type": "duckdb",
            "path": ":memory:",
            "attach": [attachment],
        }
        return {"test": {"outputs": {"dev": dev_output}, "target": "dev"}}

0 comments on commit acf622d

Please sign in to comment.