diff --git a/.changes/unreleased/Features-20230626-133439.yaml b/.changes/unreleased/Features-20230626-133439.yaml new file mode 100644 index 0000000000..7d0130cde2 --- /dev/null +++ b/.changes/unreleased/Features-20230626-133439.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Use dbt adapter to run queries and warehouse validations from MetricFlow CLI +time: 2023-06-26T13:34:39.613243-07:00 +custom: + Author: tlento + Issue: "624" diff --git a/metricflow/cli/cli_context.py b/metricflow/cli/cli_context.py index 2dbe590b57..be4d3f5540 100644 --- a/metricflow/cli/cli_context.py +++ b/metricflow/cli/cli_context.py @@ -125,7 +125,7 @@ def run_health_checks(self) -> Dict[str, Dict[str, str]]: def _initialize_metricflow_engine(self) -> None: """Initialize the MetricFlowEngine.""" try: - self._mf = MetricFlowEngine.from_dbt_project_root(self.config) + self._mf = MetricFlowEngine.from_dbt_project_root() except Exception as e: raise MetricFlowInitException from e diff --git a/metricflow/cli/dbt_connectors/dbt_config_accessor.py b/metricflow/cli/dbt_connectors/dbt_config_accessor.py new file mode 100644 index 0000000000..a35e298fb7 --- /dev/null +++ b/metricflow/cli/dbt_connectors/dbt_config_accessor.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import dataclasses +import logging +from pathlib import Path +from typing import Type + +from dbt.adapters.base.impl import BaseAdapter +from dbt.adapters.factory import get_adapter_by_type +from dbt.cli.main import dbtRunner +from dbt.config.profile import Profile +from dbt.config.project import Project +from dbt.config.runtime import load_profile, load_project +from dbt_semantic_interfaces.implementations.semantic_manifest import PydanticSemanticManifest +from dbt_semantic_interfaces.pretty_print import pformat_big_objects +from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest +from dbt_semantic_interfaces.transformations.semantic_manifest_transformer import PydanticSemanticManifestTransformer +from typing_extensions import Self + +from metricflow.errors.errors import ModelCreationException + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class dbtArtifacts: + """Container with access to the dbt artifacts required to power the MetricFlow CLI.""" + + profile: Profile + project: Project + adapter: BaseAdapter + semantic_manifest: SemanticManifest + + @classmethod + def load_from_project_path(cls: Type[Self], project_path: Path) -> Self: + """Loads all dbt artifacts for the project associated with the given project path.""" + logger.info(f"Loading dbt artifacts for project located at {project_path}") + dbtRunner().invoke(["-q", "debug"], project_dir=str(project_path)) + profile = load_profile(str(project_path), {}) + project = load_project(str(project_path), version_check=False, profile=profile) + logger.info(f"Loaded project {project.project_name} with profile details:\n{pformat_big_objects(profile)}") + # dbt's get_adapter helper expects an AdapterRequiredConfig, but `project` is missing cli_vars + # In practice, get_adapter only actually requires HasCredentials, so we replicate the type extraction + # from get_adapter here rather than spinning up a full RuntimeConfig instance + # TODO: Move to a fully supported interface when one becomes available + adapter = get_adapter_by_type(profile.credentials.type) + semantic_manifest = dbtArtifacts.build_semantic_manifest_from_dbt_project_root(project_root=project_path) + return cls(profile=profile, project=project, adapter=adapter, semantic_manifest=semantic_manifest) + + @staticmethod + def build_semantic_manifest_from_dbt_project_root(project_root: Path) -> SemanticManifest: + """In the dbt project root, retrieve the manifest path and parse the SemanticManifest.""" + DEFAULT_TARGET_PATH = "target/semantic_manifest.json" + full_path_to_manifest = Path(project_root, DEFAULT_TARGET_PATH).resolve() + if not full_path_to_manifest.exists(): + raise ModelCreationException( + f"Unable to find {full_path_to_manifest}\n" + "Please ensure that you are running `mf` in the root directory of a dbt project " + "and that the semantic_manifest JSON exists." + ) + try: + with open(full_path_to_manifest, "r") as file: + raw_contents = file.read() + raw_model = PydanticSemanticManifest.parse_raw(raw_contents) + # The serialized object in the dbt project does not have all transformations applied to it at + # this time, which causes failures with input measure resolution. + # TODO: remove this transform call once the upstream changes are integrated into our dependency tree + model = PydanticSemanticManifestTransformer.transform(raw_model) + return model + except Exception as e: + raise ModelCreationException from e diff --git a/metricflow/cli/main.py b/metricflow/cli/main.py index 635fb9746e..35ca83578f 100644 --- a/metricflow/cli/main.py +++ b/metricflow/cli/main.py @@ -25,27 +25,19 @@ from metricflow.cli import PACKAGE_NAME from metricflow.cli.cli_context import CLIContext from metricflow.cli.constants import DEFAULT_RESULT_DECIMAL_PLACES, MAX_LIST_OBJECT_ELEMENTS +from metricflow.cli.dbt_connectors.adapter_backed_client import AdapterBackedSqlClient +from metricflow.cli.dbt_connectors.dbt_config_accessor import dbtArtifacts from metricflow.cli.tutorial import create_sample_data, gen_sample_model_configs, remove_sample_tables from metricflow.cli.utils import ( - MF_BIGQUERY_KEYS, - MF_CONFIG_KEYS, - MF_DATABRICKS_KEYS, - MF_POSTGRESQL_KEYS, - MF_REDSHIFT_KEYS, - MF_SNOWFLAKE_KEYS, exception_handler, - generate_duckdb_demo_keys, get_data_warehouse_config_link, query_options, start_end_time_options, ) -from metricflow.configuration.config_builder import YamlTemplateBuilder from metricflow.dag.dag_visualization import display_dag_as_svg from metricflow.dataflow.dataflow_plan_to_text import dataflow_plan_as_text from metricflow.engine.metricflow_engine import MetricFlowExplainResult, MetricFlowQueryRequest, MetricFlowQueryResult -from metricflow.engine.utils import build_semantic_manifest_from_dbt_project_root from metricflow.model.data_warehouse_model_validator import DataWarehouseModelValidator -from metricflow.sql_clients.common_client import SqlDialect from metricflow.telemetry.models import TelemetryLevel from metricflow.telemetry.reporter import TelemetryReporter, log_call @@ -101,72 +93,6 @@ def exit_signal_handler(signal_type: int, frame) -> None: # type: ignore signal.signal(signal.SIGTERM, exit_signal_handler) -@cli.command() -@click.option("--restart", is_flag=True, help="Wipe the config file and start over") -@pass_config -@log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter) -def setup(cfg: CLIContext, restart: bool) -> None: - """Setup MetricFlow.""" - click.echo( - textwrap.dedent( - """\ - 🎉 Welcome to MetricFlow! 🎉 - """ - ) - ) - - path = pathlib.Path(cfg.config.file_path) - abs_path = path.absolute() - to_create = not path.exists() or restart - - # Seed the config template to the config file - if to_create: - dialect_map = { - SqlDialect.SNOWFLAKE.value: MF_SNOWFLAKE_KEYS, - SqlDialect.BIGQUERY.value: MF_BIGQUERY_KEYS, - SqlDialect.REDSHIFT.value: MF_REDSHIFT_KEYS, - SqlDialect.POSTGRESQL.value: MF_POSTGRESQL_KEYS, - SqlDialect.DUCKDB.value: generate_duckdb_demo_keys(config_dir=cfg.config.dir_path), - SqlDialect.DATABRICKS.value: MF_DATABRICKS_KEYS, - } - - click.echo("Please enter your data warehouse dialect.") - click.echo("Use 'duckdb' for a standalone demo.") - click.echo("") - dialect = click.prompt( - "Dialect", - type=click.Choice(sorted([x for x in dialect_map.keys()])), - show_choices=True, - ) - - # If there is a collision, prefer to use the key in the dialect. - config_keys = list(dialect_map[dialect]) - for mf_config_key in MF_CONFIG_KEYS: - if not any(x.key == mf_config_key.key for x in config_keys): - config_keys.append(mf_config_key) - - with open(abs_path, "w") as file: - YamlTemplateBuilder.write_yaml(config_keys, file) - - template_description = ( - f"A template config file has been created in {abs_path}.\n" - if to_create - else f"A template config file already exists in {abs_path}, so it was left alone.\n" - ) - click.echo( - textwrap.dedent( - f"""\ - 💻 {template_description} - If you are new to MetricFlow, we recommend you to run through our tutorial with `mf tutorial`\n - Next steps: - 1. Review and fill out relevant fields. - 2. Run `mf health-checks` to validate the data warehouse connection. - 3. Run `mf validate-configs` to validate the model configurations. - """ - ) - ) - - @cli.command() @click.option("-m", "--msg", is_flag=True, help="Output the final steps dialogue") @click.option("--skip-dw", is_flag=True, help="Skip the data warehouse health checks") @@ -176,7 +102,8 @@ def setup(cfg: CLIContext, restart: bool) -> None: @log_call(module_name=__name__, telemetry_reporter=_telemetry_reporter) def tutorial(ctx: click.core.Context, cfg: CLIContext, msg: bool, skip_dw: bool, drop_tables: bool) -> None: """Run user through a tutorial.""" - # This text is also located in the projects top-level README.md + click.echo("The mf tutorial command has not yet been updated to work with the dbt integration!") + exit() help_msg = textwrap.dedent( """\ 🤓 Please run the following steps, @@ -210,11 +137,7 @@ def tutorial(ctx: click.core.Context, cfg: CLIContext, msg: bool, skip_dw: bool, click.echo(help_msg) exit() - # Check if the MetricFlow configuration file exists - path = pathlib.Path(cfg.config.file_path) - if not path.absolute().exists(): - click.echo("💡 Please run `mf setup` to get your configs set up before going through the tutorial.") - exit() + # TODO: Add check for dbt project root here # Validate that the data warehouse connection is successful if not skip_dw: @@ -222,6 +145,7 @@ def tutorial(ctx: click.core.Context, cfg: CLIContext, msg: bool, skip_dw: bool, click.confirm("❓ Are the health-checks all passing? Please fix them before continuing", abort=True) click.echo("💡 For future reference, you can continue with the tutorial by adding `--skip-dw`\n") + # TODO: decide whether or not to allow management of tutorial datasets from the mf CLI and update accordingly if drop_tables: spinner = Halo(text="Dropping tables...", spinner="dots") spinner.start() @@ -239,7 +163,9 @@ def tutorial(ctx: click.core.Context, cfg: CLIContext, msg: bool, skip_dw: bool, spinner.succeed("📀 Sample tables have been successfully created into your data warehouse.") # Seed sample model file - model_path = os.path.join(cfg.config.dir_path, "sample_models") + # TODO: Make this work with the new dbt project locations. For now, put it in cwd to remove the reference + # to the model path from the MetricFlow config + model_path = os.path.join(os.getcwd(), "sample_models") pathlib.Path(model_path).mkdir(parents=True, exist_ok=True) click.echo(f"🤖 Attempting to generate model configs to your local filesystem in '{str(model_path)}'.") spinner = Halo(text="Dropping tables...", spinner="dots") @@ -672,9 +598,10 @@ def validate_configs( # Parsing Validation parsing_spinner = Halo(text="Building manifest from dbt project root", spinner="dots") parsing_spinner.start() + project_root = pathlib.Path.cwd() try: - semantic_manifest = build_semantic_manifest_from_dbt_project_root() + semantic_manifest = dbtArtifacts.build_semantic_manifest_from_dbt_project_root(project_root=project_root) parsing_spinner.succeed("🎉 Successfully parsed manifest from dbt project") except Exception as e: parsing_spinner.fail(f"Exception found when parsing manifest from dbt project ({str(e)})") @@ -698,7 +625,11 @@ def validate_configs( dw_results = SemanticManifestValidationResults() if not skip_dw: - dw_validator = DataWarehouseModelValidator(sql_client=cfg.sql_client, system_schema=cfg.mf_system_schema) + # fetch dbt adapters. This rebuilds the manifest again, but whatever. + dbt_artifacts = dbtArtifacts.load_from_project_path(project_path=project_root) + sql_client = AdapterBackedSqlClient(dbt_artifacts.adapter) + schema = dbt_artifacts.profile.credentials.schema + dw_validator = DataWarehouseModelValidator(sql_client=sql_client, system_schema=schema) dw_results = _data_warehouse_validations_runner( dw_validator=dw_validator, manifest=semantic_manifest, timeout=dw_timeout ) diff --git a/metricflow/configuration/config_builder.py b/metricflow/configuration/config_builder.py index 204b932d53..905714c113 100644 --- a/metricflow/configuration/config_builder.py +++ b/metricflow/configuration/config_builder.py @@ -1,10 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import List, Optional, TextIO - -from ruamel.yaml import YAML -from ruamel.yaml.comments import CommentedMap +from typing import Optional @dataclass(frozen=True) @@ -14,15 +11,3 @@ class ConfigKey: key: str value: str = "" comment: Optional[str] = None - - -class YamlTemplateBuilder: - """Class to construct and write YAML files.""" - - @staticmethod - def write_yaml(config_keys: List[ConfigKey], file_stream: TextIO) -> None: # noqa: D - yaml = YAML() - file_data = CommentedMap() - for key in config_keys: - file_data.insert(0, key.key, key.value, comment=key.comment) - yaml.dump(file_data, file_stream) diff --git a/metricflow/engine/metricflow_engine.py b/metricflow/engine/metricflow_engine.py index 9b2fc880fd..ce76f089e5 100644 --- a/metricflow/engine/metricflow_engine.py +++ b/metricflow/engine/metricflow_engine.py @@ -5,12 +5,17 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum +from pathlib import Path from typing import List, Optional, Sequence import pandas as pd from dbt_semantic_interfaces.pretty_print import pformat_big_objects from dbt_semantic_interfaces.references import DimensionReference, EntityReference, MetricReference +from metricflow.cli.dbt_connectors.adapter_backed_client import AdapterBackedSqlClient +from metricflow.cli.dbt_connectors.dbt_config_accessor import ( + dbtArtifacts, +) from metricflow.configuration.constants import ( CONFIG_DWH_SCHEMA, ) @@ -31,7 +36,6 @@ from metricflow.engine.time_source import ServerTimeSource from metricflow.engine.utils import ( build_semantic_manifest_from_config, - build_semantic_manifest_from_dbt_project_root, ) from metricflow.errors.errors import ExecutionException from metricflow.execution.execution_plan import ExecutionPlan, SqlQuery @@ -306,11 +310,13 @@ def from_config(handler: YamlFileHandler) -> MetricFlowEngine: ) @staticmethod - def from_dbt_project_root(handler: YamlFileHandler) -> MetricFlowEngine: + def from_dbt_project_root() -> MetricFlowEngine: """Initialize MetricFlowEngine via the dbt project root directory.""" - sql_client = make_sql_client_from_config(handler) - system_schema = not_empty(handler.get_value(CONFIG_DWH_SCHEMA), CONFIG_DWH_SCHEMA, handler.url) - semantic_manifest_lookup = SemanticManifestLookup(build_semantic_manifest_from_dbt_project_root()) + dbt_artifacts = dbtArtifacts.load_from_project_path(Path.cwd()) + semantic_manifest_lookup = SemanticManifestLookup(dbt_artifacts.semantic_manifest) + sql_client = AdapterBackedSqlClient(dbt_artifacts.adapter) + # TODO: remove this if possible after the time spine schema is sourced from the semantic manifest + system_schema = dbt_artifacts.profile.credentials.schema return MetricFlowEngine( semantic_manifest_lookup=semantic_manifest_lookup, diff --git a/metricflow/engine/utils.py b/metricflow/engine/utils.py index 8e5157e4f4..b8c7b8d4c5 100644 --- a/metricflow/engine/utils.py +++ b/metricflow/engine/utils.py @@ -1,17 +1,14 @@ from __future__ import annotations import datetime as dt -import pathlib from typing import Optional from dateutil.parser import parse -from dbt_semantic_interfaces.implementations.semantic_manifest import PydanticSemanticManifest from dbt_semantic_interfaces.parsing.dir_to_model import ( SemanticManifestBuildResult, parse_directory_of_yaml_files_to_semantic_manifest, ) from dbt_semantic_interfaces.protocols.semantic_manifest import SemanticManifest -from dbt_semantic_interfaces.transformations.semantic_manifest_transformer import PydanticSemanticManifestTransformer from metricflow.configuration.constants import CONFIG_MODEL_PATH from metricflow.configuration.yaml_handler import YamlFileHandler @@ -50,29 +47,6 @@ def build_semantic_manifest_from_config(handler: YamlFileHandler) -> SemanticMan return model_build_result_from_config(handler=handler).semantic_manifest -def build_semantic_manifest_from_dbt_project_root() -> SemanticManifest: - """In the dbt project root, retrieve the manifest path and parse the SemanticManifest.""" - DEFAULT_TARGET_PATH = "target/semantic_manifest.json" - full_path_to_manifest = pathlib.Path(DEFAULT_TARGET_PATH).resolve() - if not full_path_to_manifest.exists(): - raise ModelCreationException( - "Unable to find {DBT_PROJECT_ROOT}/" - + DEFAULT_TARGET_PATH - + "\nPlease ensure that you are running `mf` in the root directory of a dbt project" - + " and that the semantic_manifest JSON exists." - ) - try: - with open(full_path_to_manifest, "r") as file: - raw_contents = file.read() - raw_model = PydanticSemanticManifest.parse_raw(raw_contents) - # The serialized object in the dbt project does not have transformations applied to it, - # since dbt-specific transformations do not rely on the PydanticSemanticManifest implementation - model = PydanticSemanticManifestTransformer.transform(raw_model) - return model - except Exception as e: - raise ModelCreationException from e - - def convert_to_datetime(datetime_str: Optional[str]) -> Optional[dt.datetime]: """Callback to convert string to datetime given as an iso8601 timestamp.""" if datetime_str is None: diff --git a/metricflow/test/cli/test_cli.py b/metricflow/test/cli/test_cli.py index 8d828ffe41..0f92741e20 100644 --- a/metricflow/test/cli/test_cli.py +++ b/metricflow/test/cli/test_cli.py @@ -8,6 +8,7 @@ from pathlib import Path from typing import Iterator +import pytest from dbt_semantic_interfaces.parsing.dir_to_model import ( parse_directory_of_yaml_files_to_semantic_manifest, parse_yaml_files_to_validation_ready_semantic_manifest, @@ -112,6 +113,7 @@ def test_health_checks(cli_runner: MetricFlowCliRunner) -> None: # noqa: D assert resp.exit_code == 0 +@pytest.mark.skip("Skipping tutorial tests pending update to work with dbt integration") def test_tutorial(cli_runner: MetricFlowCliRunner) -> None: # noqa: D cli_context = cli_runner.cli_context