Skip to content

Commit

Permalink
Rename dataflow recipe -> source node recipe
Browse files Browse the repository at this point in the history
The dataflow recipe is essentially the best choice for the source node to select from and how to join any other source nodes required to satisfy all requested specs. The name is confusing, though, since the dataflow PLAN is the entire DAG, whichextends far beyond the source nodes. Rename to source node recipe in order to make it more clear that this part of the dataflow plan is only dealing with source nodes.
  • Loading branch information
courtneyholcomb committed Sep 24, 2024
1 parent 50f3ca7 commit 0779e41
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions metricflow/dataflow/builder/dataflow_plan_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@


@dataclass(frozen=True)
class DataflowRecipe:
class SourceNodeRecipe:
"""Get a recipe for how to build a dataflow plan node that outputs measures and linkable instances as needed."""

source_node: DataflowPlanNode
Expand Down Expand Up @@ -274,13 +274,13 @@ def _build_aggregated_conversion_node(
queried_linkable_specs=queried_linkable_specs,
filter_specs=base_measure_spec.filter_specs,
)
base_measure_recipe = self._find_dataflow_recipe(
base_measure_recipe = self._find_source_node_recipe(
measure_spec_properties=self._build_measure_spec_properties([base_measure_spec.measure_spec]),
predicate_pushdown_state=time_range_only_pushdown_state,
linkable_spec_set=base_required_linkable_specs,
)
logger.info(f"Recipe for base measure aggregation:\n{mf_pformat(base_measure_recipe)}")
conversion_measure_recipe = self._find_dataflow_recipe(
conversion_measure_recipe = self._find_source_node_recipe(
measure_spec_properties=self._build_measure_spec_properties([conversion_measure_spec.measure_spec]),
predicate_pushdown_state=disabled_pushdown_state,
linkable_spec_set=LinkableSpecSet(),
Expand Down Expand Up @@ -363,7 +363,7 @@ def _build_aggregated_conversion_node(
)

# Aggregate the conversion events with the JoinConversionEventsNode as the source node
recipe_with_join_conversion_source_node = DataflowRecipe(
recipe_with_join_conversion_source_node = SourceNodeRecipe(
source_node=join_conversion_node,
required_local_linkable_specs=base_measure_recipe.required_local_linkable_specs,
join_linkable_instances_recipes=base_measure_recipe.join_linkable_instances_recipes,
Expand Down Expand Up @@ -786,7 +786,7 @@ def _build_plan_for_distinct_values(
predicate_pushdown_state = PredicatePushdownState(
time_range_constraint=query_spec.time_range_constraint, where_filter_specs=query_level_filter_specs
)
dataflow_recipe = self._find_dataflow_recipe(
dataflow_recipe = self._find_source_node_recipe(
linkable_spec_set=required_linkable_specs, predicate_pushdown_state=predicate_pushdown_state
)
if not dataflow_recipe:
Expand Down Expand Up @@ -970,12 +970,13 @@ def _build_measure_spec_properties(self, measure_specs: Sequence[MeasureSpec]) -
non_additive_dimension_spec=non_additive_dimension_spec,
)

def _find_dataflow_recipe(
def _find_source_node_recipe(
self,
linkable_spec_set: LinkableSpecSet,
predicate_pushdown_state: PredicatePushdownState,
measure_spec_properties: Optional[MeasureSpecProperties] = None,
) -> Optional[DataflowRecipe]:
) -> Optional[SourceNodeRecipe]:
"""Find the most suitable source nodes to satisfy the requested specs, as well as how to join them."""
linkable_specs = linkable_spec_set.as_tuple
candidate_nodes_for_left_side_of_join: List[DataflowPlanNode] = []
candidate_nodes_for_right_side_of_join: List[DataflowPlanNode] = []
Expand Down Expand Up @@ -1166,7 +1167,7 @@ def _find_dataflow_recipe(
for x in evaluation.join_recipes
for y in x.join_on_partition_time_dimensions
)
return DataflowRecipe(
return SourceNodeRecipe(
source_node=node_with_lowest_cost_plan,
required_local_linkable_specs=(
evaluation.local_linkable_specs
Expand Down Expand Up @@ -1355,7 +1356,7 @@ def build_aggregated_measure(
metric_input_measure_spec: MetricInputMeasureSpec,
queried_linkable_specs: LinkableSpecSet,
predicate_pushdown_state: PredicatePushdownState,
measure_recipe: Optional[DataflowRecipe] = None,
measure_recipe: Optional[SourceNodeRecipe] = None,
) -> DataflowPlanNode:
"""Returns a node where the measures are aggregated by the linkable specs and constrained appropriately.
Expand Down Expand Up @@ -1418,7 +1419,7 @@ def _build_aggregated_measure_from_measure_source_node(
metric_input_measure_spec: MetricInputMeasureSpec,
queried_linkable_specs: LinkableSpecSet,
predicate_pushdown_state: PredicatePushdownState,
measure_recipe: Optional[DataflowRecipe] = None,
measure_recipe: Optional[SourceNodeRecipe] = None,
) -> DataflowPlanNode:
measure_spec = metric_input_measure_spec.measure_spec
cumulative = metric_input_measure_spec.cumulative_description is not None
Expand Down Expand Up @@ -1484,7 +1485,7 @@ def _build_aggregated_measure_from_measure_source_node(
)

find_recipe_start_time = time.time()
measure_recipe = self._find_dataflow_recipe(
measure_recipe = self._find_source_node_recipe(
measure_spec_properties=measure_properties,
predicate_pushdown_state=measure_pushdown_state,
linkable_spec_set=required_linkable_specs,
Expand Down

0 comments on commit 0779e41

Please sign in to comment.