Merge pull request #623 from dbt-labs/dbt-mf-cli-integration
Enable dbt AdapterBackedSqlClient queries from the MetricFlow CLI
tlento authored Jun 27, 2023
2 parents e9814a8 + 671d346 commit f00b86d
Showing 8 changed files with 108 additions and 133 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230626-133439.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Use dbt adapter to run queries and warehouse validations from MetricFlow CLI
time: 2023-06-26T13:34:39.613243-07:00
custom:
Author: tlento
Issue: "624"
2 changes: 1 addition & 1 deletion metricflow/cli/cli_context.py
@@ -125,7 +125,7 @@ def run_health_checks(self) -> Dict[str, Dict[str, str]]:
def _initialize_metricflow_engine(self) -> None:
"""Initialize the MetricFlowEngine."""
try:
self._mf = MetricFlowEngine.from_dbt_project_root(self.config)
self._mf = MetricFlowEngine.from_dbt_project_root()
except Exception as e:
raise MetricFlowInitException from e

71 changes: 71 additions & 0 deletions metricflow/cli/dbt_connectors/dbt_config_accessor.py
@@ -0,0 +1,71 @@
from __future__ import annotations

import dataclasses
import logging
from pathlib import Path
from typing import Type

from dbt.adapters.base.impl import BaseAdapter
from dbt.adapters.factory import get_adapter_by_type
from dbt.cli.main import dbtRunner
from dbt.config.profile import Profile
from dbt.config.project import Project
from dbt.config.runtime import load_profile, load_project
from dbt_semantic_interfaces.implementations.semantic_manifest import PydanticSemanticManifest
from dbt_semantic_interfaces.pretty_print import pformat_big_objects
from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest
from dbt_semantic_interfaces.transformations.semantic_manifest_transformer import PydanticSemanticManifestTransformer
from typing_extensions import Self

from metricflow.errors.errors import ModelCreationException

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class dbtArtifacts:
"""Container with access to the dbt artifacts required to power the MetricFlow CLI."""

profile: Profile
project: Project
adapter: BaseAdapter
semantic_manifest: SemanticManifest

@classmethod
def load_from_project_path(cls: Type[Self], project_path: Path) -> Self:
"""Loads all dbt artifacts for the project associated with the given project path."""
logger.info(f"Loading dbt artifacts for project located at {project_path}")
dbtRunner().invoke(["-q", "debug"], project_dir=str(project_path))
profile = load_profile(str(project_path), {})
project = load_project(str(project_path), version_check=False, profile=profile)
logger.info(f"Loaded project {project.project_name} with profile details:\n{pformat_big_objects(profile)}")
# dbt's get_adapter helper expects an AdapterRequiredConfig, but `project` is missing cli_vars
# In practice, get_adapter only actually requires HasCredentials, so we replicate the type extraction
# from get_adapter here rather than spinning up a full RuntimeConfig instance
# TODO: Move to a fully supported interface when one becomes available
adapter = get_adapter_by_type(profile.credentials.type)
semantic_manifest = dbtArtifacts.build_semantic_manifest_from_dbt_project_root(project_root=project_path)
return cls(profile=profile, project=project, adapter=adapter, semantic_manifest=semantic_manifest)

@staticmethod
def build_semantic_manifest_from_dbt_project_root(project_root: Path) -> SemanticManifest:
"""In the dbt project root, retrieve the manifest path and parse the SemanticManifest."""
DEFAULT_TARGET_PATH = "target/semantic_manifest.json"
full_path_to_manifest = Path(project_root, DEFAULT_TARGET_PATH).resolve()
if not full_path_to_manifest.exists():
raise ModelCreationException(
f"Unable to find {full_path_to_manifest}\n"
"Please ensure that you are running `mf` in the root directory of a dbt project "
"and that the semantic_manifest JSON exists."
)
try:
with open(full_path_to_manifest, "r") as file:
raw_contents = file.read()
raw_model = PydanticSemanticManifest.parse_raw(raw_contents)
# The serialized object in the dbt project does not have all transformations applied to it at
# this time, which causes failures with input measure resolution.
# TODO: remove this transform call once the upstream changes are integrated into our dependency tree
model = PydanticSemanticManifestTransformer.transform(raw_model)
return model
except Exception as e:
raise ModelCreationException from e
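For orientation, here is a rough usage sketch of the new accessor (not part of this commit). It assumes the caller runs from the root of a dbt project whose `target/semantic_manifest.json` has already been built, and that the SQL client exposes a `query` method per MetricFlow's SqlClient protocol.

```python
# Hypothetical wiring of dbtArtifacts into an AdapterBackedSqlClient; a sketch,
# not code from this commit.
from pathlib import Path

from metricflow.cli.dbt_connectors.adapter_backed_client import AdapterBackedSqlClient
from metricflow.cli.dbt_connectors.dbt_config_accessor import dbtArtifacts

artifacts = dbtArtifacts.load_from_project_path(project_path=Path.cwd())
sql_client = AdapterBackedSqlClient(artifacts.adapter)

# The dbt profile's schema is what downstream callers use as the MetricFlow system schema.
system_schema = artifacts.profile.credentials.schema
df = sql_client.query("SELECT 1 AS health_check")  # `query` is assumed from the SqlClient protocol
```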
101 changes: 16 additions & 85 deletions metricflow/cli/main.py
@@ -25,27 +25,19 @@
from metricflow.cli import PACKAGE_NAME
from metricflow.cli.cli_context import CLIContext
from metricflow.cli.constants import DEFAULT_RESULT_DECIMAL_PLACES, MAX_LIST_OBJECT_ELEMENTS
from metricflow.cli.dbt_connectors.adapter_backed_client import AdapterBackedSqlClient
from metricflow.cli.dbt_connectors.dbt_config_accessor import dbtArtifacts
from metricflow.cli.tutorial import create_sample_data, gen_sample_model_configs, remove_sample_tables
from metricflow.cli.utils import (
MF_BIGQUERY_KEYS,
MF_CONFIG_KEYS,
MF_DATABRICKS_KEYS,
MF_POSTGRESQL_KEYS,
MF_REDSHIFT_KEYS,
MF_SNOWFLAKE_KEYS,
exception_handler,
generate_duckdb_demo_keys,
get_data_warehouse_config_link,
query_options,
start_end_time_options,
)
from metricflow.configuration.config_builder import YamlTemplateBuilder
from metricflow.dag.dag_visualization import display_dag_as_svg
from metricflow.dataflow.dataflow_plan_to_text import dataflow_plan_as_text
from metricflow.engine.metricflow_engine import MetricFlowExplainResult, MetricFlowQueryRequest, MetricFlowQueryResult
from metricflow.engine.utils import build_semantic_manifest_from_dbt_project_root
from metricflow.model.data_warehouse_model_validator import DataWarehouseModelValidator
from metricflow.sql_clients.common_client import SqlDialect
from metricflow.telemetry.models import TelemetryLevel
from metricflow.telemetry.reporter import TelemetryReporter, log_call

@@ -101,72 +93,6 @@ def exit_signal_handler(signal_type: int, frame) -> None:  # type: ignore
signal.signal(signal.SIGTERM, exit_signal_handler)


@cli.command()
@click.option("--restart", is_flag=True, help="Wipe the config file and start over")
@pass_config
@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def setup(cfg: CLIContext, restart: bool) -> None:
"""Setup MetricFlow."""
click.echo(
textwrap.dedent(
"""\
🎉 Welcome to MetricFlow! 🎉
"""
)
)

path = pathlib.Path(cfg.config.file_path)
abs_path = path.absolute()
to_create = not path.exists() or restart

# Seed the config template to the config file
if to_create:
dialect_map = {
SqlDialect.SNOWFLAKE.value: MF_SNOWFLAKE_KEYS,
SqlDialect.BIGQUERY.value: MF_BIGQUERY_KEYS,
SqlDialect.REDSHIFT.value: MF_REDSHIFT_KEYS,
SqlDialect.POSTGRESQL.value: MF_POSTGRESQL_KEYS,
SqlDialect.DUCKDB.value: generate_duckdb_demo_keys(config_dir=cfg.config.dir_path),
SqlDialect.DATABRICKS.value: MF_DATABRICKS_KEYS,
}

click.echo("Please enter your data warehouse dialect.")
click.echo("Use 'duckdb' for a standalone demo.")
click.echo("")
dialect = click.prompt(
"Dialect",
type=click.Choice(sorted([x for x in dialect_map.keys()])),
show_choices=True,
)

# If there is a collision, prefer to use the key in the dialect.
config_keys = list(dialect_map[dialect])
for mf_config_key in MF_CONFIG_KEYS:
if not any(x.key == mf_config_key.key for x in config_keys):
config_keys.append(mf_config_key)

with open(abs_path, "w") as file:
YamlTemplateBuilder.write_yaml(config_keys, file)

template_description = (
f"A template config file has been created in {abs_path}.\n"
if to_create
else f"A template config file already exists in {abs_path}, so it was left alone.\n"
)
click.echo(
textwrap.dedent(
f"""\
💻 {template_description}
If you are new to MetricFlow, we recommend you to run through our tutorial with `mf tutorial`\n
Next steps:
1. Review and fill out relevant fields.
2. Run `mf health-checks` to validate the data warehouse connection.
3. Run `mf validate-configs` to validate the model configurations.
"""
)
)


@cli.command()
@click.option("-m", "--msg", is_flag=True, help="Output the final steps dialogue")
@click.option("--skip-dw", is_flag=True, help="Skip the data warehouse health checks")
@@ -176,7 +102,8 @@ def setup(cfg: CLIContext, restart: bool) -> None:
@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter)
def tutorial(ctx: click.core.Context, cfg: CLIContext, msg: bool, skip_dw: bool, drop_tables: bool) -> None:
"""Run user through a tutorial."""
# This text is also located in the project's top-level README.md
click.echo("The mf tutorial command has not yet been updated to work with the dbt integration!")
exit()
help_msg = textwrap.dedent(
"""\
🤓 Please run the following steps,
@@ -210,18 +137,15 @@ def tutorial(ctx: click.core.Context, cfg: CLIContext, msg: bool, skip_dw: bool,
click.echo(help_msg)
exit()

# Check if the MetricFlow configuration file exists
path = pathlib.Path(cfg.config.file_path)
if not path.absolute().exists():
click.echo("💡 Please run `mf setup` to get your configs set up before going through the tutorial.")
exit()
# TODO: Add check for dbt project root here

# Validate that the data warehouse connection is successful
if not skip_dw:
ctx.invoke(health_checks)
click.confirm("❓ Are the health-checks all passing? Please fix them before continuing", abort=True)
click.echo("💡 For future reference, you can continue with the tutorial by adding `--skip-dw`\n")

# TODO: decide whether or not to allow management of tutorial datasets from the mf CLI and update accordingly
if drop_tables:
spinner = Halo(text="Dropping tables...", spinner="dots")
spinner.start()
@@ -239,7 +163,9 @@ def tutorial(ctx: click.core.Context, cfg: CLIContext, msg: bool, skip_dw: bool,
spinner.succeed("📀 Sample tables have been successfully created into your data warehouse.")

# Seed sample model file
model_path = os.path.join(cfg.config.dir_path, "sample_models")
# TODO: Make this work with the new dbt project locations. For now, put it in cwd to remove the reference
# to the model path from the MetricFlow config
model_path = os.path.join(os.getcwd(), "sample_models")
pathlib.Path(model_path).mkdir(parents=True, exist_ok=True)
click.echo(f"🤖 Attempting to generate model configs to your local filesystem in '{str(model_path)}'.")
spinner = Halo(text="Dropping tables...", spinner="dots")
@@ -672,9 +598,10 @@ def validate_configs(
# Parsing Validation
parsing_spinner = Halo(text="Building manifest from dbt project root", spinner="dots")
parsing_spinner.start()
project_root = pathlib.Path.cwd()

try:
semantic_manifest = build_semantic_manifest_from_dbt_project_root()
semantic_manifest = dbtArtifacts.build_semantic_manifest_from_dbt_project_root(project_root=project_root)
parsing_spinner.succeed("🎉 Successfully parsed manifest from dbt project")
except Exception as e:
parsing_spinner.fail(f"Exception found when parsing manifest from dbt project ({str(e)})")
@@ -698,7 +625,11 @@ def validate_configs(

dw_results = SemanticManifestValidationResults()
if not skip_dw:
dw_validator = DataWarehouseModelValidator(sql_client=cfg.sql_client, system_schema=cfg.mf_system_schema)
# Fetch dbt artifacts. This rebuilds the manifest a second time, which is redundant but acceptable for now.
dbt_artifacts = dbtArtifacts.load_from_project_path(project_path=project_root)
sql_client = AdapterBackedSqlClient(dbt_artifacts.adapter)
schema = dbt_artifacts.profile.credentials.schema
dw_validator = DataWarehouseModelValidator(sql_client=sql_client, system_schema=schema)
dw_results = _data_warehouse_validations_runner(
dw_validator=dw_validator, manifest=semantic_manifest, timeout=dw_timeout
)
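The warehouse-validation path above can be read as the following standalone sketch. The names are taken from this diff; running the wiring outside the `mf` CLI is an assumption.

```python
# Sketch of the new dbt-backed warehouse validation wiring, assembled from the
# pieces added in this commit; using it outside the mf CLI is hypothetical.
from pathlib import Path

from metricflow.cli.dbt_connectors.adapter_backed_client import AdapterBackedSqlClient
from metricflow.cli.dbt_connectors.dbt_config_accessor import dbtArtifacts
from metricflow.model.data_warehouse_model_validator import DataWarehouseModelValidator

dbt_artifacts = dbtArtifacts.load_from_project_path(project_path=Path.cwd())
dw_validator = DataWarehouseModelValidator(
    sql_client=AdapterBackedSqlClient(dbt_artifacts.adapter),
    system_schema=dbt_artifacts.profile.credentials.schema,
)
# The CLI then hands dw_validator and the parsed manifest to
# _data_warehouse_validations_runner, as shown in validate_configs above.
```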
17 changes: 1 addition & 16 deletions metricflow/configuration/config_builder.py
@@ -1,10 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional, TextIO

from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap
from typing import Optional


@dataclass(frozen=True)
@@ -14,15 +11,3 @@ class ConfigKey:
key: str
value: str = ""
comment: Optional[str] = None


class YamlTemplateBuilder:
"""Class to construct and write YAML files."""

@staticmethod
def write_yaml(config_keys: List[ConfigKey], file_stream: TextIO) -> None: # noqa: D
yaml = YAML()
file_data = CommentedMap()
for key in config_keys:
file_data.insert(0, key.key, key.value, comment=key.comment)
yaml.dump(file_data, file_stream)
16 changes: 11 additions & 5 deletions metricflow/engine/metricflow_engine.py
@@ -5,12 +5,17 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Optional, Sequence

import pandas as pd
from dbt_semantic_interfaces.pretty_print import pformat_big_objects
from dbt_semantic_interfaces.references import DimensionReference, EntityReference, MetricReference

from metricflow.cli.dbt_connectors.adapter_backed_client import AdapterBackedSqlClient
from metricflow.cli.dbt_connectors.dbt_config_accessor import (
dbtArtifacts,
)
from metricflow.configuration.constants import (
CONFIG_DWH_SCHEMA,
)
@@ -31,7 +36,6 @@
from metricflow.engine.time_source import ServerTimeSource
from metricflow.engine.utils import (
build_semantic_manifest_from_config,
build_semantic_manifest_from_dbt_project_root,
)
from metricflow.errors.errors import ExecutionException
from metricflow.execution.execution_plan import ExecutionPlan, SqlQuery
@@ -306,11 +310,13 @@ def from_config(handler: YamlFileHandler) -> MetricFlowEngine:
)

@staticmethod
def from_dbt_project_root(handler: YamlFileHandler) -> MetricFlowEngine:
def from_dbt_project_root() -> MetricFlowEngine:
"""Initialize MetricFlowEngine via the dbt project root directory."""
sql_client = make_sql_client_from_config(handler)
system_schema = not_empty(handler.get_value(CONFIG_DWH_SCHEMA), CONFIG_DWH_SCHEMA, handler.url)
semantic_manifest_lookup = SemanticManifestLookup(build_semantic_manifest_from_dbt_project_root())
dbt_artifacts = dbtArtifacts.load_from_project_path(Path.cwd())
semantic_manifest_lookup = SemanticManifestLookup(dbt_artifacts.semantic_manifest)
sql_client = AdapterBackedSqlClient(dbt_artifacts.adapter)
# TODO: remove this if possible after the time spine schema is sourced from the semantic manifest
system_schema = dbt_artifacts.profile.credentials.schema

return MetricFlowEngine(
semantic_manifest_lookup=semantic_manifest_lookup,
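Taken together with the `dbtArtifacts` loader, the new constructor lets a caller stand up an engine from the dbt project in the current working directory. A sketch of that flow follows; the request and query API shapes are assumed from `MetricFlowQueryRequest` and `MetricFlowEngine` as imported in `main.py`, and the metric and dimension names are placeholders.

```python
# Hypothetical end-to-end sketch: build the engine from the dbt project in cwd
# and run a simple query. Metric and group-by names are placeholders.
from metricflow.engine.metricflow_engine import MetricFlowEngine, MetricFlowQueryRequest

engine = MetricFlowEngine.from_dbt_project_root()  # resolves profile, adapter, and semantic manifest from cwd
request = MetricFlowQueryRequest.create_with_random_request_id(
    metric_names=["transactions"],   # placeholder metric
    group_by_names=["metric_time"],
)
result = engine.query(request)
print(result.result_df)  # pandas DataFrame with the query output
```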
26 changes: 0 additions & 26 deletions metricflow/engine/utils.py
@@ -1,17 +1,14 @@
from __future__ import annotations

import datetime as dt
import pathlib
from typing import Optional

from dateutil.parser import parse
from dbt_semantic_interfaces.implementations.semantic_manifest import PydanticSemanticManifest
from dbt_semantic_interfaces.parsing.dir_to_model import (
SemanticManifestBuildResult,
parse_directory_of_yaml_files_to_semantic_manifest,
)
from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest
from dbt_semantic_interfaces.transformations.semantic_manifest_transformer import PydanticSemanticManifestTransformer

from metricflow.configuration.constants import CONFIG_MODEL_PATH
from metricflow.configuration.yaml_handler import YamlFileHandler
@@ -50,29 +47,6 @@ def build_semantic_manifest_from_config(handler: YamlFileHandler) -> SemanticManifest:
return model_build_result_from_config(handler=handler).semantic_manifest


def build_semantic_manifest_from_dbt_project_root() -> SemanticManifest:
"""In the dbt project root, retrieve the manifest path and parse the SemanticManifest."""
DEFAULT_TARGET_PATH = "target/semantic_manifest.json"
full_path_to_manifest = pathlib.Path(DEFAULT_TARGET_PATH).resolve()
if not full_path_to_manifest.exists():
raise ModelCreationException(
"Unable to find {DBT_PROJECT_ROOT}/"
+ DEFAULT_TARGET_PATH
+ "\nPlease ensure that you are running `mf` in the root directory of a dbt project"
+ " and that the semantic_manifest JSON exists."
)
try:
with open(full_path_to_manifest, "r") as file:
raw_contents = file.read()
raw_model = PydanticSemanticManifest.parse_raw(raw_contents)
# The serialized object in the dbt project does not have transformations applied to it,
# since dbt-specific transformations do not rely on the PydanticSemanticManifest implementation
model = PydanticSemanticManifestTransformer.transform(raw_model)
return model
except Exception as e:
raise ModelCreationException from e
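This helper was not dropped outright; it moved onto `dbtArtifacts` and now takes an explicit project root. A sketch of the replacement call, using `Path.cwd()` to mirror the new call site in `main.py` above:

```python
# Replacement for the removed helper; mirrors the call added in validate_configs.
from pathlib import Path

from metricflow.cli.dbt_connectors.dbt_config_accessor import dbtArtifacts

semantic_manifest = dbtArtifacts.build_semantic_manifest_from_dbt_project_root(project_root=Path.cwd())
```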


def convert_to_datetime(datetime_str: Optional[str]) -> Optional[dt.datetime]:
"""Callback to convert string to datetime given as an iso8601 timestamp."""
if datetime_str is None:
2 changes: 2 additions & 0 deletions metricflow/test/cli/test_cli.py
@@ -8,6 +8,7 @@
from pathlib import Path
from typing import Iterator

import pytest
from dbt_semantic_interfaces.parsing.dir_to_model import (
parse_directory_of_yaml_files_to_semantic_manifest,
parse_yaml_files_to_validation_ready_semantic_manifest,
@@ -112,6 +113,7 @@ def test_health_checks(cli_runner: MetricFlowCliRunner) -> None:  # noqa: D
assert resp.exit_code == 0


@pytest.mark.skip("Skipping tutorial tests pending update to work with dbt integration")
def test_tutorial(cli_runner: MetricFlowCliRunner) -> None: # noqa: D
cli_context = cli_runner.cli_context

