Skip to content

Commit

Permalink
Merge pull request dapr#4485 from hhunter-ms/issue_4424
Browse files Browse the repository at this point in the history
add python to streaming subscriptions
  • Loading branch information
hhunter-ms authored Jan 10, 2025
2 parents 5f6a726 + 0a61f41 commit ddcc50e
Showing 1 changed file with 106 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,112 @@ As messages are sent to the given message handler code, there is no concept of r

The example below shows the different ways to stream subscribe to a topic.

{{< tabs Go>}}
{{< tabs Python Go >}}

{{% codetab %}}

You can use the `subscribe` method, which returns a `Subscription` object and allows you to pull messages from the stream by calling the `next_message` method. This runs in and may block the main thread while waiting for messages.

```python
import time
from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError
counter = 0
def process_message(message):
global counter
counter += 1
# Process the message here
print(f'Processing message: {message.data()} from {message.topic()}...')
return 'success'
def main():
with DaprClient() as client:
global counter
subscription = client.subscribe(
pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
)
try:
while counter < 5:
try:
message = subscription.next_message()
except StreamInactiveError as e:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
if message is None:
print('No message received within timeout period.')
continue
# Process the message
response_status = process_message(message)
if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
finally:
print("Closing subscription...")
subscription.close()
if __name__ == '__main__':
main()
```

You can also use the `subscribe_with_handler` method, which accepts a callback function executed for each message received from the stream. This runs in a separate thread, so it doesn't block the main thread.

```python
import time
from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse
counter = 0
def process_message(message):
# Process the message here
global counter
counter += 1
print(f'Processing message: {message.data()} from {message.topic()}...')
return TopicEventResponse('success')
def main():
with (DaprClient() as client):
# This will start a new thread that will listen for messages
# and process them in the `process_message` function
close_fn = client.subscribe_with_handler(
pubsub_name='pubsub', topic='orders', handler_fn=process_message,
dead_letter_topic='orders_dead'
)

while counter < 5:
time.sleep(1)

print("Closing subscription...")
close_fn()


if __name__ == '__main__':
main()
```
[Learn more about streaming subscriptions using the Python SDK client.]({{< ref "python-client.md#streaming-message-subscription" >}})
{{% /codetab %}}
{{% codetab %}}
Expand Down

0 comments on commit ddcc50e

Please sign in to comment.