Skip to content

Commit

Permalink
Support custom granularity in linkable spec resolution (#1415)
Browse files Browse the repository at this point in the history
Makes further updates to linkable spec resolution that are needed to
support custom granularities.

This also fixes a bug related to cumulative metric restrictions:
cumulative metrics CAN now be queried with a non-default granularity, so
this removes that restriction. This was not an issue in actual queries
because that logic was only hit when querying metadata, but it could
produce incorrect results when listing dimensions for a cumulative
metric. Instead of restricting derived time granularity, this change
restricts cumulative metrics from being used with date part (which
matches existing query behavior). Doing so required adding a `DATE_PART`
`LinkableElementProperty` and updating `LinkableDimensions` accordingly,
so that's what you'll see in the snapshot changes.
  • Loading branch information
courtneyholcomb authored Sep 24, 2024
1 parent 9f6af7e commit 963b4e1
Show file tree
Hide file tree
Showing 21 changed files with 1,218 additions and 1,177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class LinkableElementProperty(Enum):
METRIC_TIME = "metric_time"
# Refers to a metric, not a dimension.
METRIC = "metric"
# A time dimension with a DatePart.
DATE_PART = "date_part"

@staticmethod
def all_properties() -> FrozenSet[LinkableElementProperty]: # noqa: D102
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ class SemanticManifestLookup:

def __init__(self, semantic_manifest: SemanticManifest) -> None: # noqa: D107
self._semantic_manifest = semantic_manifest
self._semantic_model_lookup = SemanticModelLookup(semantic_manifest)
self._metric_lookup = MetricLookup(self._semantic_manifest, self._semantic_model_lookup)
self._time_spine_sources = TimeSpineSource.build_standard_time_spine_sources(semantic_manifest)
self._custom_granularities = TimeSpineSource.build_custom_granularities(list(self._time_spine_sources.values()))
self._semantic_model_lookup = SemanticModelLookup(
model=semantic_manifest, custom_granularities=self._custom_granularities
)
self._metric_lookup = MetricLookup(
semantic_manifest=self._semantic_manifest,
semantic_model_lookup=self._semantic_model_lookup,
custom_granularities=self._custom_granularities,
)

@property
def semantic_manifest(self) -> SemanticManifest: # noqa: D102
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from metricflow_semantics.model.semantics.linkable_element import (
ElementPathKey,
LinkableDimension,
LinkableElementType,
LinkableEntity,
LinkableMetric,
MetricSubqueryJoinPathElement,
Expand Down Expand Up @@ -100,18 +99,10 @@ def __init__(
self._metric_references_to_metrics[MetricReference(metric.name)] = metric
linkable_sets_for_measure = []
for measure in metric.measure_references:
# Cumulative metrics currently can't be queried by other time granularities.
if metric.type is MetricType.CUMULATIVE:
linkable_sets_for_measure.append(
self._get_linkable_element_set_for_measure(measure).filter(
with_any_of=LinkableElementProperty.all_properties(),
# Use filter() here because the `without_all_of` param is only available on that method.
without_all_of=frozenset(
{
LinkableElementProperty.METRIC_TIME,
LinkableElementProperty.DERIVED_TIME_GRANULARITY,
}
),
self._get_linkable_element_set_for_measure(
measure, without_any_of=frozenset({LinkableElementProperty.DATE_PART})
)
)
elif (
Expand Down Expand Up @@ -216,6 +207,8 @@ def create(
properties = set(with_properties)
if time_granularity.is_custom_granularity or time_granularity.base_granularity != defined_time_granularity:
properties.add(LinkableElementProperty.DERIVED_TIME_GRANULARITY)
if date_part:
properties.add(LinkableElementProperty.DATE_PART)
return LinkableDimension.create(
defined_in_semantic_model=semantic_model_origin,
element_name=dimension.reference.element_name,
Expand Down Expand Up @@ -463,78 +456,68 @@ def _get_metric_time_elements(self, measure_reference: Optional[MeasureReference
on what aggregation time dimension was used to define the measure.
"""
measure_semantic_model: Optional[SemanticModel] = None
defined_granularity: Optional[TimeGranularity] = None
defined_granularity: Optional[ExpandedTimeGranularity] = None
if measure_reference:
measure_semantic_model = self._get_semantic_model_for_measure(measure_reference)
measure_agg_time_dimension_reference = measure_semantic_model.checked_agg_time_dimension_for_measure(
measure_reference=measure_reference
)
defined_granularity = self._get_time_granularity_for_dimension(
min_granularity = self._get_time_granularity_for_dimension(
semantic_model=measure_semantic_model,
time_dimension_reference=measure_agg_time_dimension_reference,
)
possible_metric_time_granularities = tuple(
time_granularity
for time_granularity in TimeGranularity
if defined_granularity.is_smaller_than_or_equal(time_granularity)
)
defined_granularity = ExpandedTimeGranularity.from_time_granularity(min_granularity)
else:
# If querying metric_time without metrics, will query from time spines.
# Defaults to DAY granularity if available in time spines, else smallest available granularity.
min_time_spine_granularity = min(self._time_spine_sources.keys())
possible_metric_time_granularities = tuple(
time_granularity
for time_granularity in TimeGranularity
if min_time_spine_granularity.is_smaller_than_or_equal(time_granularity)
)
min_granularity = min(self._time_spine_sources.keys())
possible_metric_time_granularities = tuple(
ExpandedTimeGranularity.from_time_granularity(time_granularity)
for time_granularity in TimeGranularity
if min_granularity.is_smaller_than_or_equal(time_granularity)
) + tuple(
[
custom_granularity
for custom_granularity in self._custom_granularities.values()
if min_granularity.is_smaller_than_or_equal(custom_granularity.base_granularity)
]
)

# For each of the possible time granularities, create a LinkableDimension.
# For each granularity, will create one LinkableDimension with no date part, and one for each compatible date part.
# TODO: group by resolution has different logic than source node builder for combining date part w/ grain. Fix.
path_key_to_linkable_dimensions: Dict[ElementPathKey, List[LinkableDimension]] = defaultdict(list)
for time_granularity in possible_metric_time_granularities:
possible_date_parts: Sequence[Optional[DatePart]] = (
# No date part, just the metric time at a different grain.
(None,)
# date part of a metric time at a different grain.
+ tuple(date_part for date_part in DatePart if time_granularity.to_int() <= date_part.to_int())
)
possible_date_parts: Tuple[Optional[DatePart], ...] = (None,)
if not time_granularity.is_custom_granularity:
possible_date_parts += tuple(
date_part
for date_part in DatePart
if time_granularity.base_granularity.to_int() <= date_part.to_int()
)

for date_part in possible_date_parts:
path_key = ElementPathKey(
properties = {LinkableElementProperty.METRIC_TIME}
if time_granularity != defined_granularity:
properties.add(LinkableElementProperty.DERIVED_TIME_GRANULARITY)
if date_part:
properties.add(LinkableElementProperty.DATE_PART)
linkable_dimension = LinkableDimension.create(
defined_in_semantic_model=measure_semantic_model.reference if measure_semantic_model else None,
element_name=MetricFlowReservedKeywords.METRIC_TIME.value,
element_type=LinkableElementType.TIME_DIMENSION,
dimension_type=DimensionType.TIME,
entity_links=(),
time_granularity=ExpandedTimeGranularity.from_time_granularity(time_granularity),
date_part=date_part,
)
path_key_to_linkable_dimensions[path_key].append(
LinkableDimension.create(
defined_in_semantic_model=measure_semantic_model.reference if measure_semantic_model else None,
element_name=MetricFlowReservedKeywords.METRIC_TIME.value,
dimension_type=DimensionType.TIME,
entity_links=(),
join_path=SemanticModelJoinPath(
left_semantic_model_reference=(
measure_semantic_model.reference
if measure_semantic_model
else SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE
),
),
# Anything that's not at the base time granularity of the measure's aggregation time dimension
# should be considered derived.
properties=(
frozenset({LinkableElementProperty.METRIC_TIME})
if time_granularity is defined_granularity and date_part is None
else frozenset(
{
LinkableElementProperty.METRIC_TIME,
LinkableElementProperty.DERIVED_TIME_GRANULARITY,
}
)
join_path=SemanticModelJoinPath(
left_semantic_model_reference=(
measure_semantic_model.reference
if measure_semantic_model
else SemanticModelDerivation.VIRTUAL_SEMANTIC_MODEL_REFERENCE
),
time_granularity=ExpandedTimeGranularity.from_time_granularity(time_granularity),
date_part=date_part,
)
),
properties=frozenset(properties),
time_granularity=time_granularity,
date_part=date_part,
)
path_key_to_linkable_dimensions[linkable_dimension.path_key].append(linkable_dimension)

return LinkableElementSet(
path_key_to_linkable_dimensions={
Expand Down Expand Up @@ -651,6 +634,8 @@ def get_linkable_elements_for_distinct_values_query(
"""
return self._no_metric_linkable_element_set.filter(with_any_of=with_any_of, without_any_of=without_any_of)

# TODO: the results of this method don't actually match what will be allowed for the metric. This method checks
# _metric_to_linkable_element_sets, while the actual group by resolution DAG calls _get_linkable_element_set_for_measure.
def get_linkable_elements_for_metrics(
self,
metric_references: Sequence[MetricReference],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
from metricflow_semantics.model.semantics.semantic_model_join_evaluator import MAX_JOIN_HOPS
from metricflow_semantics.model.semantics.semantic_model_lookup import SemanticModelLookup
from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec
from metricflow_semantics.time.granularity import ExpandedTimeGranularity

logger = logging.getLogger(__name__)


class MetricLookup:
"""Tracks semantic information for metrics by linking them to semantic models."""

def __init__(self, semantic_manifest: SemanticManifest, semantic_model_lookup: SemanticModelLookup) -> None:
def __init__(
self,
semantic_manifest: SemanticManifest,
semantic_model_lookup: SemanticModelLookup,
custom_granularities: Dict[str, ExpandedTimeGranularity],
) -> None:
"""Initializer.
Args:
Expand All @@ -37,6 +43,7 @@ def __init__(self, semantic_manifest: SemanticManifest, semantic_model_lookup: S
"""
self._metrics: Dict[MetricReference, Metric] = {}
self._semantic_model_lookup = semantic_model_lookup
self._custom_granularities = custom_granularities

for metric in semantic_manifest.metrics:
self._add_metric(metric)
Expand Down Expand Up @@ -188,6 +195,7 @@ def get_valid_agg_time_dimensions_for_metric(
valid_agg_time_dimension_specs = TimeDimensionSpec.generate_possible_specs_for_time_dimension(
time_dimension_reference=agg_time_dimension_reference,
entity_links=agg_time_dimension_entity_links,
custom_granularities=self._custom_granularities,
)
return valid_agg_time_dimension_specs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,21 @@
from metricflow_semantics.specs.measure_spec import MeasureSpec
from metricflow_semantics.specs.non_additive_dimension_spec import NonAdditiveDimensionSpec
from metricflow_semantics.specs.time_dimension_spec import DEFAULT_TIME_GRANULARITY, TimeDimensionSpec
from metricflow_semantics.time.granularity import ExpandedTimeGranularity

logger = logging.getLogger(__name__)


class SemanticModelLookup:
"""Tracks semantic information for semantic models held in a set of SemanticModelContainers."""

def __init__(
self,
model: SemanticManifest,
) -> None:
def __init__(self, model: SemanticManifest, custom_granularities: Dict[str, ExpandedTimeGranularity]) -> None:
"""Initializer.
Args:
model: the semantic manifest used for loading semantic model definitions
"""
self._custom_granularities = custom_granularities
self._measure_index: Dict[MeasureReference, SemanticModel] = {}
self._measure_aggs: Dict[MeasureReference, AggregationType] = {}
self._measure_agg_time_dimension: Dict[MeasureReference, TimeDimensionReference] = {}
Expand Down Expand Up @@ -374,6 +373,7 @@ def get_agg_time_dimension_specs_for_measure(
return TimeDimensionSpec.generate_possible_specs_for_time_dimension(
time_dimension_reference=agg_time_dimension,
entity_links=(entity_link,),
custom_granularities=self._custom_granularities,
)

def get_defined_time_granularity(self, time_dimension_reference: TimeDimensionReference) -> TimeGranularity:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def __init__(self, manifest_lookup: SemanticManifestLookup) -> None: # noqa: D1

self._metric_time_specs = tuple(
TimeDimensionSpec.generate_possible_specs_for_time_dimension(
time_dimension_reference=TimeDimensionReference(element_name=METRIC_TIME_ELEMENT_NAME), entity_links=()
time_dimension_reference=TimeDimensionReference(element_name=METRIC_TIME_ELEMENT_NAME),
entity_links=(),
custom_granularities=self._manifest_lookup._custom_granularities,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional, Sequence, Tuple, Union
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

from dbt_semantic_interfaces.naming.keywords import METRIC_TIME_ELEMENT_NAME
from dbt_semantic_interfaces.references import DimensionReference, EntityReference, TimeDimensionReference
Expand Down Expand Up @@ -192,30 +192,31 @@ def comparison_key(self, exclude_fields: Sequence[TimeDimensionSpecField] = ())

@staticmethod
def generate_possible_specs_for_time_dimension(
time_dimension_reference: TimeDimensionReference, entity_links: Tuple[EntityReference, ...]
time_dimension_reference: TimeDimensionReference,
entity_links: Tuple[EntityReference, ...],
custom_granularities: Dict[str, ExpandedTimeGranularity],
) -> List[TimeDimensionSpec]:
"""Generate a list of time dimension specs with all combinations of granularity & date part.
TODO: [custom calendar] decide whether to add support for custom granularities or rename this to indicate that
it only includes standard granularities.
"""
"""Generate a list of time dimension specs with all combinations of granularity & date part."""
time_dimension_specs: List[TimeDimensionSpec] = []
for time_granularity in TimeGranularity:
granularities = [
ExpandedTimeGranularity.from_time_granularity(time_granularity) for time_granularity in TimeGranularity
] + list(custom_granularities.values())
for time_granularity in granularities:
time_dimension_specs.append(
TimeDimensionSpec(
element_name=time_dimension_reference.element_name,
entity_links=entity_links,
time_granularity=ExpandedTimeGranularity.from_time_granularity(time_granularity),
time_granularity=time_granularity,
date_part=None,
)
)
for date_part in DatePart:
for time_granularity in date_part.compatible_granularities:
for compatible_granularity in date_part.compatible_granularities:
time_dimension_specs.append(
TimeDimensionSpec(
element_name=time_dimension_reference.element_name,
entity_links=entity_links,
time_granularity=ExpandedTimeGranularity.from_time_granularity(time_granularity),
time_granularity=ExpandedTimeGranularity.from_time_granularity(compatible_granularity),
date_part=date_part,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,38 @@
assert_linkable_element_set_snapshot_equal,
assert_object_snapshot_equal,
)
from metricflow_semantics.time.time_spine_source import TimeSpineSource

logger = logging.getLogger(__name__)


def build_semantic_model_lookup_from_manifest(semantic_manifest: SemanticManifest) -> SemanticModelLookup:
    """Build a `SemanticModelLookup` for the given manifest.

    The custom granularities passed to the lookup are derived from the standard time spine sources
    that are built from the same manifest.
    """
    standard_time_spines = TimeSpineSource.build_standard_time_spine_sources(semantic_manifest)
    return SemanticModelLookup(
        model=semantic_manifest,
        custom_granularities=TimeSpineSource.build_custom_granularities(list(standard_time_spines.values())),
    )


@pytest.fixture
def semantic_model_lookup(simple_semantic_manifest: SemanticManifest) -> SemanticModelLookup: # noqa: D103
return SemanticModelLookup(
model=simple_semantic_manifest,
)
return build_semantic_model_lookup_from_manifest(simple_semantic_manifest)


@pytest.fixture
def multi_hop_semantic_model_lookup( # noqa: D103
multi_hop_join_manifest: SemanticManifest,
) -> SemanticModelLookup:
return SemanticModelLookup(model=multi_hop_join_manifest)
return build_semantic_model_lookup_from_manifest(multi_hop_join_manifest)


@pytest.fixture
def metric_lookup( # noqa: D103
simple_semantic_manifest: SemanticManifest, semantic_model_lookup: SemanticModelLookup
) -> MetricLookup:
return MetricLookup(semantic_manifest=simple_semantic_manifest, semantic_model_lookup=semantic_model_lookup)
return MetricLookup(
semantic_manifest=simple_semantic_manifest,
semantic_model_lookup=semantic_model_lookup,
custom_granularities=semantic_model_lookup._custom_granularities,
)


@pytest.fixture
Expand All @@ -46,6 +55,7 @@ def multi_hop_metric_lookup( # noqa: D103
return MetricLookup(
semantic_manifest=multi_hop_join_manifest,
semantic_model_lookup=multi_hop_semantic_model_lookup,
custom_granularities=multi_hop_semantic_model_lookup._custom_granularities,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"TimeDimension('metric_time', 'hour', date_part_name='month')",
"TimeDimension('metric_time', 'hour', date_part_name='quarter')",
"TimeDimension('metric_time', 'hour', date_part_name='year')",
"TimeDimension('metric_time', 'martian_day')",
"TimeDimension('metric_time', 'microsecond')",
"TimeDimension('metric_time', 'microsecond', date_part_name='day')",
"TimeDimension('metric_time', 'microsecond', date_part_name='dow')",
Expand Down
Loading

0 comments on commit 963b4e1

Please sign in to comment.