diff --git a/metricflow-semantics/metricflow_semantics/specs/linkable_spec_set.py b/metricflow-semantics/metricflow_semantics/specs/linkable_spec_set.py index 2aa3ac6d0b..41d3a9ea32 100644 --- a/metricflow-semantics/metricflow_semantics/specs/linkable_spec_set.py +++ b/metricflow-semantics/metricflow_semantics/specs/linkable_spec_set.py @@ -43,6 +43,17 @@ def contains_metric_time(self) -> bool: def time_dimension_specs_with_custom_grain(self) -> Tuple[TimeDimensionSpec, ...]: # noqa: D102 return tuple([spec for spec in self.time_dimension_specs if spec.time_granularity.is_custom_granularity]) + def replace_custom_granularity_with_base_granularity(self) -> LinkableSpecSet: + """Return the same spec set, replacing any custom time granularity with its base granularity.""" + return LinkableSpecSet( + dimension_specs=self.dimension_specs, + time_dimension_specs=tuple( + [time_dimension_spec.with_base_grain() for time_dimension_spec in self.time_dimension_specs] + ), + entity_specs=self.entity_specs, + group_by_metric_specs=self.group_by_metric_specs, + ) + def included_agg_time_dimension_specs_for_metric( self, metric_reference: MetricReference, metric_lookup: MetricLookup ) -> List[TimeDimensionSpec]: diff --git a/metricflow/dataflow/builder/dataflow_plan_builder.py b/metricflow/dataflow/builder/dataflow_plan_builder.py index 064c8d40eb..4dd9b8cce7 100644 --- a/metricflow/dataflow/builder/dataflow_plan_builder.py +++ b/metricflow/dataflow/builder/dataflow_plan_builder.py @@ -893,18 +893,8 @@ def _select_source_nodes_with_linkable_specs( # Use a dictionary to dedupe for consistent ordering. selected_nodes: Dict[DataflowPlanNode, None] = {} - # Find the source node that will satisfy the base granularity. Custom granularities will be joined in later. - linkable_specs_set_with_base_granularities: Set[LinkableInstanceSpec] = set() # TODO: Add support for no-metrics queries for custom grains without a join (i.e., select directly from time spine). - for linkable_spec in linkable_specs.as_tuple: - if isinstance(linkable_spec, TimeDimensionSpec) and linkable_spec.time_granularity.is_custom_granularity: - linkable_spec_with_base_grain = linkable_spec.with_grain( - ExpandedTimeGranularity.from_time_granularity(linkable_spec.time_granularity.base_granularity) - ) - linkable_specs_set_with_base_granularities.add(linkable_spec_with_base_grain) - else: - linkable_specs_set_with_base_granularities.add(linkable_spec) - + linkable_specs_set_with_base_granularities = set(linkable_specs.as_tuple) for source_node in source_nodes: output_spec_set = self._node_data_set_resolver.get_output_data_set(source_node).instance_set.spec_set all_linkable_specs_in_node = set(output_spec_set.linkable_specs) @@ -977,10 +967,14 @@ def _find_source_node_recipe( measure_spec_properties: Optional[MeasureSpecProperties] = None, ) -> Optional[SourceNodeRecipe]: """Find the most suitable source nodes to satisfy the requested specs, as well as how to join them.""" - linkable_specs = linkable_spec_set.as_tuple candidate_nodes_for_left_side_of_join: List[DataflowPlanNode] = [] candidate_nodes_for_right_side_of_join: List[DataflowPlanNode] = [] + # Replace any custom granularities with their base granularities. The custom granularity will be joined in + # later, since custom granularities cannot be satisfied by source nodes. But we will need the dimension at + # base granularity from the source node in order to join to the appropriate time spine later. + linkable_specs_to_satisfy = linkable_spec_set.replace_custom_granularity_with_base_granularity() + linkable_specs_to_satisfy_tuple = linkable_specs_to_satisfy.as_tuple if measure_spec_properties: candidate_nodes_for_right_side_of_join += self._source_node_set.source_nodes_for_metric_queries candidate_nodes_for_left_side_of_join += self._select_source_nodes_with_measures( @@ -992,15 +986,15 @@ def _find_source_node_recipe( candidate_nodes_for_right_side_of_join += list(self._source_node_set.source_nodes_for_group_by_item_queries) candidate_nodes_for_left_side_of_join += list( self._select_source_nodes_with_linkable_specs( - linkable_specs=linkable_spec_set, + linkable_specs=linkable_specs_to_satisfy, source_nodes=self._source_node_set.source_nodes_for_group_by_item_queries, ) ) # If metric_time is requested without metrics, choose appropriate time spine node to select those values from. - if linkable_spec_set.metric_time_specs: + if linkable_specs_to_satisfy.metric_time_specs: time_spine_node = self._source_node_set.time_spine_nodes[ TimeSpineSource.choose_time_spine_source( - required_time_spine_specs=linkable_spec_set.metric_time_specs, + required_time_spine_specs=linkable_specs_to_satisfy.metric_time_specs, time_spine_sources=self._source_node_builder.time_spine_sources, ).base_granularity ] @@ -1037,7 +1031,7 @@ def _find_source_node_recipe( ) candidate_nodes_for_right_side_of_join = node_processor.remove_unnecessary_nodes( - desired_linkable_specs=linkable_specs, + desired_linkable_specs=linkable_specs_to_satisfy_tuple, nodes=candidate_nodes_for_right_side_of_join, metric_time_dimension_reference=self._metric_time_dimension_reference, time_spine_nodes=self._source_node_set.time_spine_nodes_tuple, @@ -1048,10 +1042,10 @@ def _find_source_node_recipe( f"nodes for the right side of the join" ) # TODO: test multi-hop with custom grains - if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs): + if DataflowPlanBuilder._contains_multihop_linkables(linkable_specs_to_satisfy_tuple): candidate_nodes_for_right_side_of_join = list( node_processor.add_multi_hop_joins( - desired_linkable_specs=linkable_specs, + desired_linkable_specs=linkable_specs_to_satisfy_tuple, nodes=candidate_nodes_for_right_side_of_join, join_type=default_join_type, ) @@ -1065,13 +1059,13 @@ def _find_source_node_recipe( # If there are MetricGroupBys in the requested linkable specs, build source nodes to satisfy them. # We do this at query time instead of during usual source node generation because the number of potential # MetricGroupBy source nodes could be extremely large (and potentially slow). - logger.info(f"Building source nodes for group by metrics: {linkable_spec_set.group_by_metric_specs}") + logger.info(f"Building source nodes for group by metrics: {linkable_specs_to_satisfy.group_by_metric_specs}") candidate_nodes_for_right_side_of_join += [ self._build_query_output_node( query_spec=self._source_node_builder.build_source_node_inputs_for_group_by_metric(group_by_metric_spec), for_group_by_source_node=True, ) - for group_by_metric_spec in linkable_spec_set.group_by_metric_specs + for group_by_metric_spec in linkable_specs_to_satisfy.group_by_metric_specs ] logger.info(f"Processing nodes took: {time.time()-start_time:.2f}s") @@ -1110,7 +1104,7 @@ def _find_source_node_recipe( start_time = time.time() evaluation = node_evaluator.evaluate_node( left_node=node, - required_linkable_specs=list(linkable_specs), + required_linkable_specs=list(linkable_specs_to_satisfy_tuple), default_join_type=default_join_type, ) logger.info(f"Evaluation of {node} took {time.time() - start_time:.2f}s") @@ -1565,10 +1559,7 @@ def _build_aggregated_measure_from_measure_source_node( specs_to_keep_after_join = InstanceSpecSet(measure_specs=(measure_spec,)).merge( InstanceSpecSet.create_from_specs( - [ - spec.with_base_grain() if isinstance(spec, TimeDimensionSpec) else spec - for spec in required_linkable_specs.as_tuple - ] + required_linkable_specs.replace_custom_granularity_with_base_granularity().as_tuple ), ) diff --git a/metricflow/dataflow/builder/node_evaluator.py b/metricflow/dataflow/builder/node_evaluator.py index 6b365cc09f..0d6738b53b 100644 --- a/metricflow/dataflow/builder/node_evaluator.py +++ b/metricflow/dataflow/builder/node_evaluator.py @@ -29,7 +29,6 @@ from metricflow_semantics.specs.entity_spec import LinklessEntitySpec from metricflow_semantics.specs.instance_spec import LinkableInstanceSpec from metricflow_semantics.specs.spec_set import group_specs_by_type -from metricflow_semantics.specs.time_dimension_spec import TimeDimensionSpec from metricflow_semantics.sql.sql_join_type import SqlJoinType from metricflow.dataflow.builder.node_data_set import DataflowPlanNodeOutputDataSetResolver @@ -407,10 +406,6 @@ def evaluate_node( logger.debug(f"Candidate spec set is:\n{mf_pformat(candidate_spec_set)}") data_set_linkable_specs = candidate_spec_set.linkable_specs - # Look for which nodes can satisfy the linkable specs at their base grains. Custom grains will be joined later. - required_linkable_specs_with_base_grains = [ - spec.with_base_grain() if isinstance(spec, TimeDimensionSpec) else spec for spec in required_linkable_specs - ] # These are linkable specs in the start node data set. Those are considered "local". local_linkable_specs: List[LinkableInstanceSpec] = [] @@ -420,7 +415,7 @@ def evaluate_node( # Group required_linkable_specs into local / un-joinable / or possibly joinable. unjoinable_linkable_specs = [] - for required_linkable_spec in required_linkable_specs_with_base_grains: + for required_linkable_spec in required_linkable_specs: is_metric_time = required_linkable_spec.element_name == DataSet.metric_time_dimension_name() is_local = required_linkable_spec in data_set_linkable_specs is_unjoinable = (