|
| 1 | +# Task |
| 2 | + |
| 3 | +Task is the basic unit of building workflow. There are two types of tasks: simple task and operator. |
| 4 | + |
| 5 | +## Simple Task |
| 6 | +The functionality of simple task is defined by binding it to a [worker](./worker.md). |
| 7 | +Here is an example of how to define a simple task: |
| 8 | +```python |
| 9 | +from omagent_core.engine.worker.base import BaseWorker |
| 10 | +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow |
| 11 | +from omagent_core.engine.workflow.task.simple_task import simple_task |
| 12 | +from omagent_core.utils.registry import registry |
| 13 | + |
| 14 | +# Define a worker |
| 15 | +@registry.register_worker() |
| 16 | +class SimpleWorker(BaseWorker): |
| 17 | + def _run(self, my_name: str): |
| 18 | + return {} |
| 19 | + |
| 20 | +# Define a workflow |
| 21 | +workflow = ConductorWorkflow(name='my_exp') |
| 22 | + |
| 23 | +# Define a simple task |
| 24 | +task = simple_task(task_def_name='SimpleWorker', task_reference_name='ref_name', inputs={'my_name': workflow.input('my_name')}) |
| 25 | + |
| 26 | +workflow >> task |
| 27 | +``` |
| 28 | +Specify the task definition name(```task_def_name```) and the task reference name(```task_reference_name```). The task definition name should be the name of the corresponding worker class. The task reference name is used to identify the task in the workflow. |
| 29 | +Specify the inputs of the task. Inputs may be either values or references to a workflow's initial inputs or the outputs of preceding tasks. |
| 30 | +See [workflow](./workflow.md) for workflow details. |
| 31 | + |
| 32 | +## Operators |
| 33 | +Operators are the build-in tasks provided by the workflow engine. They handle the workflow control logic. |
| 34 | +### 1. Switch Task |
| 35 | +Switch task is used to make a decision based on the value of a given field. |
| 36 | +```python |
| 37 | +from omagent_core.engine.workflow.task.switch_task import SwitchTask |
| 38 | +from omagent_core.engine.worker.base import BaseWorker |
| 39 | +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow |
| 40 | +from omagent_core.engine.workflow.task.simple_task import simple_task |
| 41 | +from omagent_core.utils.registry import registry |
| 42 | + |
| 43 | +@registry.register_worker() |
| 44 | +class SimpleWorker1(BaseWorker): |
| 45 | + def _run(self): |
| 46 | + print('worker1') |
| 47 | + return {} |
| 48 | + |
| 49 | +@registry.register_worker() |
| 50 | +class SimpleWorker2(BaseWorker): |
| 51 | + def _run(self): |
| 52 | + print('worker2') |
| 53 | + return {} |
| 54 | + |
| 55 | +@registry.register_worker() |
| 56 | +class SimpleWorker3(BaseWorker): |
| 57 | + def _run(self): |
| 58 | + print('worker3') |
| 59 | + return {} |
| 60 | + |
| 61 | +workflow = ConductorWorkflow(name='switch_test') |
| 62 | + |
| 63 | +# Create some example tasks (replace with your actual tasks) |
| 64 | +task1 = simple_task(task_def_name='SimpleWorker1', task_reference_name='ref_name1') |
| 65 | +task2 = simple_task(task_def_name='SimpleWorker2', task_reference_name='ref_name2') |
| 66 | +task3 = simple_task(task_def_name='SimpleWorker3', task_reference_name='ref_name3') |
| 67 | + |
| 68 | +# 1. Create a switch task with a value-based condition |
| 69 | +switch = SwitchTask( |
| 70 | + task_ref_name="my_switch", |
| 71 | + case_expression=workflow.input('switch_case_value'), # This will evaluate the switch_case_value from workflow input |
| 72 | +) |
| 73 | + |
| 74 | +# 2. Add cases |
| 75 | +switch.switch_case("w1", [task1]) |
| 76 | +switch.switch_case("w2", [task2]) |
| 77 | + |
| 78 | +# 3. Add default case (optional) |
| 79 | +switch.default_case([task3]) |
| 80 | + |
| 81 | +workflow >> switch |
| 82 | + |
| 83 | +workflow.register(overwrite=True) |
| 84 | +``` |
| 85 | +This will create a basic workflow with a switch task shown below. |
| 86 | +<p align="center"> |
| 87 | + <img src="../images/switch_task.png" width="300"/> |
| 88 | +</p> |
| 89 | +You can also chaining the switch cases as follows: |
| 90 | + |
| 91 | +```python |
| 92 | +switch.switch_case("w1", [task1]).switch_case("w2", [task2]).default_case([task3]) |
| 93 | +``` |
| 94 | + |
| 95 | +### 2. Fork-Join Task |
| 96 | +The fork-join task is used to execute multiple tasks in parallel. |
| 97 | +```python |
| 98 | +from omagent_core.engine.workflow.task.fork_task import ForkTask |
| 99 | +from omagent_core.engine.worker.base import BaseWorker |
| 100 | +from omagent_core.engine.workflow.conductor_workflow import ConductorWorkflow |
| 101 | +from omagent_core.engine.workflow.task.simple_task import simple_task |
| 102 | +from omagent_core.utils.registry import registry |
| 103 | + |
| 104 | + |
| 105 | +@registry.register_worker() |
| 106 | +class SimpleWorker1(BaseWorker): |
| 107 | + def _run(self): |
| 108 | + print("worker1") |
| 109 | + return {} |
| 110 | + |
| 111 | + |
| 112 | +@registry.register_worker() |
| 113 | +class SimpleWorker2(BaseWorker): |
| 114 | + def _run(self): |
| 115 | + print("worker2") |
| 116 | + return {} |
| 117 | + |
| 118 | + |
| 119 | +@registry.register_worker() |
| 120 | +class SimpleWorker3(BaseWorker): |
| 121 | + def _run(self): |
| 122 | + print("worker3") |
| 123 | + return {} |
| 124 | + |
| 125 | + |
| 126 | +# Create the main workflow |
| 127 | +workflow = ConductorWorkflow(name="fork_join_test") |
| 128 | + |
| 129 | +# Create tasks for parallel execution |
| 130 | +task1 = simple_task(task_def_name="SimpleWorker1", task_reference_name="parallel_task1") |
| 131 | +task2 = simple_task(task_def_name="SimpleWorker2", task_reference_name="parallel_task2") |
| 132 | +task3 = simple_task(task_def_name="SimpleWorker3", task_reference_name="parallel_task3") |
| 133 | + |
| 134 | +# Create parallel execution paths |
| 135 | +path1 = [task1] # First parallel path |
| 136 | +path2 = [task2] # Second parallel path |
| 137 | +path3 = [task3] # Third parallel path |
| 138 | + |
| 139 | +# Create the fork task with multiple parallel paths |
| 140 | +fork_task = ForkTask( |
| 141 | + task_ref_name="parallel_execution", |
| 142 | + forked_tasks=[path1, path2, path3], |
| 143 | + # The join will wait for the last task in each path |
| 144 | + join_on=["parallel_task1", "parallel_task2", "parallel_task3"] |
| 145 | +) |
| 146 | + |
| 147 | +# Add the fork task to the workflow |
| 148 | +workflow.add(fork_task) |
| 149 | + |
| 150 | +workflow.register(overwrite=True) |
| 151 | +``` |
| 152 | +This will create a basic workflow with a fork-join task shown below. |
| 153 | +<p align="center"> |
| 154 | + <img src="../images/fork_task.png" width="300"/> |
| 155 | +</p> |
| 156 | + |
| 157 | +### 3. Do-While Task |
0 commit comments