Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External catalog config #10814

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/dbt/artifacts/resources/v1/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

from dbt_config.catalog_config import ExternalCatalog

from dbt.artifacts.resources.base import Docs, FileHash, GraphResource
from dbt.artifacts.resources.types import NodeType, TimePeriod
from dbt.artifacts.resources.v1.config import NodeConfig
Expand Down Expand Up @@ -164,6 +166,7 @@ class DeferRelation(HasRelationMetadata):
meta: Dict[str, Any]
tags: List[str]
config: Optional[NodeConfig]
external_catalog: Optional[ExternalCatalog]

@property
def identifier(self):
Expand Down
24 changes: 24 additions & 0 deletions core/dbt/config/external_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Dict, Optional

from dbt.clients.yaml_helper import load_yaml_text
from dbt.constants import EXTERNAL_CATALOG_FILE_NAME
from dbt_common.clients.system import load_file_contents, path_exists


def _load_yaml(path):
contents = load_file_contents(path)
return load_yaml_text(contents)


def _load_yml_dict(file_path):
if path_exists(file_path):
ret = _load_yaml(file_path) or {}
return ret
return None


def load_external_catalog_config(project) -> Optional[Dict]:
unparsed_config = _load_yml_dict(f"{project.project_root}/{EXTERNAL_CATALOG_FILE_NAME}")
if unparsed_config is not None:
return unparsed_config
return None
6 changes: 6 additions & 0 deletions core/dbt/config/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,9 @@ class PackageRenderer(SecretRenderer):
@property
def name(self):
return "Packages config"


class CatalogRenderer(SecretRenderer):
@property
def name(self):
return "Catalog config"
19 changes: 16 additions & 3 deletions core/dbt/config/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
Type,
)

from dbt_config.catalog_config import ExternalCatalogConfig

from dbt import tracking
from dbt.adapters.contracts.connection import (
AdapterRequiredConfig,
Expand All @@ -39,6 +41,7 @@
from dbt_common.events.functions import warn_or_error
from dbt_common.helper_types import DictDefaultEmptyStr, FQNPath, PathSet

from .external_config import load_external_catalog_config
from .profile import Profile
from .project import Project
from .renderer import DbtProjectYamlRenderer, ProfileRenderer
Expand Down Expand Up @@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig):
profile_name: str
cli_vars: Dict[str, Any]
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None
catalogs: Optional[ExternalCatalogConfig] = None

def __post_init__(self):
self.validate()
Expand Down Expand Up @@ -125,12 +129,15 @@ def from_parts(
profile: Profile,
args: Any,
dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None,
catalogs: Optional[ExternalCatalogConfig] = None,
) -> "RuntimeConfig":
"""Instantiate a RuntimeConfig from its components.

:param profile: A parsed dbt Profile.
:param project: A parsed dbt Project.
:param args: The parsed command-line arguments.
:param dependencies: A mapping of project names to RuntimeConfigs.
:param catalogs: A parsed dbt ExternalCatalogConfig.
:returns RuntimeConfig: The new configuration.
"""
quoting: Dict[str, Any] = (
Expand Down Expand Up @@ -194,6 +201,7 @@ def from_parts(
dependencies=dependencies,
dbt_cloud=project.dbt_cloud,
flags=project.flags,
catalogs=catalogs,
)

# Called by 'load_projects' in this class
Expand Down Expand Up @@ -253,7 +261,9 @@ def validate(self):

# Called by RuntimeConfig.from_args
@classmethod
def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profile]:
def collect_parts(
cls: Type["RuntimeConfig"], args: Any
) -> Tuple[Project, Profile, Optional[ExternalCatalogConfig]]:
# profile_name from the project
project_root = args.project_dir if args.project_dir else os.getcwd()
cli_vars: Dict[str, Any] = getattr(args, "vars", {})
Expand All @@ -264,7 +274,9 @@ def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profi
)
flags = get_flags()
project = load_project(project_root, bool(flags.VERSION_CHECK), profile, cli_vars)
return project, profile
catalog_yml = load_external_catalog_config(project)
catalogs = ExternalCatalogConfig.model_validate(catalog_yml) if catalog_yml else None
return project, profile, catalogs

# Called in task/base.py, in BaseTask.from_args
@classmethod
Expand All @@ -278,12 +290,13 @@ def from_args(cls, args: Any) -> "RuntimeConfig":
:raises DbtProfileError: If the profile is invalid or missing.
:raises DbtValidationError: If the cli variables are invalid.
"""
project, profile = cls.collect_parts(args)
project, profile, catalogs = cls.collect_parts(args)

return cls.from_parts(
project=project,
profile=profile,
args=args,
catalogs=catalogs,
)

def get_metadata(self) -> ManifestMetadata:
Expand Down
1 change: 1 addition & 0 deletions core/dbt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
PACKAGES_FILE_NAME = "packages.yml"
DEPENDENCIES_FILE_NAME = "dependencies.yml"
PACKAGE_LOCK_FILE_NAME = "package-lock.yml"
EXTERNAL_CATALOG_FILE_NAME = "catalog.yml"
MANIFEST_FILE_NAME = "manifest.json"
SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json"
LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine"
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing_extensions import Protocol

from dbt import selected_resources
from dbt.adapters.base.catalog import ExternalCatalogIntegrations
from dbt.adapters.base.column import Column
from dbt.adapters.base.relation import EventTimeFilter
from dbt.adapters.contracts.connection import AdapterResponse
Expand Down Expand Up @@ -890,6 +891,9 @@ def __init__(
self.context_config: Optional[ContextConfig] = context_config
self.provider: Provider = provider
self.adapter = get_adapter(self.config)
self.catalog_integrations = ExternalCatalogIntegrations.from_json_strings(
self.manifest.catalogs.values(), self.adapter.ExternalCatalogIntegration
)
# The macro namespace is used in creating the DatabaseWrapper
self.db_wrapper = self.provider.DatabaseWrapper(self.adapter, self.namespace)

Expand Down Expand Up @@ -1287,6 +1291,7 @@ def api(self) -> Dict[str, Any]:
return {
"Relation": self.db_wrapper.Relation,
"Column": self.adapter.Column,
"catalogs": self.catalog_integrations,
}

@contextproperty()
Expand Down
22 changes: 22 additions & 0 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Union,
)

from dbt_config.catalog_config import ExternalCatalog
from typing_extensions import Protocol

import dbt_common.exceptions
Expand Down Expand Up @@ -882,6 +883,7 @@ class Manifest(MacroMethods, dbtClassMixin):
unit_tests: MutableMapping[str, UnitTestDefinition] = field(default_factory=dict)
saved_queries: MutableMapping[str, SavedQuery] = field(default_factory=dict)
fixtures: MutableMapping[str, UnitTestFileFixture] = field(default_factory=dict)
catalogs: MutableMapping[str, str] = field(default_factory=dict)

_doc_lookup: Optional[DocLookup] = field(
default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None}
Expand Down Expand Up @@ -1379,6 +1381,26 @@ def resolve_source(
current_project: str,
node_package: str,
) -> MaybeParsedSource:
if target_source_name in self.catalogs:
catalog = ExternalCatalog.model_validate_json(self.catalogs[target_source_name])
identifier = f"{target_source_name}.{target_table_name}"
catalog_database = catalog.configuration.internal_namespace.database
catalog_schema = catalog.configuration.internal_namespace.schema_
return SourceDefinition(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add merge functionality to merge with sources.yml

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move down to below if disabled

database=catalog_database,
schema=catalog_schema,
fqn=[catalog_database, catalog_schema, catalog.name, target_table_name],
name=target_table_name,
source_description=f"External Catalog source for {target_source_name}.{target_table_name}",
source_name=target_source_name,
unique_id=identifier,
identifier=identifier,
package_name="dbt",
path="/root/catalogs.yml",
loader=catalog.type.value,
resource_type=NodeType.Source,
original_file_path="/root/catalogs.yml",
)
search_name = f"{target_source_name}.{target_table_name}"
candidates = _packages_to_search(current_project, node_package)

Expand Down
15 changes: 15 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, Union

import msgpack
from dbt_config.catalog_config import ExternalCatalogConfig

import dbt.deprecations
import dbt.exceptions
Expand All @@ -29,6 +30,7 @@
from dbt.clients.jinja import MacroStack, get_rendered
from dbt.clients.jinja_static import statically_extract_macro_calls
from dbt.config import Project, RuntimeConfig
from dbt.config.external_config import load_external_catalog_config
from dbt.constants import (
MANIFEST_FILE_NAME,
PARTIAL_PARSE_FILE_NAME,
Expand Down Expand Up @@ -444,6 +446,13 @@ def load(self) -> Manifest:
self.manifest.sources = patcher.sources
self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch

# Get catalog.yml and update the manifest
raw_catalog = load_external_catalog_config(self.root_project)
if raw_catalog:
catalog_config = ExternalCatalogConfig.model_validate(raw_catalog)
self.manifest.catalogs = {
c.name: c.model_dump_json(by_alias=True) for c in catalog_config.catalogs
}
# We need to rebuild disabled in order to include disabled sources
self.manifest.rebuild_disabled_lookup()

Expand All @@ -466,6 +475,7 @@ def load(self) -> Manifest:
self.process_docs(self.root_project)
self.process_metrics(self.root_project)
self.process_saved_queries(self.root_project)
self.process_catalog(self.root_project)
self.process_model_inferred_primary_keys()
self.check_valid_group_config()
self.check_valid_access_property()
Expand Down Expand Up @@ -1140,6 +1150,11 @@ def process_metrics(self, config: RuntimeConfig):
continue
_process_metrics_for_node(self.manifest, current_project, exposure)

def process_catalog(self, config: RuntimeConfig):
if config.catalogs:
for catalog in config.catalogs.catalogs:
self.manifest.catalogs[catalog.name] = catalog.model_dump_json(by_alias=True)

def process_saved_queries(self, config: RuntimeConfig):
"""Processes SavedQuery nodes to populate their `depends_on`."""
# Note: This will also capture various nodes which have been re-parsed
Expand Down
3 changes: 2 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig
git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig#egg=dbt-config&subdirectory=config
git+https://github.com/dbt-labs/dbt-postgres.git@main
# black must match what's in .pre-commit-config.yaml to be sure local env matches CI
black==24.3.0
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/configs/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

models__untagged_sql = """
{{
config(materialized='table')
config(materialized=table)
}}

select id, value from {{ source('raw', 'seed') }}
Expand Down
48 changes: 48 additions & 0 deletions tests/functional/test_external_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest
import yaml
from dbt_config.catalog_config import ExternalCatalog

from dbt.tests.util import run_dbt, write_file


@pytest.fixture(scope="class", autouse=True)
def dbt_catalog_config(project_root):
config = {
"catalogs": [
{
"name": "my_external_catalog",
"type": "iceberg",
"configuration": {
"table_format": "iceberg",
"catalog_namespace": "dbt",
"internal_namespace": {
"database": "my_db",
"schema": "my_schema",
},
"external_location": "s3://my-bucket/my-path",
},
"management": {
"enabled": True,
"create_if_not_exists": False,
"alter_if_different": False,
"read_only": True,
"refresh": "on-start",
},
}
],
}
write_file(yaml.safe_dump(config), project_root, "catalog.yml")


class TestCatalogConfig:
@pytest.fixture(scope="class")
def models(self):
return {
"model.sql": "select 1 as id from {{ source('my_external_catalog', 'my_table') }}",
}

def test_supplying_external_catalog(self, project):
manifest = run_dbt(["parse"])
assert manifest.catalogs != {}
assert manifest.nodes["model.test.model"].sources == [["my_external_catalog", "my_table"]]
ExternalCatalog.model_validate_json(manifest.catalogs["my_external_catalog"])
20 changes: 18 additions & 2 deletions tests/unit/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

import os
import string
from typing import Dict
from unittest import TestCase, mock

import agate
import pytest
from dbt_config.catalog_config import ExternalCatalogConfig

from dbt.config.project import PartialProject
from dbt.config.renderer import CatalogRenderer
from dbt.contracts.graph.manifest import Manifest
from dbt_common.dataclass_schema import ValidationError

Expand Down Expand Up @@ -57,6 +60,14 @@ def profile_from_dict(profile, profile_name, cli_vars="{}"):
)


def catalog_from_dict(catalog, cli_vars=None):
if cli_vars is None:
cli_vars = {}
renderer = CatalogRenderer(cli_vars)
rendered = renderer.render_value(catalog)
return ExternalCatalogConfig.model_validate(rendered)


def project_from_dict(project, profile, packages=None, selectors=None, cli_vars="{}"):
from dbt.config.renderer import DbtProjectYamlRenderer
from dbt.config.utils import parse_cli_vars
Expand All @@ -77,7 +88,9 @@ def project_from_dict(project, profile, packages=None, selectors=None, cli_vars=
return partial.render(renderer)


def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, cli_vars={}):
def config_from_parts_or_dicts(
project, profile, packages=None, selectors=None, cli_vars={}, catalogs=None
):
from copy import deepcopy

from dbt.config import Profile, Project, RuntimeConfig
Expand All @@ -103,10 +116,13 @@ def config_from_parts_or_dicts(project, profile, packages=None, selectors=None,
cli_vars,
)

if isinstance(catalogs, Dict):
catalogs = catalog_from_dict(catalogs, cli_vars)

args = Obj()
args.vars = cli_vars
args.profile_dir = "/dev/null"
return RuntimeConfig.from_parts(project=project, profile=profile, args=args)
return RuntimeConfig.from_parts(project=project, profile=profile, args=args, catalogs=catalogs)


def inject_plugin(plugin):
Expand Down
Loading