Skip to content

Commit

Permalink
feat: sugar for tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
supersergiy committed Feb 14, 2025
1 parent 163b4fb commit fad2700
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions zetta_utils/mazepa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ class _TaskableOperation(Generic[P, R_co]):
id_fn: Callable[[Callable, list, dict], str] = attrs.field(
default=functools.partial(id_generation.generate_invocation_id, prefix="task")
)
worker_type: str | None = None
runtime_limit_sec: float | None = None
upkeep_interval_sec: float = constants.DEFAULT_UPKEEP_INTERVAL

Expand All @@ -185,8 +184,9 @@ def __call__(

def make_task(
self,
*args: P.args,
**kwargs: P.kwargs,
*args,
worker_type=None,
**kwargs,
) -> Task[R_co]:
id_ = self.id_fn(self.fn, list(args), kwargs)
upkeep_settings = TaskUpkeepSettings(
Expand All @@ -197,7 +197,7 @@ def make_task(
fn=self.fn,
operation_name=self.operation_name,
id_=id_,
worker_type=self.worker_type,
worker_type=worker_type,
upkeep_settings=upkeep_settings,
runtime_limit_sec=self.runtime_limit_sec,
)
Expand Down Expand Up @@ -254,7 +254,7 @@ def taskable_operation_cls(
*,
operation_name: str | None = None,
):
def _make_task(self, *args, **kwargs):
def _make_task(self, *args, worker_type, **kwargs):
if operation_name is None:
if hasattr(self, "get_operation_name"):
operation_name_final = self.get_operation_name() # pragma: no cover
Expand All @@ -267,7 +267,9 @@ def _make_task(self, *args, **kwargs):
operation_name=operation_name_final,
# TODO: Other params passed to decorator
).make_task(
*args, **kwargs
*args,
worker_type=worker_type,
**kwargs,
) # pylint: disable=protected-access
return task

Expand Down

0 comments on commit fad2700

Please sign in to comment.