diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dd47a7682f..8e60b8f22c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Allow nullable `error_after` in source freshness ([#3874](https://github.com/dbt-labs/dbt-core/issues/3874), [#3955](https://github.com/dbt-labs/dbt-core/pull/3955)) - Increase performance of graph subset selection ([#4135](https://github.com/dbt-labs/dbt-core/issues/4135),[#4155](https://github.com/dbt-labs/dbt-core/pull/4155)) - Speed up node selection by skipping indirect node incorporation when not needed ([#4213](https://github.com/dbt-labs/dbt-core/issues/4213),[#4214](https://github.com/dbt-labs/dbt-core/pull/4214)) +- Add metrics nodes ([#4071](https://github.com/dbt-labs/dbt-core/issues/4071), [#4235](https://github.com/dbt-labs/dbt-core/pull/4235)) ### Fixes - Changes unit tests using `assertRaisesRegexp` to `assertRaisesRegex` ([#4136](https://github.com/dbt-labs/dbt-core/issues/4132), [#4136](https://github.com/dbt-labs/dbt-core/pull/4136)) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 52347db1875..b5c11005131 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -53,6 +53,7 @@ def print_compile_stats(stats): NodeType.Seed: 'seed file', NodeType.Source: 'source', NodeType.Exposure: 'exposure', + NodeType.Metric: 'metric' } results = {k: 0 for k in names.keys()} @@ -425,6 +426,8 @@ def link_graph(self, linker: Linker, manifest: Manifest, add_test_edges: bool = self.link_node(linker, node, manifest) for exposure in manifest.exposures.values(): self.link_node(linker, exposure, manifest) + for metric in manifest.metrics.values(): + self.link_node(linker, metric, manifest) cycle = linker.find_cycles() diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 9fd2d64a7a8..28eec309251 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -32,6 +32,7 @@ from dbt.contracts.graph.parsed import ( ParsedMacro, ParsedExposure, + ParsedMetric, 
class MetricRefResolver(BaseResolver):
    """``ref()`` implementation available while a metric is being parsed.

    Rendering a metric's ``model:`` field calls this resolver; it records
    the ref arguments on the metric node so dependency resolution can run
    later, and renders to an empty string because no SQL is produced at
    parse time.
    """

    def __call__(self, *args) -> str:
        # a ref is either ('model_name',) or ('package', 'model_name')
        if len(args) not in (1, 2):
            ref_invalid_args(self.model, args)
        self.model.refs.append(list(args))
        return ''


def generate_parse_metrics(
    metric: ParsedMetric,
    config: RuntimeConfig,
    manifest: Manifest,
    package_name: str,
) -> Dict[str, Any]:
    """Build the parse-time jinja context used to render a metric.

    The context exposes only ``ref``, wired to a capturing resolver so the
    metric's model reference is recorded rather than compiled into SQL.
    """
    project = config.load_dependencies()[package_name]
    # first resolver argument is the db wrapper, which is not needed
    # (and not available) at parse time
    resolver = MetricRefResolver(None, metric, project, manifest)
    return {'ref': resolver}
diff --git a/core/dbt/contracts/graph/compiled.py b/core/dbt/contracts/graph/compiled.py index 06233cd4fd5..92113263186 100644 --- a/core/dbt/contracts/graph/compiled.py +++ b/core/dbt/contracts/graph/compiled.py @@ -6,6 +6,7 @@ ParsedHookNode, ParsedModelNode, ParsedExposure, + ParsedMetric, ParsedResource, ParsedRPCNode, ParsedSqlNode, @@ -232,8 +233,10 @@ def parsed_instance_for(compiled: CompiledNode) -> ParsedResource: ParsedSourceDefinition, ] -# anything that participates in the graph: sources, exposures, manifest nodes +# anything that participates in the graph: sources, exposures, metrics, +# or manifest nodes GraphMemberNode = Union[ CompileResultNode, ParsedExposure, + ParsedMetric, ] diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 2fbe0f30ed8..d382151eb7b 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -15,8 +15,8 @@ ) from dbt.contracts.graph.parsed import ( ParsedMacro, ParsedDocumentation, - ParsedSourceDefinition, ParsedExposure, HasUniqueID, - UnpatchedSourceDefinition, ManifestNodes + ParsedSourceDefinition, ParsedExposure, ParsedMetric, + HasUniqueID, UnpatchedSourceDefinition, ManifestNodes ) from dbt.contracts.graph.unparsed import SourcePatch from dbt.contracts.files import SourceFile, SchemaSourceFile, FileHash, AnySourceFile @@ -564,6 +564,7 @@ class Manifest(MacroMethods, DataClassMessagePackMixin, dbtClassMixin): macros: MutableMapping[str, ParsedMacro] = field(default_factory=dict) docs: MutableMapping[str, ParsedDocumentation] = field(default_factory=dict) exposures: MutableMapping[str, ParsedExposure] = field(default_factory=dict) + metrics: MutableMapping[str, ParsedMetric] = field(default_factory=dict) selectors: MutableMapping[str, Any] = field(default_factory=dict) files: MutableMapping[str, AnySourceFile] = field(default_factory=dict) metadata: ManifestMetadata = field(default_factory=ManifestMetadata) @@ -631,6 +632,9 @@ def 
sync_update_node( def update_exposure(self, new_exposure: ParsedExposure): _update_into(self.exposures, new_exposure) + def update_metric(self, new_metric: ParsedMetric): + _update_into(self.metrics, new_metric) + def update_node(self, new_node: ManifestNode): _update_into(self.nodes, new_node) @@ -648,6 +652,10 @@ def build_flat_graph(self): k: v.to_dict(omit_none=False) for k, v in self.exposures.items() }, + 'metrics': { + k: v.to_dict(omit_none=False) + for k, v in self.metrics.items() + }, 'nodes': { k: v.to_dict(omit_none=False) for k, v in self.nodes.items() @@ -700,7 +708,12 @@ def find_materialization_macro_by_name( def get_resource_fqns(self) -> Mapping[str, PathSet]: resource_fqns: Dict[str, Set[Tuple[str, ...]]] = {} - all_resources = chain(self.exposures.values(), self.nodes.values(), self.sources.values()) + all_resources = chain( + self.exposures.values(), + self.nodes.values(), + self.sources.values(), + self.metrics.values() + ) for resource in all_resources: resource_type_plural = resource.resource_type.pluralize() if resource_type_plural not in resource_fqns: @@ -728,6 +741,7 @@ def deepcopy(self): macros={k: _deepcopy(v) for k, v in self.macros.items()}, docs={k: _deepcopy(v) for k, v in self.docs.items()}, exposures={k: _deepcopy(v) for k, v in self.exposures.items()}, + metrics={k: _deepcopy(v) for k, v in self.metrics.items()}, selectors={k: _deepcopy(v) for k, v in self.selectors.items()}, metadata=self.metadata, disabled={k: _deepcopy(v) for k, v in self.disabled.items()}, @@ -740,6 +754,7 @@ def build_parent_and_child_maps(self): self.nodes.values(), self.sources.values(), self.exposures.values(), + self.metrics.values(), )) forward_edges, backward_edges = build_node_edges(edge_members) self.child_map = forward_edges @@ -761,6 +776,7 @@ def writable_manifest(self): macros=self.macros, docs=self.docs, exposures=self.exposures, + metrics=self.metrics, selectors=self.selectors, metadata=self.metadata, disabled=self.disabled, @@ -780,6 +796,8 
@@ def expect(self, unique_id: str) -> GraphMemberNode: return self.sources[unique_id] elif unique_id in self.exposures: return self.exposures[unique_id] + elif unique_id in self.metrics: + return self.metrics[unique_id] else: # something terrible has happened raise dbt.exceptions.InternalException( @@ -1008,6 +1026,11 @@ def add_exposure(self, source_file: SchemaSourceFile, exposure: ParsedExposure): self.exposures[exposure.unique_id] = exposure source_file.exposures.append(exposure.unique_id) + def add_metric(self, source_file: SchemaSourceFile, metric: ParsedMetric): + _check_duplicates(metric, self.metrics) + self.metrics[metric.unique_id] = metric + source_file.metrics.append(metric.unique_id) + def add_disabled_nofile(self, node: CompileResultNode): # There can be multiple disabled nodes for the same unique_id if node.unique_id in self.disabled: @@ -1044,6 +1067,7 @@ def __reduce_ex__(self, protocol): self.macros, self.docs, self.exposures, + self.metrics, self.selectors, self.files, self.metadata, @@ -1101,6 +1125,11 @@ class WritableManifest(ArtifactMixin): 'The exposures defined in the dbt project and its dependencies' )) ) + metrics: Mapping[UniqueID, ParsedMetric] = field( + metadata=dict(description=( + 'The metrics defined in the dbt project and its dependencies' + )) + ) selectors: Mapping[UniqueID, Any] = field( metadata=dict(description=( 'The selectors defined in selectors.yml' diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index 90f6e036fe8..66a8fbc4e8a 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -26,7 +26,7 @@ UnparsedBaseNode, FreshnessThreshold, ExternalTable, HasYamlMetadata, MacroArgument, UnparsedSourceDefinition, UnparsedSourceTableDefinition, UnparsedColumn, TestDef, - ExposureOwner, ExposureType, MaturityType + ExposureOwner, ExposureType, MaturityType, MetricFilter ) from dbt.contracts.util import Replaceable, AdditionalPropertiesMixin from 
@dataclass
class MetricFilter(dbtClassMixin, Replaceable):
    """A single boolean filter applied to a metric's underlying model."""
    field: str
    operator: str
    # TODO : Can we make this Any?
    value: str


@dataclass
class UnparsedMetric(dbtClassMixin, Replaceable):
    """A metric exactly as written in a schema yaml file, before parsing."""
    model: str
    name: str
    label: str
    type: str
    description: str = ''
    sql: Optional[str] = None
    timestamp: Optional[str] = None
    time_grains: List[str] = field(default_factory=list)
    dimensions: List[str] = field(default_factory=list)
    filters: List[MetricFilter] = field(default_factory=list)
    meta: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)


@dataclass
class ParsedMetric(UnparsedBaseNode, HasUniqueID, HasFqn):
    """A metric node as stored in the manifest.

    ``refs``/``sources``/``depends_on`` are filled in after parsing, when
    the metric's model reference is resolved against the manifest.
    """
    model: str
    name: str
    description: str
    label: str
    type: str
    sql: Optional[str]
    timestamp: Optional[str]
    filters: List[MetricFilter]
    time_grains: List[str]
    dimensions: List[str]
    resource_type: NodeType = NodeType.Metric
    meta: Dict[str, Any] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)
    sources: List[List[str]] = field(default_factory=list)
    depends_on: DependsOn = field(default_factory=DependsOn)
    refs: List[List[str]] = field(default_factory=list)
    created_at: int = field(default_factory=lambda: int(time.time()))

    @property
    def depends_on_nodes(self):
        return self.depends_on.nodes

    @property
    def search_name(self):
        return self.name

    def same_model(self, old: 'ParsedMetric') -> bool:
        return self.model == old.model

    def same_dimensions(self, old: 'ParsedMetric') -> bool:
        return self.dimensions == old.dimensions

    def same_filters(self, old: 'ParsedMetric') -> bool:
        return self.filters == old.filters

    def same_description(self, old: 'ParsedMetric') -> bool:
        return self.description == old.description

    def same_label(self, old: 'ParsedMetric') -> bool:
        return self.label == old.label

    def same_type(self, old: 'ParsedMetric') -> bool:
        return self.type == old.type

    def same_sql(self, old: 'ParsedMetric') -> bool:
        return self.sql == old.sql

    def same_timestamp(self, old: 'ParsedMetric') -> bool:
        return self.timestamp == old.timestamp

    def same_time_grains(self, old: 'ParsedMetric') -> bool:
        return self.time_grains == old.time_grains

    def same_contents(self, old: Optional['ParsedMetric']) -> bool:
        # existing when it didn't before is a change!
        # metadata/tags changes are not "changes"
        if old is None:
            # BUGFIX(review): the original returned True here, which
            # contradicts the comment above and the ParsedNode convention
            # (same_contents(None) -> False): a metric with no predecessor
            # is new, i.e. NOT the same contents, so state:modified
            # selection should pick it up.
            return False

        return (
            self.same_model(old) and
            self.same_dimensions(old) and
            self.same_filters(old) and
            self.same_description(old) and
            self.same_label(old) and
            self.same_type(old) and
            self.same_sql(old) and
            self.same_timestamp(old) and
            self.same_time_grains(old)
        )
def metric_nodes(
    self,
    included_nodes: Set[UniqueId]
) -> Iterator[Tuple[UniqueId, ParsedMetric]]:
    """Yield ``(unique_id, metric)`` for every manifest metric that is
    part of the current selection."""
    for key, metric in self.manifest.metrics.items():
        uid = UniqueId(key)
        if uid in included_nodes:
            yield uid, metric
class MetricSelectorMethod(SelectorMethod):
    """Select metrics by name, optionally qualified by package:
    ``metric:my_metric`` or ``metric:my_package.my_metric``.
    """

    def search(
        self, included_nodes: Set[UniqueId], selector: str
    ) -> Iterator[UniqueId]:
        parts = selector.split('.')
        if len(parts) == 1:
            target_package, target_name = SELECTOR_GLOB, parts[0]
        elif len(parts) == 2:
            target_package, target_name = parts
        else:
            msg = (
                'Invalid metric selector value "{}". Metrics must be of '
                'the form ${{metric_name}} or '
                '${{metric_package.metric_name}}'
            ).format(selector)
            raise RuntimeException(msg)

        for unique_id, metric in self.metric_nodes(included_nodes):
            # a glob on either side matches anything
            if target_package not in (metric.package_name, SELECTOR_GLOB):
                continue
            if target_name not in (metric.name, SELECTOR_GLOB):
                continue
            yield unique_id
ColumnInfo, ParsedExposure + ParsedSourceDefinition, ParsedNode, ParsedMacro, ColumnInfo, ParsedExposure, ParsedMetric ) from dbt.contracts.util import Writable from dbt.exceptions import ( @@ -804,6 +804,10 @@ def process_refs(self, current_project: str): if exposure.created_at < self.started_at: continue _process_refs_for_exposure(self.manifest, current_project, exposure) + for metric in self.manifest.metrics.values(): + if metric.created_at < self.started_at: + continue + _process_refs_for_metric(self.manifest, current_project, metric) # nodes: node and column descriptions # sources: source and table descriptions, column descriptions @@ -850,6 +854,16 @@ def process_docs(self, config: RuntimeConfig): config.project_name, ) _process_docs_for_exposure(ctx, exposure) + for metric in self.manifest.metrics.values(): + if metric.created_at < self.started_at: + continue + ctx = generate_runtime_docs_context( + config, + metric, + self.manifest, + config.project_name, + ) + _process_docs_for_metrics(ctx, metric) # Loops through all nodes and exposures, for each element in # 'sources' array finds the source node and updates the @@ -1028,6 +1042,12 @@ def _process_docs_for_exposure( exposure.description = get_rendered(exposure.description, context) +def _process_docs_for_metrics( + context: Dict[str, Any], metric: ParsedMetric +) -> None: + metric.description = get_rendered(metric.description, context) + + def _process_refs_for_exposure( manifest: Manifest, current_project: str, exposure: ParsedExposure ): @@ -1069,6 +1089,47 @@ def _process_refs_for_exposure( manifest.update_exposure(exposure) +def _process_refs_for_metric( + manifest: Manifest, current_project: str, metric: ParsedMetric +): + """Given a manifest and a metric in that manifest, process its refs""" + for ref in metric.refs: + target_model: Optional[Union[Disabled, ManifestNode]] = None + target_model_name: str + target_model_package: Optional[str] = None + + if len(ref) == 1: + target_model_name = ref[0] + 
elif len(ref) == 2: + target_model_package, target_model_name = ref + else: + raise dbt.exceptions.InternalException( + f'Refs should always be 1 or 2 arguments - got {len(ref)}' + ) + + target_model = manifest.resolve_ref( + target_model_name, + target_model_package, + current_project, + metric.package_name, + ) + + if target_model is None or isinstance(target_model, Disabled): + # This may raise. Even if it doesn't, we don't want to add + # this exposure to the graph b/c there is no destination exposure + invalid_ref_fail_unless_test( + metric, target_model_name, target_model_package, + disabled=(isinstance(target_model, Disabled)) + ) + + continue + + target_model_id = target_model.unique_id + + metric.depends_on.nodes.append(target_model_id) + manifest.update_metric(metric) + + def _process_refs_for_node( manifest: Manifest, current_project: str, node: ManifestNode ): @@ -1139,6 +1200,30 @@ def _process_sources_for_exposure( manifest.update_exposure(exposure) +def _process_sources_for_metric( + manifest: Manifest, current_project: str, metric: ParsedMetric +): + target_source: Optional[Union[Disabled, ParsedSourceDefinition]] = None + for source_name, table_name in metric.sources: + target_source = manifest.resolve_source( + source_name, + table_name, + current_project, + metric.package_name, + ) + if target_source is None or isinstance(target_source, Disabled): + invalid_source_fail_unless_test( + metric, + source_name, + table_name, + disabled=(isinstance(target_source, Disabled)) + ) + continue + target_source_id = target_source.unique_id + metric.depends_on.nodes.append(target_source_id) + manifest.update_metric(metric) + + def _process_sources_for_node( manifest: Manifest, current_project: str, node: ManifestNode ): diff --git a/core/dbt/parser/partial.py b/core/dbt/parser/partial.py index 93db4f56b8e..b29d3387800 100644 --- a/core/dbt/parser/partial.py +++ b/core/dbt/parser/partial.py @@ -405,6 +405,18 @@ def schedule_nodes_for_parsing(self, unique_ids): 
if exposure_element: self.delete_schema_exposure(schema_file, exposure_element) self.merge_patch(schema_file, 'exposures', exposure_element) + elif unique_id in self.saved_manifest.metrics: + metric = self.saved_manifest.metrics[unique_id] + file_id = metric.file_id + if file_id in self.saved_files and file_id not in self.file_diff['deleted']: + schema_file = self.saved_files[file_id] + metrics = [] + if 'metrics' in schema_file.dict_from_yaml: + metrics = schema_file.dict_from_yaml['metrics'] + metric_element = self.get_schema_element(metrics, metric.name) + if metric_element: + self.delete_schema_metric(schema_file, metric_element) + self.merge_patch(schema_file, 'metrics', metric_element) elif unique_id in self.saved_manifest.macros: macro = self.saved_manifest.macros[unique_id] file_id = macro.file_id @@ -679,6 +691,19 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict self.delete_schema_exposure(schema_file, exposure) self.merge_patch(schema_file, dict_key, exposure) + # metrics + metric_diff = self.get_diff_for('metrics', saved_yaml_dict, new_yaml_dict) + if metric_diff['changed']: + for metric in metric_diff['changed']: + self.delete_schema_metric(schema_file, metric) + self.merge_patch(schema_file, 'metrics', metric) + if metric_diff['deleted']: + for metric in metric_diff['deleted']: + self.delete_schema_metric(schema_file, metric) + if metric_diff['added']: + for metric in metric_diff['added']: + self.merge_patch(schema_file, 'metrics', metric) + # Take a "section" of the schema file yaml dictionary from saved and new schema files # and determine which parts have changed def get_diff_for(self, key, saved_yaml_dict, new_yaml_dict): @@ -828,6 +853,20 @@ def delete_schema_exposure(self, schema_file, exposure_dict): schema_file.exposures.remove(unique_id) logger.debug(f"Partial parsing: deleted exposure {unique_id}") + # metric are created only from schema files, so just delete + # the metric. 
# metrics are created only from schema files, so just delete
# the metric -- there is no SQL file to schedule for reparsing
def delete_schema_metric(self, schema_file, metric_dict):
    """Remove the metric named in ``metric_dict`` from the saved manifest
    and from ``schema_file``'s metric id list, recording it on the
    deleted manifest so downstream partial-parsing bookkeeping sees it.
    """
    metric_name = metric_dict['name']
    # iterate over a copy: the loop mutates schema_file.metrics
    metrics = schema_file.metrics.copy()
    for unique_id in metrics:
        # BUGFIX: check membership BEFORE indexing. The original did
        # `metric = self.saved_manifest.metrics[unique_id]` first, which
        # raises KeyError for a stale id and made the guard dead code.
        if unique_id in self.saved_manifest.metrics:
            metric = self.saved_manifest.metrics[unique_id]
            if metric.name == metric_name:
                self.deleted_manifest.metrics[unique_id] = \
                    self.saved_manifest.metrics.pop(unique_id)
                schema_file.metrics.remove(unique_id)
                logger.debug(f"Partial parsing: deleted metric {unique_id}")
class MetricParser(YamlReader):
    """Parses the ``metrics:`` section of a schema yaml file into
    ParsedMetric nodes."""

    def __init__(self, schema_parser: SchemaParser, yaml: YamlBlock):
        super().__init__(schema_parser, yaml, NodeType.Metric.pluralize())
        self.schema_parser = schema_parser
        self.yaml = yaml

    def parse_metric(self, unparsed: UnparsedMetric) -> ParsedMetric:
        """Turn one validated UnparsedMetric into a ParsedMetric, capturing
        its model ref along the way."""
        package_name = self.project.project_name
        unique_id = f'{NodeType.Metric}.{package_name}.{unparsed.name}'
        path = self.yaml.path.relative_path

        # fqn = the yaml file's path prefix, plus the metric name
        fqn = self.schema_parser.get_fqn_prefix(path)
        fqn.append(unparsed.name)

        parsed = ParsedMetric(
            package_name=package_name,
            root_path=self.project.project_root,
            path=path,
            original_file_path=self.yaml.path.original_file_path,
            unique_id=unique_id,
            fqn=fqn,
            model=unparsed.model,
            name=unparsed.name,
            description=unparsed.description,
            label=unparsed.label,
            type=unparsed.type,
            sql=unparsed.sql,
            timestamp=unparsed.timestamp,
            dimensions=unparsed.dimensions,
            time_grains=unparsed.time_grains,
            filters=unparsed.filters,
            meta=unparsed.meta,
            tags=unparsed.tags,
        )

        # render the model field (e.g. "ref('people')") solely so the
        # capturing ref resolver records the dependency on the parsed
        # metric; the rendered string itself is discarded
        ctx = generate_parse_metrics(
            parsed,
            self.root_project,
            self.schema_parser.manifest,
            package_name,
        )
        get_rendered(
            '{{ ' + unparsed.model + ' }}', ctx, parsed, capture_macros=True
        )
        return parsed

    def parse(self) -> Iterable[ParsedMetric]:
        """Validate and yield a ParsedMetric per entry under ``metrics:``."""
        for data in self.get_key_dicts():
            try:
                UnparsedMetric.validate(data)
                unparsed = UnparsedMetric.from_dict(data)
            except (ValidationError, JSONValidationException) as exc:
                msg = error_context(self.yaml.path, self.key, data, exc)
                raise ParsingException(msg) from exc
            yield self.parse_metric(unparsed)
a/core/dbt/task/list.py +++ b/core/dbt/task/list.py @@ -2,7 +2,8 @@ from dbt.contracts.graph.parsed import ( ParsedExposure, - ParsedSourceDefinition + ParsedSourceDefinition, + ParsedMetric ) from dbt.graph import ResourceTypeSelector from dbt.task.runnable import GraphRunnableTask, ManifestTask @@ -20,6 +21,7 @@ class ListTask(GraphRunnableTask): NodeType.Test, NodeType.Source, NodeType.Exposure, + NodeType.Metric, )) ALL_RESOURCE_VALUES = DEFAULT_RESOURCE_VALUES | frozenset(( NodeType.Analysis, @@ -74,6 +76,8 @@ def _iterate_selected_nodes(self): yield self.manifest.sources[node] elif node in self.manifest.exposures: yield self.manifest.exposures[node] + elif node in self.manifest.metrics: + yield self.manifest.metrics[node] else: raise RuntimeException( f'Got an unexpected result from node selection: "{node}"' @@ -94,6 +98,11 @@ def generate_selectors(self): # exposures are searched for by pkg.exposure_name exposure_selector = '.'.join([node.package_name, node.name]) yield f'exposure:{exposure_selector}' + elif node.resource_type == NodeType.Metric: + assert isinstance(node, ParsedMetric) + # metrics are searched for by pkg.metric_name + metric_selector = '.'.join([node.package_name, node.name]) + yield f'metric:{metric_selector}' else: # everything else is from `fqn` yield '.'.join(node.fqn) diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index 4d7cb887a99..67f33b6e55c 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -1181,6 +1181,7 @@ def expected_seeded_manifest(self, model_database=None, quote_model=False): 'tags': [] } }, + 'metrics': {}, 'selectors': {}, 'parent_map': { 'model.test.model': ['seed.test.seed'], @@ -1599,6 +1600,7 @@ def expected_postgres_references_manifest(self, model_database=None): 'url': 'http://example.com/notebook/1' }, }, + 'metrics': {}, 
'selectors': {}, 'docs': { 'dbt.__overview__': ANY, @@ -1802,7 +1804,7 @@ def verify_manifest(self, expected_manifest): manifest = _read_json('./target/manifest.json') manifest_keys = frozenset({ - 'nodes', 'sources', 'macros', 'parent_map', 'child_map', + 'nodes', 'sources', 'macros', 'parent_map', 'child_map', 'metrics', 'docs', 'metadata', 'docs', 'disabled', 'exposures', 'selectors', }) diff --git a/test/integration/068_partial_parsing_tests/test-files/my_metric.yml b/test/integration/068_partial_parsing_tests/test-files/my_metric.yml new file mode 100644 index 00000000000..d91de1607cc --- /dev/null +++ b/test/integration/068_partial_parsing_tests/test-files/my_metric.yml @@ -0,0 +1,23 @@ +version: 2 +metrics: + - name: new_customers + label: New Customers + model: customers + description: "The number of paid customers who are using the product" + type: count + sql: user_id + timestamp: signup_date + time_grains: [day, week, month] + dimensions: + - plan + - country + filters: + - field: is_paying + value: True + operator: '=' + +meta: + is_okr: True + tags: + - okrs + + diff --git a/test/integration/068_partial_parsing_tests/test-files/people.sql b/test/integration/068_partial_parsing_tests/test-files/people.sql new file mode 100644 index 00000000000..ce58d41a599 --- /dev/null +++ b/test/integration/068_partial_parsing_tests/test-files/people.sql @@ -0,0 +1,3 @@ +select 1 as id, 'Drew' as first_name, 'Banin' as last_name, 'yellow' as favorite_color, true as loves_dbt, 5 as tenure, current_timestamp as created_at +union all +select 1 as id, 'Jeremy' as first_name, 'Cohen' as last_name, 'indigo' as favorite_color, true as loves_dbt, 4 as tenure, current_timestamp as created_at diff --git a/test/integration/068_partial_parsing_tests/test-files/people_metrics.yml b/test/integration/068_partial_parsing_tests/test-files/people_metrics.yml new file mode 100644 index 00000000000..763bc0bcafb --- /dev/null +++ 
b/test/integration/068_partial_parsing_tests/test-files/people_metrics.yml @@ -0,0 +1,30 @@ +version: 2 + +metrics: + + - model: "ref('people')" + name: number_of_people + description: Total count of people + label: "Number of people" + type: count + sql: "*" + timestamp: created_at + time_grains: [day, week, month] + dimensions: + - favorite_color + - loves_dbt + meta: + my_meta: 'testing' + + - model: "ref('people')" + name: collective_tenure + description: Total number of years of team experience + label: "Collective tenure" + type: sum + sql: tenure + timestamp: created_at + time_grains: [day] + filters: + - field: loves_dbt + operator: is + value: 'true' diff --git a/test/integration/068_partial_parsing_tests/test-files/people_metrics2.yml b/test/integration/068_partial_parsing_tests/test-files/people_metrics2.yml new file mode 100644 index 00000000000..96b10a89510 --- /dev/null +++ b/test/integration/068_partial_parsing_tests/test-files/people_metrics2.yml @@ -0,0 +1,30 @@ +version: 2 + +metrics: + + - model: "ref('people')" + name: number_of_people + description: Total count of people + label: "Number of people" + type: count + sql: "*" + timestamp: created_at + time_grains: [day, week, month] + dimensions: + - favorite_color + - loves_dbt + meta: + my_meta: 'replaced' + + - model: "ref('people')" + name: collective_tenure + description: Total number of years of team experience + label: "Collective tenure" + type: sum + sql: tenure + timestamp: created_at + time_grains: [day] + filters: + - field: loves_dbt + operator: is + value: 'true' diff --git a/test/integration/068_partial_parsing_tests/test_pp_metrics.py b/test/integration/068_partial_parsing_tests/test_pp_metrics.py new file mode 100644 index 00000000000..cc0315b1d12 --- /dev/null +++ b/test/integration/068_partial_parsing_tests/test_pp_metrics.py @@ -0,0 +1,85 @@ +from dbt.exceptions import CompilationException, UndefinedMacroException +from dbt.contracts.graph.manifest import Manifest +from 
dbt.contracts.files import ParseFileType +from dbt.contracts.results import TestStatus +from dbt.parser.partial import special_override_macros +from test.integration.base import DBTIntegrationTest, use_profile, normalize, get_manifest +import shutil +import os + + +# Note: every test case needs to have separate directories, otherwise +# they will interfere with each other when tests are multi-threaded + +class BasePPTest(DBTIntegrationTest): + + @property + def schema(self): + return "test_068A" + + @property + def models(self): + return "models" + + @property + def project_config(self): + return { + 'config-version': 2, + 'data-paths': ['seeds'], + 'test-paths': ['tests'], + 'macro-paths': ['macros'], + 'analysis-paths': ['analyses'], + 'snapshot-paths': ['snapshots'], + 'seeds': { + 'quote_columns': False, + }, + } + + def setup_directories(self): + # Create the directories for the test in the `self.test_root_dir` + # directory after everything else is symlinked. We can copy to and + # delete files in this directory without tests interfering with each other. 
+ os.mkdir(os.path.join(self.test_root_dir, 'models')) + os.mkdir(os.path.join(self.test_root_dir, 'tests')) + os.mkdir(os.path.join(self.test_root_dir, 'seeds')) + os.mkdir(os.path.join(self.test_root_dir, 'macros')) + os.mkdir(os.path.join(self.test_root_dir, 'analyses')) + os.mkdir(os.path.join(self.test_root_dir, 'snapshots')) + + + +class MetricsTest(BasePPTest): + + @use_profile('postgres') + def test_postgres_metrics(self): + self.setup_directories() + # initial run + self.copy_file('test-files/people.sql', 'models/people.sql') + results = self.run_dbt(["run"]) + self.assertEqual(len(results), 1) + manifest = get_manifest() + self.assertEqual(len(manifest.nodes), 1) + + # Add metrics yaml file + self.copy_file('test-files/people_metrics.yml', 'models/people_metrics.yml') + results = self.run_dbt(["run"]) + self.assertEqual(len(results), 1) + manifest = get_manifest() + self.assertEqual(len(manifest.metrics), 2) + metric_people_id = 'metric.test.number_of_people' + metric_tenure_id = 'metric.test.collective_tenure' + metric_people = manifest.metrics[metric_people_id] + metric_tenure = manifest.metrics[metric_tenure_id] + expected_meta = {'my_meta': 'testing'} + self.assertEqual(metric_people.meta, expected_meta) + self.assertEqual(metric_people.refs, [['people']]) + self.assertEqual(metric_tenure.refs, [['people']]) + + # Change metrics yaml files + self.copy_file('test-files/people_metrics2.yml', 'models/people_metrics.yml') + results = self.run_dbt(["run"]) + self.assertEqual(len(results), 1) + manifest = get_manifest() + metric_people = manifest.metrics[metric_people_id] + expected_meta = {'my_meta': 'replaced'} + self.assertEqual(metric_people.meta, expected_meta) diff --git a/test/unit/test_compiler.py b/test/unit/test_compiler.py index 46ce72ee807..6421156affd 100644 --- a/test/unit/test_compiler.py +++ b/test/unit/test_compiler.py @@ -124,6 +124,7 @@ def test__prepend_ctes__already_has_cte(self): disabled=[], files={}, exposures={}, + 
metrics={}, selectors={}, ) @@ -186,6 +187,7 @@ def test__prepend_ctes__no_ctes(self): disabled=[], files={}, exposures={}, + metrics={}, selectors={}, ) @@ -260,6 +262,7 @@ def test__prepend_ctes(self): disabled=[], files={}, exposures={}, + metrics={}, selectors={}, ) @@ -360,6 +363,7 @@ def test__prepend_ctes__cte_not_compiled(self): disabled=[], files={}, exposures={}, + metrics={}, selectors={}, ) @@ -451,6 +455,7 @@ def test__prepend_ctes__multiple_levels(self): disabled=[], files={}, exposures={}, + metrics={}, selectors={}, ) @@ -537,6 +542,7 @@ def test__prepend_ctes__valid_ephemeral_sql(self): disabled=[], files={}, exposures={}, + metrics={}, selectors={}, ) diff --git a/test/unit/test_contracts_graph_parsed.py b/test/unit/test_contracts_graph_parsed.py index 63b3be78a08..d8aa9e60121 100644 --- a/test/unit/test_contracts_graph_parsed.py +++ b/test/unit/test_contracts_graph_parsed.py @@ -22,6 +22,7 @@ ParsedNodePatch, ParsedMacro, ParsedExposure, + ParsedMetric, ParsedSeedNode, Docs, MacroDependsOn, @@ -33,6 +34,7 @@ ) from dbt.contracts.graph.unparsed import ( ExposureType, + MetricFilter, FreshnessThreshold, MaturityType, Quoting, @@ -2183,3 +2185,79 @@ def test_compare_unchanged_parsed_exposure(func, basic_parsed_exposure_object): def test_compare_changed_exposure(func, basic_parsed_exposure_object): node, compare = func(basic_parsed_exposure_object) assert not node.same_contents(compare) + + +# METRICS +@pytest.fixture +def minimal_parsed_metric_dict(): + return { + 'name': 'my_metric', + 'type': 'count', + 'timestamp': 'created_at', + 'time_grains': ['day'], + 'fqn': ['test', 'metrics', 'my_metric'], + 'unique_id': 'metric.test.my_metric', + 'package_name': 'test', + 'meta': {}, + 'tags': [], + 'path': 'models/something.yml', + 'root_path': '/usr/src/app', + 'original_file_path': 'models/something.yml', + 'description': '', + 'created_at': 1.0, + } + + +@pytest.fixture +def basic_parsed_metric_dict(): + return { + 'name': 'new_customers', + 'label': 
'New Customers', + 'model': 'ref("dim_customers")', + 'type': 'count', + 'sql': 'user_id', + 'timestamp': 'signup_date', + 'time_grains': ['day', 'week', 'month'], + 'dimensions': ['plan', 'country'], + 'filters': [ + { + "field": "is_paying", + "value": "true", + "operator": "=", + } + ], + 'resource_type': 'metric', + 'refs': [['dim_customers']], + 'sources': [], + 'fqn': ['test', 'metrics', 'my_metric'], + 'unique_id': 'metric.test.my_metric', + 'package_name': 'test', + 'path': 'models/something.yml', + 'root_path': '/usr/src/app', + 'original_file_path': 'models/something.yml', + 'description': '', + 'meta': {}, + 'tags': [], + 'created_at': 1.0, + 'depends_on': { + 'nodes': [], + 'macros': [], + }, + } + + +@pytest.fixture +def basic_parsed_metric_object(): + return ParsedMetric( + name='my_metric', + type='count', + fqn=['test', 'metrics', 'my_metric'], + unique_id='metric.test.my_metric', + package_name='test', + path='models/something.yml', + root_path='/usr/src/app', + original_file_path='models/something.yml', + description='', + meta={}, + tags=[] + ) diff --git a/test/unit/test_contracts_graph_unparsed.py b/test/unit/test_contracts_graph_unparsed.py index 097325ac08b..f82e617f55c 100644 --- a/test/unit/test_contracts_graph_unparsed.py +++ b/test/unit/test_contracts_graph_unparsed.py @@ -7,7 +7,7 @@ FreshnessThreshold, Quoting, UnparsedSourceDefinition, UnparsedSourceTableDefinition, UnparsedDocumentationFile, UnparsedColumn, UnparsedNodeUpdate, Docs, UnparsedExposure, MaturityType, ExposureOwner, - ExposureType + ExposureType, UnparsedMetric, MetricFilter ) from dbt.contracts.results import FreshnessStatus from dbt.node_types import NodeType @@ -659,3 +659,73 @@ def test_bad_tags(self): tst = self.get_ok_dict() tst['tags'] = [123] self.assert_fails_validation(tst) + + +class TestUnparsedMetric(ContractTestCase): + ContractType = UnparsedMetric + + def get_ok_dict(self): + return { + 'name': 'new_customers', + 'label': 'New Customers', + 'model': 
'ref("dim_customers")', + 'description': 'New customers', + 'type': 'count', + 'sql': 'user_id', + 'timestamp': 'signup_date', + 'time_grains': ['day', 'week', 'month'], + 'dimensions': ['plan', 'country'], + 'filters': [ + { + "field": "is_paying", + "value": "True", + "operator": "=", + } + ], + 'tags': [], + 'meta': { + 'is_okr': True + }, + } + + def test_ok(self): + metric = self.ContractType( + name='new_customers', + label='New Customers', + model='ref("dim_customers")', + description="New customers", + type='count', + sql="user_id", + timestamp="signup_date", + time_grains=['day', 'week', 'month'], + dimensions=['plan', 'country'], + filters=[MetricFilter( + field="is_paying", + value='True', + operator="=", + )], + meta={'is_okr': True}, + ) + dct = self.get_ok_dict() + self.assert_symmetric(metric, dct) + pickle.loads(pickle.dumps(metric)) + + def test_bad_metric_no_type(self): + tst = self.get_ok_dict() + del tst['type'] + self.assert_fails_validation(tst) + + def test_bad_metric_no_model(self): + tst = self.get_ok_dict() + tst['model'] = None + self.assert_fails_validation(tst) + + def test_bad_filter_missing_things(self): + tst = self.get_ok_dict() + del tst['filters'][0]['operator'] + self.assert_fails_validation(tst) + + def test_bad_tags(self): + tst = self.get_ok_dict() + tst['tags'] = [123] + self.assert_fails_validation(tst) diff --git a/test/unit/test_graph_selector_methods.py b/test/unit/test_graph_selector_methods.py index 6fe9a46f880..0bd57015bb3 100644 --- a/test/unit/test_graph_selector_methods.py +++ b/test/unit/test_graph_selector_methods.py @@ -12,6 +12,7 @@ ParsedMacro, ParsedModelNode, ParsedExposure, + ParsedMetric, ParsedSeedNode, ParsedSingularTestNode, ParsedGenericTestNode, @@ -21,7 +22,7 @@ ColumnInfo, ) from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.graph.unparsed import ExposureType, ExposureOwner +from dbt.contracts.graph.unparsed import ExposureType, ExposureOwner, MetricFilter from dbt.contracts.state 
import PreviousState from dbt.node_types import NodeType from dbt.graph.selector_methods import ( @@ -36,6 +37,7 @@ TestTypeSelectorMethod, StateSelectorMethod, ExposureSelectorMethod, + MetricSelectorMethod, ) import dbt.exceptions import dbt.contracts.graph.parsed @@ -342,6 +344,35 @@ def make_exposure(pkg, name, path=None, fqn_extras=None, owner=None): ) +def make_metric(pkg, name, path=None): + if path is None: + path = 'schema.yml' + + return ParsedMetric( + name=name, + path='schema.yml', + package_name=pkg, + root_path='/usr/src/app', + original_file_path=path, + unique_id=f'metric.{pkg}.{name}', + fqn=[pkg, 'metrics', name], + label='New Customers', + model='ref("multi")', + description="New customers", + type='count', + sql="user_id", + timestamp="signup_date", + time_grains=['day', 'week', 'month'], + dimensions=['plan', 'country'], + filters=[MetricFilter( + field="is_paying", + value=True, + operator="=", + )], + meta={'is_okr': True}, + tags=['okrs'], + ) + @pytest.fixture def macro_test_unique(): @@ -571,6 +602,7 @@ def manifest(seed, source, ephemeral_model, view_model, table_model, ext_source, docs={}, files={}, exposures={}, + metrics={}, disabled=[], selectors={}, ) @@ -579,7 +611,7 @@ def manifest(seed, source, ephemeral_model, view_model, table_model, ext_source, def search_manifest_using_method(manifest, method, selection): selected = method.search(set(manifest.nodes) | set( - manifest.sources) | set(manifest.exposures), selection) + manifest.sources) | set(manifest.exposures) | set(manifest.metrics), selection) results = {manifest.expect(uid).search_name for uid in selected} return results @@ -735,6 +767,18 @@ def test_select_exposure(manifest): manifest, method, 'not_my_exposure') +def test_select_metric(manifest): + metric = make_metric('test', 'my_metric') + manifest.metrics[metric.unique_id] = metric + methods = MethodManager(manifest, None) + method = methods.get_method('metric', []) + assert isinstance(method, MetricSelectorMethod) + 
assert search_manifest_using_method( + manifest, method, 'my_metric') == {'my_metric'} + assert not search_manifest_using_method( + manifest, method, 'not_my_metric') + + @pytest.fixture def previous_state(manifest): writable = copy.deepcopy(manifest).writable_manifest() diff --git a/test/unit/test_manifest.py b/test/unit/test_manifest.py index 27d2639ffd2..47405e2811f 100644 --- a/test/unit/test_manifest.py +++ b/test/unit/test_manifest.py @@ -21,12 +21,14 @@ ParsedSeedNode, ParsedSourceDefinition, ParsedExposure, + ParsedMetric ) from dbt.contracts.graph.unparsed import ( ExposureType, ExposureOwner, - MaturityType + MaturityType, + MetricFilter ) from dbt.contracts.graph.compiled import CompiledModelNode @@ -93,6 +95,37 @@ def setUp(self): ) } + self.metrics = { + 'metric.root.my_metric': ParsedMetric( + name='new_customers', + label='New Customers', + model='ref("multi")', + description="New customers", + type='count', + sql="user_id", + timestamp="signup_date", + time_grains=['day', 'week', 'month'], + dimensions=['plan', 'country'], + filters=[MetricFilter( + field="is_paying", + value='True', + operator="=", + )], + meta={'is_okr': True}, + tags=['okrs'], + resource_type=NodeType.Metric, + depends_on=DependsOn(nodes=['model.root.multi']), + refs=[['multi']], + sources=[], + fqn=['root', 'my_metric'], + unique_id='metric.root.my_metric', + package_name='root', + root_path='', + path='my_metric.yml', + original_file_path='my_metric.yml' + ) + } + self.nested_nodes = { 'model.snowplow.events': ParsedModelNode( name='events', @@ -243,6 +276,8 @@ def setUp(self): } for exposure in self.exposures.values(): exposure.validate(exposure.to_dict(omit_none=True)) + for metric in self.metrics.values(): + metric.validate(metric.to_dict(omit_none=True)) for node in self.nested_nodes.values(): node.validate(node.to_dict(omit_none=True)) for source in self.sources.values(): @@ -257,7 +292,7 @@ def tearDown(self): def test__no_nodes(self): manifest = Manifest( nodes={}, 
sources={}, macros={}, docs={}, disabled={}, files={}, - exposures={}, selectors={}, + exposures={}, metrics={}, selectors={}, metadata=ManifestMetadata(generated_at=datetime.utcnow()), ) self.assertEqual( @@ -267,6 +302,7 @@ def test__no_nodes(self): 'sources': {}, 'macros': {}, 'exposures': {}, + 'metrics': {}, 'selectors': {}, 'parent_map': {}, 'child_map': {}, @@ -287,7 +323,7 @@ def test__nested_nodes(self): nodes = copy.copy(self.nested_nodes) manifest = Manifest( nodes=nodes, sources={}, macros={}, docs={}, disabled={}, files={}, - exposures={}, selectors={}, + exposures={}, metrics={}, selectors={}, metadata=ManifestMetadata(generated_at=datetime.utcnow()), ) serialized = manifest.writable_manifest().to_dict(omit_none=True) @@ -352,17 +388,21 @@ def test__nested_nodes(self): def test__build_flat_graph(self): exposures = copy.copy(self.exposures) + metrics = copy.copy(self.metrics) nodes = copy.copy(self.nested_nodes) sources = copy.copy(self.sources) manifest = Manifest(nodes=nodes, sources=sources, macros={}, docs={}, - disabled={}, files={}, exposures=exposures, selectors={}) + disabled={}, files={}, exposures=exposures, + metrics=metrics, selectors={}) manifest.build_flat_graph() flat_graph = manifest.flat_graph flat_exposures = flat_graph['exposures'] + flat_metrics = flat_graph['metrics'] flat_nodes = flat_graph['nodes'] flat_sources = flat_graph['sources'] - self.assertEqual(set(flat_graph), set(['exposures', 'nodes', 'sources'])) + self.assertEqual(set(flat_graph), set(['exposures', 'nodes', 'sources', 'metrics'])) self.assertEqual(set(flat_exposures), set(self.exposures)) + self.assertEqual(set(flat_metrics), set(self.metrics)) self.assertEqual(set(flat_nodes), set(self.nested_nodes)) self.assertEqual(set(flat_sources), set(self.sources)) for node in flat_nodes.values(): @@ -412,6 +452,7 @@ def test_no_nodes_with_metadata(self, mock_user): 'sources': {}, 'macros': {}, 'exposures': {}, + 'metrics': {}, 'selectors': {}, 'parent_map': {}, 'child_map': 
{}, @@ -459,8 +500,12 @@ def test_get_resource_fqns(self): checksum=FileHash.empty(), ) manifest = Manifest(nodes=nodes, sources=self.sources, macros={}, docs={}, - disabled={}, files={}, exposures=self.exposures, selectors={}) + disabled={}, files={}, exposures=self.exposures, + metrics=self.metrics, selectors={}) expect = { + 'metrics': frozenset([ + ('root', 'my_metric') + ]), 'exposures': frozenset([ ('root', 'my_exposure') ]), @@ -654,6 +699,7 @@ def test__no_nodes(self): 'macros': {}, 'sources': {}, 'exposures': {}, + 'metrics': {}, 'selectors': {}, 'parent_map': {}, 'child_map': {}, @@ -743,7 +789,7 @@ def test__build_flat_graph(self): manifest.build_flat_graph() flat_graph = manifest.flat_graph flat_nodes = flat_graph['nodes'] - self.assertEqual(set(flat_graph), set(['exposures', 'nodes', 'sources'])) + self.assertEqual(set(flat_graph), set(['exposures', 'metrics', 'nodes', 'sources'])) self.assertEqual(set(flat_nodes), set(self.nested_nodes)) compiled_count = 0 for node in flat_nodes.values(): @@ -788,6 +834,7 @@ def setUp(self): disabled={}, files={}, exposures={}, + metrics={}, selectors={}, ) @@ -809,6 +856,7 @@ def make_manifest(nodes=[], sources=[], macros=[], docs=[]): disabled={}, files={}, exposures={}, + metrics={}, selectors={}, )