diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md index 75b1b758c2b..c6f71888370 100644 --- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md +++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md @@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options: When invoking the Kafka pub/sub, its possible to provide an optional partition key by using the `metadata` query param in the request url. -The param name is `partitionKey`. +The param name can either be `partitionKey` or `__key` Example: @@ -484,7 +484,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti ### Message headers -All other metadata key/value pairs (that are not `partitionKey`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message. +All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message. ```shell curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \ @@ -495,7 +495,51 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla } }' ``` +### Kafka Pubsub special message headers received on consumer side +When consuming messages, special message metadata are being automatically passed as headers. These are: +- `__key`: the message key if available +- `__topic`: the topic for the message +- `__partition`: the partition number for the message +- `__offset`: the offset of the message in the partition +- `__timestamp`: the timestamp for the message + +You can access them within the consumer endpoint as follows: +{{< tabs "Python (FastAPI)" >}} + +{{% codetab %}} + +```python +from fastapi import APIRouter, Body, Response, status +import json +import sys + +app = FastAPI() + +router = APIRouter() + + +@router.get('/dapr/subscribe') +def subscribe(): + subscriptions = [{'pubsubname': 'pubsub', + 'topic': 'my-topic', + 'route': 'my_topic_subscriber', + }] + return subscriptions + +@router.post('/my_topic_subscriber') +def my_topic_subscriber( + key: Annotated[str, Header(alias="__key")], + offset: Annotated[int, Header(alias="__offset")], + event_data=Body()): + print(f"key={key} - offset={offset} - data={event_data}", flush=True) + return Response(status_code=status.HTTP_200_OK) + +app.include_router(router) + +``` + +{{% /codetab %}} ## Receiving message headers with special characters The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.