Skip to content

Commit edadffe

Browse files
Split sync and async examples
Signed-off-by: Elena Kolevska <[email protected]>
1 parent 0ada5ea commit edadffe

File tree

6 files changed

+166
-98
lines changed

6 files changed

+166
-98
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# Example - Publish and subscribe to messages
2+
3+
This example utilizes a publisher and a subscriber to show the bidirectional pubsub pattern.
4+
It creates a publisher and calls the `publish_event` method in the `DaprClient`.
5+
In the s`subscriber.py` file it creates a subscriber object that can call the `next_message` method to get new messages from the stream. After processing the new message, it returns a status to the stream.
6+
7+
8+
> **Note:** Make sure to use the latest proto bindings
9+
10+
## Pre-requisites
11+
12+
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
13+
- [Install Python 3.8+](https://www.python.org/downloads/)
14+
15+
## Install Dapr python-SDK
16+
17+
<!-- Our CI/CD pipeline automatically installs the correct version, so we can skip this step in the automation -->
18+
19+
```bash
20+
pip3 install dapr
21+
```
22+
23+
## Run async example where users control reading messages off the stream
24+
25+
Run the following command in a terminal/command prompt:
26+
27+
<!-- STEP
28+
name: Run subscriber
29+
expected_stdout_lines:
30+
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
31+
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
32+
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
33+
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
34+
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
35+
- "== APP == Closing subscription..."
36+
output_match_mode: substring
37+
background: true
38+
match_order: none
39+
sleep: 3
40+
-->
41+
42+
```bash
43+
# 1. Start Subscriber
44+
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py
45+
```
46+
47+
<!-- END_STEP -->
48+
49+
In another terminal/command prompt run:
50+
51+
<!-- STEP
52+
name: Run publisher
53+
expected_stdout_lines:
54+
- "== APP == {'id': 1, 'message': 'hello world'}"
55+
- "== APP == {'id': 2, 'message': 'hello world'}"
56+
- "== APP == {'id': 3, 'message': 'hello world'}"
57+
- "== APP == {'id': 4, 'message': 'hello world'}"
58+
- "== APP == {'id': 5, 'message': 'hello world'}"
59+
background: true
60+
output_match_mode: substring
61+
sleep: 15
62+
-->
63+
64+
```bash
65+
# 2. Start Publisher
66+
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
67+
```
68+
69+
<!-- END_STEP -->
70+
71+
## Run async example with a handler function
72+
73+
Run the following command in a terminal/command prompt:
74+
75+
<!-- STEP
76+
name: Run subscriber
77+
expected_stdout_lines:
78+
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
79+
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
80+
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
81+
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
82+
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
83+
- "== APP == Closing subscription..."
84+
output_match_mode: substring
85+
background: true
86+
match_order: none
87+
sleep: 3
88+
-->
89+
90+
```bash
91+
# 1. Start Subscriber
92+
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber-handler.py
93+
```
94+
95+
<!-- END_STEP -->
96+
97+
In another terminal/command prompt run:
98+
99+
<!-- STEP
100+
name: Run publisher
101+
expected_stdout_lines:
102+
- "== APP == {'id': 1, 'message': 'hello world'}"
103+
- "== APP == {'id': 2, 'message': 'hello world'}"
104+
- "== APP == {'id': 3, 'message': 'hello world'}"
105+
- "== APP == {'id': 4, 'message': 'hello world'}"
106+
- "== APP == {'id': 5, 'message': 'hello world'}"
107+
background: true
108+
output_match_mode: substring
109+
sleep: 15
110+
-->
111+
112+
```bash
113+
# 2. Start Publisher
114+
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
115+
```
116+
117+
<!-- END_STEP -->
118+
119+
120+
## Cleanup
121+
122+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# ------------------------------------------------------------
2+
# Copyright 2022 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
# ------------------------------------------------------------
13+
import asyncio
14+
import json
15+
16+
from dapr.aio.clients import DaprClient
17+
18+
async def publish_events():
19+
"""
20+
Publishes events to a pubsub topic asynchronously
21+
"""
22+
23+
async with DaprClient() as d:
24+
id = 0
25+
while id < 5:
26+
id += 1
27+
req_data = {'id': id, 'message': 'hello world'}
28+
29+
# Create a typed message with content type and body
30+
await d.publish_event(
31+
pubsub_name='pubsub',
32+
topic_name='TOPIC_A',
33+
data=json.dumps(req_data),
34+
data_content_type='application/json',
35+
publish_metadata={'ttlInSeconds': '100', 'rawPayload': 'false'},
36+
)
37+
38+
# Print the request
39+
print(req_data, flush=True)
40+
41+
await asyncio.sleep(1)
42+
43+
asyncio.run(publish_events())

examples/pubsub-streaming/async-subscriber-handler.py renamed to examples/pubsub-streaming-async/subscriber-handler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ async def main():
3232
# Wait until 5 messages are processed
3333
global counter
3434
while counter < 5:
35-
print('Counter: ', counter)
3635
await asyncio.sleep(1)
3736

3837
print('Closing subscription...')

examples/pubsub-streaming/README.md

Lines changed: 0 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -116,103 +116,6 @@ dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --e
116116

117117
<!-- END_STEP -->
118118

119-
## Run async example where users control reading messages off the stream
120-
121-
Run the following command in a terminal/command prompt:
122-
123-
<!-- STEP
124-
name: Run subscriber
125-
expected_stdout_lines:
126-
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
127-
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
128-
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
129-
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
130-
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
131-
- "== APP == Closing subscription..."
132-
output_match_mode: substring
133-
background: true
134-
match_order: none
135-
sleep: 3
136-
-->
137-
138-
```bash
139-
# 1. Start Subscriber
140-
dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber.py
141-
```
142-
143-
<!-- END_STEP -->
144-
145-
In another terminal/command prompt run:
146-
147-
<!-- STEP
148-
name: Run publisher
149-
expected_stdout_lines:
150-
- "== APP == {'id': 1, 'message': 'hello world'}"
151-
- "== APP == {'id': 2, 'message': 'hello world'}"
152-
- "== APP == {'id': 3, 'message': 'hello world'}"
153-
- "== APP == {'id': 4, 'message': 'hello world'}"
154-
- "== APP == {'id': 5, 'message': 'hello world'}"
155-
background: true
156-
output_match_mode: substring
157-
sleep: 15
158-
-->
159-
160-
```bash
161-
# 2. Start Publisher
162-
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
163-
```
164-
165-
<!-- END_STEP -->
166-
167-
## Run async example with a handler function
168-
169-
Run the following command in a terminal/command prompt:
170-
171-
<!-- STEP
172-
name: Run subscriber
173-
expected_stdout_lines:
174-
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
175-
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
176-
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
177-
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
178-
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
179-
- "== APP == Closing subscription..."
180-
output_match_mode: substring
181-
background: true
182-
match_order: none
183-
sleep: 3
184-
-->
185-
186-
```bash
187-
# 1. Start Subscriber
188-
dapr run --app-id python-subscriber --app-protocol grpc python3 async-subscriber-handler.py
189-
```
190-
191-
<!-- END_STEP -->
192-
193-
In another terminal/command prompt run:
194-
195-
<!-- STEP
196-
name: Run publisher
197-
expected_stdout_lines:
198-
- "== APP == {'id': 1, 'message': 'hello world'}"
199-
- "== APP == {'id': 2, 'message': 'hello world'}"
200-
- "== APP == {'id': 3, 'message': 'hello world'}"
201-
- "== APP == {'id': 4, 'message': 'hello world'}"
202-
- "== APP == {'id': 5, 'message': 'hello world'}"
203-
background: true
204-
output_match_mode: substring
205-
sleep: 15
206-
-->
207-
208-
```bash
209-
# 2. Start Publisher
210-
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
211-
```
212-
213-
<!-- END_STEP -->
214-
215-
216119
## Cleanup
217120

218121

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ commands =
5151
./validate.sh error_handling
5252
./validate.sh pubsub-simple
5353
./validate.sh pubsub-streaming
54+
./validate.sh pubsub-streaming-async
5455
./validate.sh state_store
5556
./validate.sh state_store_query
5657
./validate.sh secret_store

0 commit comments

Comments
 (0)