Skip to content

Commit

Permalink
mutate to offer pipe_output distributively
Browse files Browse the repository at this point in the history
We can apply functions to nodes without touching it

Mutate recieves functions as arguments and builds for those target
functions a pipe_output. If the target function already has a
pipe_output it will append the mutate function as the last step.
  • Loading branch information
jernejfrank committed Oct 6, 2024
1 parent 37dedd3 commit 57a3512
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 3 deletions.
2 changes: 2 additions & 0 deletions hamilton/function_modifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@
pipe = macros.pipe
pipe_input = macros.pipe_input
pipe_output = macros.pipe_output
mutate = macros.mutate
step = macros.step
apply_to = macros.apply_to

# resolve transform/model decorator
dynamic_transform = macros.dynamic_transform
Expand Down
222 changes: 219 additions & 3 deletions hamilton/function_modifiers/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,26 +323,38 @@ class Applicable:

def __init__(
self,
fn: Callable,
fn: Union[Callable, str, None],
args: Tuple[Union[Any, SingleDependency], ...],
kwargs: Dict[str, Union[Any, SingleDependency]],
target_fn: Union[Callable, str, None] = None,
_resolvers: List[ConfigResolver] = None,
_name: Optional[str] = None,
_namespace: Union[str, None, EllipsisType] = ...,
_target: base.TargetType = None,
):
"""Instantiates an Applicable.
:param fn: Function it takes in
We allow fn=None for the use-cases where we want to store the Applicable config (i.e. .when* family, namespace, target, etc.)
but do not yet the access to the actual function we are turning into the Applicable. In addition, in case the target nodes come
from a function (using extract_columns/extract_fields) we can pass target_fn to have access to its pointer that we can decorate
programmatically. See `apply_to` and `mutate` for an example.
:param args: Args (*args) to pass to the function
:param kwargs: Kwargs (**kwargs) to pass to the function
:param fn: Function it takes in. Can be None to create an Applicable placeholder with delayed choice of function.
:param target_fn: Function the applicable will be applied to
:param _resolvers: Resolvers to use for the function
:param _name: Name of the node to be created
:param _namespace: Namespace of the node to be created -- currently only single-level namespaces are supported
:param _target: Selects which target nodes it will be appended onto. By default all.
:param kwargs: Kwargs (**kwargs) to pass to the function
"""

if isinstance(fn, str) or isinstance(target_fn, str):
raise TypeError("Strings are not supported currently. Please provide function pointer.")

self.fn = fn
self.target_fn = target_fn

if "_name" in kwargs:
raise ValueError("Cannot pass in _name as a kwarg")

Expand All @@ -363,6 +375,7 @@ def _with_resolvers(self, *additional_resolvers: ConfigResolver) -> "Applicable"
_namespace=self.namespace,
args=self.args,
kwargs=self.kwargs,
target_fn=self.target_fn,
)

def when(self, **key_value_pairs) -> "Applicable":
Expand Down Expand Up @@ -415,6 +428,7 @@ def namespaced(self, namespace: NamespaceType) -> "Applicable":
_namespace=namespace,
args=self.args,
kwargs=self.kwargs,
target_fn=self.target_fn,
)

def resolves(self, config: Dict[str, Any]) -> bool:
Expand Down Expand Up @@ -452,6 +466,7 @@ def named(self, name: str, namespace: NamespaceType = ...) -> "Applicable":
),
args=self.args,
kwargs=self.kwargs,
target_fn=self.target_fn,
)

def on_output(self, target: base.TargetType) -> "Applicable":
Expand Down Expand Up @@ -1174,3 +1189,204 @@ def chain_transforms(
)
first_arg = raw_node.name
return nodes, first_arg


def apply_to(fn_: Union[Callable, str], **mutating_fn_kwargs: Union[SingleDependency, Any]):
"""Creates an applicable placeholder with potential kwargs that will be applied to a node (or a subcomponent of a node).
See documentation for `mutate` to see how this is used. It de facto allows a postponed `step`.
We pass fn=None here as this will be the function we are decorating and need to delay passing it in. The target
function is the one we wish to mutate and we store it for later access.
:param fn: Function the applicable will be applied to
:param mutating_fn_kwargs: Kwargs (**kwargs) to pass to the mutator function. Must be validly called as f(**kwargs), and have a 1:1 mapping of kwargs to parameters.
:return: an applicable placeholder with the target function
"""
return Applicable(fn=None, args=(), kwargs=mutating_fn_kwargs, target_fn=fn_, _resolvers=[])


class NotSameModuleError(Exception):
def __init__(self, fn: Callable, target_fn: Callable):
super().__init__(
f"The functions have to be in the same module... "
f"The target function {target_fn.__name__} is in module {target_fn.__module__} and "
f"the mutator function {fn.__name__} is in module {fn.__module__}./n"
"Use power user setting to disable this restriction."
)


class mutate:
"""Running a transformation on the outputs of a series of functions.
This is closely related to `pipe_output` as it effectively allows you to run transformations on the output of a node without touching that node.
We choose which target functions we wish to mutate by the transformation we are decorating. For now, the target functions, that will be mutated,
have to be in the same module (come speak to us if you need this capability over multiple modules).
If we wish to apply `_transform1` to the output of A and B and `_transform2` only to the output
of node B, we can do this like
.. code-block:: python
:name: Simple @mutate example
def A(...):
return ...
def B(...):
return ...
@mutate(A,B)
def _transform1(...):
return ...
@mutate(B)
def _transform2(...):
return ...
we obtain the new pipe-like subDAGs **A_raw --> _transform1 --> A** and **B_raw --> _transform1 --> _transform2 --> B**,
where the behavior is the same as `pipe_output`.
While it is generally reasonable to contain these constructs within a node's function,
you should consider `mutate` in the following scenarios:
1. Loading data and applying pre-cleaning step.
2. Feature engineering via joining, filtering, sorting, etc.
3. Experimenting with different transformations across nodes by selectively turning transformations on / off.
We assume the first argument of the decorated function to be the output of the function we are targeting.
For transformations with multiple arguments you can use key word arguments coupled with `step` or `value`
the same as with other `pipe`-family decorators
.. code-block:: python
:name: Simple @mutate example with multiple arguments
@mutate(A,B,arg2=step('upstream_node'),arg3=value(some_literal),...)
def _transform1(output_from_target:correct_type, arg2:arg2_type, arg3:arg3_type,...):
return ...
You can also select individual args that will be applied to each target node by adding `apply_to(...)`
.. code-block:: python
:name: Simple @mutate example with multiple arguments allowing individual actions
@mutate(
apply_to(A,arg2=step('upstream_node_1'),arg3=value(some_literal_1)),
apply_to(B,arg2=step('upstream_node_2'),arg3=value(some_literal_2)),
)
def _transform1(output_from_target:correct_type, arg2:arg2_type, arg3:arg3_type,...):
return ...
In case of multiple output nodes, for example after extract_field / extract_columns we can also specify the output node that we wish to mutate.
The following would mutate all columns of *A* individually while in the case of function *B* only "field_1"
.. code-block:: python
:name: @mutate example targeting specific nodes local
@extract_columns("col_1", "col_2")
def A(...):
return ...
@extract_fields(
{"field_1":int, "field_2":int, "field_3":int}
)
def B(...):
return ...
@mutate(
apply_to(A),
apply_to(B).on_output("field_1"),
)
def foo(a:int)->Dict[str,int]:
return {"field_1":1, "field_2":2, "field_3":3}
"""

def __init__(
self,
*target_functions: Union[Applicable, Callable],
collapse: bool = False,
_chain: bool = False,
**mutating_function_kwargs: Union[SingleDependency, Any],
):
"""Instantiates a `\@mutate` decorator.
We assume the first argument of the decorated function to be the output of the function we are targeting.
:param target_functions: functions we wish to mutate the output of
:param collapse: Whether to collapse this into a single node. This is not currently supported.
:param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. @flow will make use of this.
:param \*\*mutating_function_kwargs: other kwargs that the decorated function has. Must be validly called as f(kwargs), and have a 1-to-1 mapping of kwargs to parameters. This will be applied for all `target_function`, unless `apply_to` already has the mutator function kwargs, in which case it takes those.
"""
self.collapse = collapse
self.chain = _chain
# keeping it here once it gets implemented maybe nice to have options
if self.collapse:
raise NotImplementedError(
"Collapsing functions as one node is not yet implemented for mutate(). Please reach out if you want this feature."
)
if self.chain:
raise NotImplementedError("@flow() is not yet supported -- this is ")

self.remote_applicables = tuple(
[apply_to(fn) if isinstance(fn, Callable) else fn for fn in target_functions]
)
self.mutating_function_kwargs = mutating_function_kwargs

# Cross module will require some thought so we are restricting mutate to single module for now
self.restrict_to_single_module = True

def validate_same_module(self, mutating_fn: Callable):
"""Validates target functions are in the same module as the mutator function.
:param mutating_fn: Function to validate against
:return: Nothing, raises exception if not valid.
"""
local_module = mutating_fn.__module__
for remote_applicable in self.remote_applicables:
if remote_applicable.target_fn.__module__ != local_module:
raise NotSameModuleError(fn=mutating_fn, target_fn=remote_applicable.target_fn)

def _create_step(self, mutating_fn: Callable, remote_applicable_builder: Applicable):
"""Adds the correct function for the applicable and resolves kwargs"""

if not remote_applicable_builder.kwargs:
remote_applicable_builder.kwargs = self.mutating_function_kwargs

remote_applicable_builder.fn = mutating_fn

return remote_applicable_builder

def __call__(self, mutating_fn: Callable):
"""Adds to an existing pipe_output or creates a new pipe_output.
This is a new type of decorator that builds `pipe_output` for multiple nodes in the DAG. It does
not fit in the current decorator framework since it does not decorate the node function in the DAG
but allows us to "remotely decorate" multiple nodes at once, which needs to happen before the
NodeTransformLifecycle gets applied / resolved.
:param mutating_fn: function that will be used in pipe_output to transform target function
:return: mutating_fn, to guarantee function works even when Hamilton driver is not used
"""

if not mutating_fn.__name__.startswith("_"):
mutating_fn.__name__ = "".join(("_", mutating_fn.__name__))

if self.restrict_to_single_module:
self.validate_same_module(mutating_fn=mutating_fn)

for remote_applicable in self.remote_applicables:
new_pipe_step = self._create_step(
mutating_fn=mutating_fn, remote_applicable_builder=remote_applicable
)
found_pipe_output = False
if hasattr(remote_applicable.target_fn, "transform"):
for decorator in remote_applicable.target_fn.transform:
if isinstance(decorator, pipe_output):
decorator.transforms = decorator.transforms + (new_pipe_step,)
found_pipe_output = True

if not found_pipe_output:
remote_applicable.target_fn = pipe_output(
new_pipe_step, collapse=self.collapse, _chain=self.chain
)(remote_applicable.target_fn)

return mutating_fn

0 comments on commit 57a3512

Please sign in to comment.