Python: Add the Dapr Runtime for Processes #9642

Open

wants to merge 25 commits into base: main

Changes from 23 commits

Commits (25 total)
5b84392
wip dapr runtime for processes
moonbox3 Oct 30, 2024
67e4dd9
dapr updates
moonbox3 Oct 30, 2024
039a687
Merge branch 'main' into processes-dapr-runtime
moonbox3 Oct 31, 2024
b4bb82e
work on dapr runtime.
moonbox3 Nov 6, 2024
7394832
Merge main to branch
moonbox3 Nov 6, 2024
f48b1a0
Further work on dapr runtime.
moonbox3 Nov 7, 2024
8451bd4
Merge branch 'main' into processes-dapr-runtime
moonbox3 Nov 7, 2024
221a429
Working dapr FastAPI sample app.
moonbox3 Nov 8, 2024
ac5d571
Merge branch 'main' into processes-dapr-runtime
moonbox3 Nov 8, 2024
ed3bed7
Fix mypy errors
moonbox3 Nov 9, 2024
c7fb532
Clean up sample, add sample README, add unit tests for Dapr runtime.
moonbox3 Nov 11, 2024
1fd8802
unit test cleanup
moonbox3 Nov 11, 2024
834777b
Merge branch 'main' into processes-dapr-runtime
moonbox3 Nov 11, 2024
65d2bdc
Revert sample changes.
moonbox3 Nov 11, 2024
e1fee76
Fix typo
moonbox3 Nov 11, 2024
f6a89b0
Merge branch 'main' into processes-dapr-runtime
moonbox3 Nov 11, 2024
2b99a87
Fix return type
moonbox3 Nov 11, 2024
b409f1d
Merge branch 'processes-dapr-runtime' of github.com:moonbox3/semantic…
moonbox3 Nov 11, 2024
a152b6a
Improve import depth. Fix running nested processes.
moonbox3 Nov 14, 2024
13b60a2
Clean up demo sample.
moonbox3 Nov 14, 2024
9ebb1cd
Simplify being able to pass in enum and convert string for user.
moonbox3 Nov 14, 2024
fc9154f
Merge main to branch
moonbox3 Nov 14, 2024
ba87c48
Fix mypy errors
moonbox3 Nov 14, 2024
6196927
Refactor support for Dapr FastAPI and Flask samples.
moonbox3 Nov 14, 2024
0f9e16e
Merge branch 'main' into processes-dapr-runtime
moonbox3 Nov 14, 2024
12 changes: 11 additions & 1 deletion python/.vscode/launch.json
@@ -10,7 +10,17 @@
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": true
},
{
"name": "Pythonapp with Dapr",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/samples/demos/process_with_dapr/app.py",
"console": "integratedTerminal",
"preLaunchTask": "daprd-debug-python",
"postDebugTask": "daprd-down-python",
"justMyCode": false
}
]
}
14 changes: 14 additions & 0 deletions python/.vscode/tasks.json
@@ -164,6 +164,20 @@
"panel": "shared"
},
"problemMatcher": []
},
{
"label": "daprd-debug-python",
"type": "daprd",
"appId": "dapr-processes",
"httpPort": 3500,
"appPort": 5001,
"grpcPort": 53317,
"metricsPort": 9091
},
{
"label": "daprd-down-python",
"type": "daprd-down",
"appId": "dapr-processes"
}
],
"inputs": [
4 changes: 4 additions & 0 deletions python/pyproject.toml
@@ -116,6 +116,10 @@ pandas = [
aws = [
"boto3>=1.28.57",
]
dapr = [
"dapr>=1.14.0",
"dapr.ext.fastapi>=1.14.0",
]

[tool.uv]
prerelease = "if-necessary-or-explicit"
28 changes: 14 additions & 14 deletions python/samples/concepts/processes/cycles_with_fan_in.py
@@ -42,8 +42,8 @@ class KickOffStep(KernelProcessStep):

@kernel_function(name=KICK_OFF_FUNCTION)
async def print_welcome_message(self, context: KernelProcessStepContext):
await context.emit_event(process_event=CommonEvents.StartARequested.value, data="Get Going A")
await context.emit_event(process_event=CommonEvents.StartBRequested.value, data="Get Going B")
await context.emit_event(process_event=CommonEvents.StartARequested, data="Get Going A")
await context.emit_event(process_event=CommonEvents.StartBRequested, data="Get Going B")


# Define a sample `AStep` step that will emit an event after 1 second.
@@ -52,7 +52,7 @@ class AStep(KernelProcessStep):
@kernel_function()
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(1)
await context.emit_event(process_event=CommonEvents.AStepDone.value, data="I did A")
await context.emit_event(process_event=CommonEvents.AStepDone, data="I did A")


# Define a sample `BStep` step that will emit an event after 2 seconds.
@@ -61,7 +61,7 @@ class BStep(KernelProcessStep):
@kernel_function()
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(2)
await context.emit_event(process_event=CommonEvents.BStepDone.value, data="I did B")
await context.emit_event(process_event=CommonEvents.BStepDone, data="I did B")


# Define a sample `CStepState` that will keep track of the current cycle.
@@ -84,9 +84,9 @@ async def do_it(self, context: KernelProcessStepContext, astepdata: str, bstepda
print(f"CStep Current Cycle: {self.state.current_cycle}")
if self.state.current_cycle == 3:
print("CStep Exit Requested")
await context.emit_event(process_event=CommonEvents.ExitRequested.value)
await context.emit_event(process_event=CommonEvents.ExitRequested)
return
await context.emit_event(process_event=CommonEvents.CStepDone.value)
await context.emit_event(process_event=CommonEvents.CStepDone)


kernel = Kernel()
@@ -105,25 +105,25 @@ async def cycles_with_fan_in():
myCStep = process.add_step(step_type=CStep)

# Define the input event and where to send it to
process.on_input_event(event_id=CommonEvents.StartProcess.value).send_event_to(target=kickoff_step)
process.on_input_event(event_id=CommonEvents.StartProcess).send_event_to(target=kickoff_step)

# Define the process flow
kickoff_step.on_event(event_id=CommonEvents.StartARequested.value).send_event_to(target=myAStep)
kickoff_step.on_event(event_id=CommonEvents.StartBRequested.value).send_event_to(target=myBStep)
myAStep.on_event(event_id=CommonEvents.AStepDone.value).send_event_to(target=myCStep, parameter_name="astepdata")
kickoff_step.on_event(event_id=CommonEvents.StartARequested).send_event_to(target=myAStep)
kickoff_step.on_event(event_id=CommonEvents.StartBRequested).send_event_to(target=myBStep)
myAStep.on_event(event_id=CommonEvents.AStepDone).send_event_to(target=myCStep, parameter_name="astepdata")

# Define the fan in behavior once both AStep and BStep are done
myBStep.on_event(event_id=CommonEvents.BStepDone.value).send_event_to(target=myCStep, parameter_name="bstepdata")
myCStep.on_event(event_id=CommonEvents.CStepDone.value).send_event_to(target=kickoff_step)
myCStep.on_event(event_id=CommonEvents.ExitRequested.value).stop_process()
myBStep.on_event(event_id=CommonEvents.BStepDone).send_event_to(target=myCStep, parameter_name="bstepdata")
myCStep.on_event(event_id=CommonEvents.CStepDone).send_event_to(target=kickoff_step)
myCStep.on_event(event_id=CommonEvents.ExitRequested).stop_process()

# Build the process
kernel_process = process.build()

async with await start(
process=kernel_process,
kernel=kernel,
initial_event=KernelProcessEvent(id=CommonEvents.StartProcess.value, data="foo"),
initial_event=KernelProcessEvent(id=CommonEvents.StartProcess, data="foo"),
) as process_context:
process_state = await process_context.get_state()
c_step_state: KernelProcessStepState[CStepState] = next(
12 changes: 6 additions & 6 deletions python/samples/concepts/processes/nested_process.py
@@ -58,12 +58,12 @@ async def repeat(self, message: str, context: KernelProcessStepContext, count: i
print(f"[REPEAT] {output}")

await context.emit_event(
process_event=ProcessEvents.OutputReadyPublic.value,
process_event=ProcessEvents.OutputReadyPublic,
data=output,
visibility=KernelProcessEventVisibility.Public,
)
await context.emit_event(
process_event=ProcessEvents.OutputReadyInternal.value,
process_event=ProcessEvents.OutputReadyInternal,
data=output,
visibility=KernelProcessEventVisibility.Internal,
)
@@ -74,7 +74,7 @@ def create_linear_process(name: str):
echo_step = process_builder.add_step(step_type=EchoStep)
repeat_step = process_builder.add_step(step_type=RepeatStep)

process_builder.on_input_event(event_id=ProcessEvents.StartProcess.value).send_event_to(target=echo_step)
process_builder.on_input_event(event_id=ProcessEvents.StartProcess).send_event_to(target=echo_step)

echo_step.on_function_result(function_name=EchoStep.ECHO).send_event_to(
target=repeat_step, parameter_name="message"
@@ -93,16 +93,16 @@ async def nested_process():

nested_process_step = process_builder.add_step_from_process(create_linear_process("Inner"))

process_builder.steps[1].on_event(ProcessEvents.OutputReadyInternal.value).send_event_to(
nested_process_step.where_input_event_is(ProcessEvents.StartProcess.value)
process_builder.steps[1].on_event(ProcessEvents.OutputReadyInternal).send_event_to(
nested_process_step.where_input_event_is(ProcessEvents.StartProcess)
)

process = process_builder.build()

test_input = "Test"

process_handle = await start(
process=process, kernel=kernel, initial_event=ProcessEvents.StartProcess.value, data=test_input
process=process, kernel=kernel, initial_event=ProcessEvents.StartProcess, data=test_input
)
process_info = await process_handle.get_state()

163 changes: 163 additions & 0 deletions python/samples/demos/process_with_dapr/README.md
@@ -0,0 +1,163 @@
# Semantic Kernel Processes in Dapr

This demo contains a FastAPI app that uses Dapr to run a Semantic Kernel Process. Dapr is a portable, event-driven runtime that simplifies building resilient, stateful applications that run in the cloud and/or at the edge. Dapr is a natural fit for hosting Semantic Kernel Processes and allows you to scale your processes in size and quantity without sacrificing performance or reliability.

For more information about Semantic Kernel Processes and Dapr, see the following documentation:

#### Semantic Kernel Processes

- [Overview of the Process Framework (docs)](https://learn.microsoft.com/semantic-kernel/frameworks/process/process-framework)
- [Getting Started with Processes (samples)](../../getting_started_with_processes/)

#### Dapr

- [Dapr documentation](https://docs.dapr.io/)
- [Dapr Actor documentation](https://v1-10.docs.dapr.io/developing-applications/building-blocks/actors/)
- [Dapr local development](https://docs.dapr.io/getting-started/install-dapr-selfhost/)

## Running the Demo

Before running this demo, make sure to configure Dapr for local development by following the links above. The Dapr containers must be running before you start this demo application.

```mermaid
flowchart LR
Kickoff --> A
Kickoff --> B
A --> C
B --> C

C -->|Count < 3| Kickoff
C -->|Count >= 3| End

classDef kickoffClass fill:#f9f,stroke:#333,stroke-width:2px;
class Kickoff kickoffClass;

End((End))
```

1. Build and run the sample. You can run the Dapr service locally using the Dapr CLI or the Dapr VS Code extension; the VS Code extension is the recommended approach if you want to debug the code as it runs.
   - If using VS Code to debug, select the `Pythonapp with Dapr` option from the Run and Debug dropdown list.
1. When the service is up and running, it exposes a single API on localhost port 5001.

#### Invoking the process:

1. Open a web browser and point it to [http://localhost:5001/processes/1234](http://localhost:5001/processes/1234) to invoke a new process with `Id = "1234"`.
1. When the process is complete, you should see `{"processId":"1234"}` in the web browser (a scripted way to invoke the same endpoint is sketched after the log output below).
1. You should also see console output from the running service with logs that match the following:

```text
##### Kickoff ran.
##### AStep ran.
##### BStep ran.
##### CStep activated with Cycle = '1'.
##### CStep run cycle 2.
##### Kickoff ran.
##### AStep ran.
##### BStep ran.
##### CStep run cycle 3 - exiting.
```
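
If you prefer to drive the endpoint from a script instead of the browser, here is a minimal sketch using only the Python standard library (assuming the service is listening on port 5001, as configured in `.vscode/tasks.json`):

```python
import urllib.request

# Invoke (or resume) the process with Id = "1234" via the demo's GET endpoint.
with urllib.request.urlopen("http://localhost:5001/processes/1234") as response:
    print(response.read().decode())  # expected output: {"processId":"1234"}
```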

Now refresh the page in your browser to run the same process instance again. This time the logs should look like this:

```text
##### Kickoff ran.
##### AStep ran.
##### BStep ran.
##### CStep run cycle 4 - exiting.
```

Notice that the logs from the two runs are not the same. In the first run, the process had not been run before, so its initial state came from what we defined in the process:

**_First Run_**

- `CState` is initialized with `Cycle = 1`, which is the initial state that we specified while building the process.
- `CState` is invoked a total of two times before the terminal condition of `Cycle >= 3` is reached.

In the second run however, the process has persisted state from the first run:

**_Second Run_**

- `CState` is initialized with `Cycle = 3`, which is the final state from the first run of the process.
- `CState` is invoked only once and is already in the terminal condition of `Cycle >= 3`.

If you create a new instance of the process with `Id = "ABCD"` by pointing your browser to [http://localhost:5001/processes/ABCD](http://localhost:5001/processes/ABCD), you will see that it starts with the initial state, as expected.
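
The persistence comes from the step's state model: the Dapr step actor stores each step's pydantic state and rehydrates it the next time the same process id is run. Below is a minimal sketch of what such a stateful step can look like; the names are assumptions modeled on the concept samples rather than copied from `app.py`:

```python
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState


class CStepState(KernelBaseModel):
    """State persisted by the Dapr step actor between runs of the same process id."""

    current_cycle: int = 1


class CStep(KernelProcessStep[CStepState]):
    state: CStepState | None = None

    async def activate(self, state: KernelProcessStepState[CStepState]):
        # Called when the step is activated; state.state holds either the initial
        # state defined at build time or the state persisted by a previous run.
        self.state = state.state
        print(f"##### CStep activated with Cycle = '{self.state.current_cycle}'.")
```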

## Understanding the Code

Below are the key aspects of the code that show how Dapr and Semantic Kernel Processes can be integrated into a FastAPI app:

- Create a new Dapr FastAPI app.
- Add the required Semantic Kernel and Dapr packages to your project:

**_General Imports and Dapr Packages_**

```python
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import logging
from contextlib import asynccontextmanager
from enum import Enum
from typing import TYPE_CHECKING, ClassVar

import uvicorn
from dapr.actor import ActorId
from dapr.actor.runtime.context import ActorRuntimeContext
from dapr.ext.fastapi import DaprActor, DaprApp
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import Field
```

**_Semantic Kernel Process Imports_**

```python
from semantic_kernel import Kernel
from semantic_kernel.functions import kernel_function
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.processes.dapr_runtime.actors.event_buffer_actor import EventBufferActor
from semantic_kernel.processes.dapr_runtime.actors.external_event_buffer_actor import ExternalEventBufferActor
from semantic_kernel.processes.dapr_runtime.actors.message_buffer_actor import MessageBufferActor
from semantic_kernel.processes.dapr_runtime.actors.process_actor import ProcessActor
from semantic_kernel.processes.dapr_runtime.actors.step_actor import StepActor
from semantic_kernel.processes.dapr_runtime.dapr_kernel_process import start
from semantic_kernel.processes.kernel_process.kernel_process_step import KernelProcessStep
from semantic_kernel.processes.kernel_process.kernel_process_step_context import KernelProcessStepContext
from semantic_kernel.processes.kernel_process.kernel_process_step_state import KernelProcessStepState
from semantic_kernel.processes.process_builder import ProcessBuilder
```

**_Define the FastAPI app, Dapr App, and the DaprActor_**

```python
kernel = Kernel()


def process_actor_factory(ctx: ActorRuntimeContext, actor_id: ActorId) -> ProcessActor:
"""Factory function to create ProcessActor instances with dependencies."""
return ProcessActor(ctx, actor_id, kernel)


def step_actor_factory(ctx: ActorRuntimeContext, actor_id: ActorId) -> StepActor:
"""Factory function to create StepActor instances with dependencies."""
return StepActor(ctx, actor_id, kernel=kernel)


@asynccontextmanager
async def lifespan(app: FastAPI):
print("## actor startup ##")
await actor.register_actor(ProcessActor, actor_factory=process_actor_factory)
await actor.register_actor(StepActor, actor_factory=step_actor_factory)
await actor.register_actor(EventBufferActor)
await actor.register_actor(MessageBufferActor)
await actor.register_actor(ExternalEventBufferActor)
yield


app = FastAPI(title="SKProcess", lifespan=lifespan)
dapr_app = DaprApp(app)
actor = DaprActor(app)
```

- Build and run a Process as you normally would. For this demo, we run a simple example process from within a FastAPI method in response to a GET request. [See the FastAPI app here](./app.py).
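
For reference, the GET handler that does this looks roughly like the sketch below, continuing from the definitions above. The `get_process()` helper and the exact keyword arguments to `start(...)` are assumptions for illustration; only the `/processes/{process_id}` route and the `{"processId": ...}` response are taken from this demo:

```python
@app.get("/processes/{process_id}")
async def run_process(process_id: str) -> JSONResponse:
    try:
        # get_process() is a hypothetical helper that uses ProcessBuilder to wire
        # the Kickoff -> AStep/BStep -> CStep flow shown in the diagram above.
        process = get_process()
        await start(
            process=process,
            kernel=kernel,
            initial_event="StartProcess",  # assumed event id; the demo uses an enum member
            process_id=process_id,
        )
        return JSONResponse(content={"processId": process_id})
    except Exception as ex:
        return JSONResponse(content={"error": str(ex)}, status_code=500)


if __name__ == "__main__":
    # Port 5001 matches the appPort configured for the daprd-debug-python task.
    uvicorn.run(app, host="0.0.0.0", port=5001)
```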