forked from coze-dev/coze-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkflow_stream.py
48 lines (38 loc) · 1.73 KB
/
workflow_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
"""
This example shows how to run a workflow through the streaming interface and handle the events it returns.
"""
import os
from cozepy import COZE_COM_BASE_URL
# Read the access token (a personal access token or an OAuth token) from the environment.
coze_api_token = os.environ.get("COZE_API_TOKEN")
# api.coze.com is the default endpoint. Set COZE_API_BASE to target another
# endpoint such as api.coze.cn; an unset or empty value falls back to the default.
coze_api_base = os.environ.get("COZE_API_BASE") or COZE_COM_BASE_URL
from cozepy import Coze, TokenAuth, Stream, WorkflowEvent, WorkflowEventType # noqa
# Build the Coze client from the access token and the selected API endpoint.
coze = Coze(
    auth=TokenAuth(token=coze_api_token),
    base_url=coze_api_base,
)
# Create a workflow in Coze, then paste the trailing number of its web link
# here as the workflow ID.
workflow_id = "workflow id"
# The stream interface will return an iterator of WorkflowEvent. Developers should iterate
# through this iterator to obtain WorkflowEvent and handle them separately according to
# the type of WorkflowEvent.
def handle_workflow_iterator(stream: Stream[WorkflowEvent]):
    """Consume every event in *stream* and react according to its type.

    MESSAGE events and ERROR events are printed. On an INTERRUPT event the
    run is resumed through ``coze.workflows.runs.resume`` with the fixed
    answer ``"hey"``, and the stream returned by that call is handled
    recursively before iteration of the current stream continues. Any other
    event type is ignored.

    Args:
        stream: Iterable of workflow events, as returned by the workflow
            stream/resume calls used in this script.
    """
    for item in stream:
        kind = item.event

        if kind == WorkflowEventType.MESSAGE:
            print("got message", item.message)
            continue

        if kind == WorkflowEventType.ERROR:
            print("got error", item.error)
            continue

        if kind == WorkflowEventType.INTERRUPT:
            # The interrupt payload carries the identifiers needed to resume
            # the run; the resumed run yields its own event stream.
            pending = item.interrupt.interrupt_data
            resumed_stream = coze.workflows.runs.resume(
                workflow_id=workflow_id,
                event_id=pending.event_id,
                resume_data="hey",
                interrupt_type=pending.type,
            )
            handle_workflow_iterator(resumed_stream)
# Start the workflow in streaming mode and process every event it produces.
handle_workflow_iterator(coze.workflows.runs.stream(workflow_id=workflow_id))