
This issue was moved to a discussion.

[Question] Kafka batch and headers #2233

Closed
ghstahl opened this issue Nov 12, 2023 · 1 comment

Comments

@ghstahl
ghstahl commented Nov 12, 2023

The following input is causing problems for me, because I can't find any other way to get Kafka headers into my custom output plugin:

input:
  kafka:
    addresses: ["localhost:9093"]
    topics: ["cloudevents-core"]
    consumer_group: "$Default2"    
    batching:
      count: 3
      period: 20s
      processors:
        - mapping: |
            root.headers = @   
            root.value = this
        - archive:
            format: binary
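The archive processor with format: binary folds the whole batch into a single message, so the custom output receives one blob per batch. A minimal Go sketch of splitting that blob back into per-message parts, assuming the layout described in the archive processor documentation (a 4-byte big-endian message count, then for each message a 4-byte big-endian length followed by the message bytes). The function name splitBinaryArchive is hypothetical:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitBinaryArchive splits a blob produced by the archive processor's
// "binary" format back into the individual message payloads.
// ASSUMED layout: uint32 big-endian message count, then for each message
// a uint32 big-endian length followed by that many bytes.
func splitBinaryArchive(blob []byte) ([][]byte, error) {
	if len(blob) < 4 {
		return nil, errors.New("blob too short for message count")
	}
	count := binary.BigEndian.Uint32(blob[:4])
	rest := blob[4:]

	var parts [][]byte
	for i := uint32(0); i < count; i++ {
		if len(rest) < 4 {
			return nil, fmt.Errorf("part %d: missing length prefix", i)
		}
		n := binary.BigEndian.Uint32(rest[:4])
		rest = rest[4:]
		if uint64(len(rest)) < uint64(n) {
			return nil, fmt.Errorf("part %d: want %d bytes, have %d", i, n, len(rest))
		}
		// A zero-length part (empty payload) is kept as an empty slice.
		parts = append(parts, rest[:n])
		rest = rest[n:]
	}
	if len(rest) != 0 {
		return nil, fmt.Errorf("%d trailing bytes after %d parts", len(rest), count)
	}
	return parts, nil
}
```

Each returned part is the output of the mapping for one Kafka message, so with the config above it carries both the headers and the value.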

The problem is that ALL of the Kafka messages are valid, but Benthos treats some of them as errors.
This is caused by the following line:

            root.value = this

I would rather not have needed this at all, because the headers CANNOT be separated from the payload.

        - mapping: |
            root.headers = @   
            root.value = this

What I want to receive in my custom output plugin's Write method is an array of messages which, if represented as a binary blob, would look like this:

[headers][payload] [headers][payload] [headers][payload] [headers][payload] [headers][payload]

The minimal [headers][payload] pair would be a Kafka message that carries neither headers nor data from me,
so [headers == only Benthos metadata][payload == empty]

The full one would look like this:

[headers == Benthos metadata + my Kafka headers][payload == my data]

Each set of headers is bound to its payload and is every bit as important as the payload itself.

A good Kafka message:

headers == [all Benthos metadata + all Kafka headers]
payload == [nil, JSON, non-JSON, binary, etc.]
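The payload kinds listed above differ in whether they can be parsed as JSON, which is what root.value = this requires. As a rough analogy only (Go's encoding/json and unicode/utf8 stand in for Benthos's own parsing), a hypothetical classifier suggests that empty, plain-text and binary payloads would all likely fail that parse, while JSON payloads would succeed:

```go
package main

import (
	"encoding/json"
	"unicode/utf8"
)

// payloadKind is a HYPOTHETICAL classifier for raw Kafka payloads. It
// approximates, using Go's standard library rather than Benthos itself,
// which payloads a mapping like `root.value = this` could parse as JSON.
func payloadKind(p []byte) string {
	switch {
	case len(p) == 0:
		return "empty" // empty input is not valid JSON either
	case json.Valid(p):
		return "json"
	case utf8.Valid(p):
		return "text" // readable, but not JSON
	default:
		return "binary"
	}
}
```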

How can I get all the Kafka messages in their entirety?

@mihaitodor
Collaborator

Hey @ghstahl 👋 It's not entirely clear what kind of errors you're getting, but the only case in which root.value = this will fail is when the message you're getting from Kafka isn't valid JSON. If that is the case, then using root.value = content() should do the trick. Since you're archiving the batches to binary blobs, the metadata of all but the first message gets discarded, so doing root.headers = @ is a good way to store the metadata inside each message so that it gets preserved. Hope that helps.
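On the output side, each archived part then holds an object with headers and value fields. A minimal Go sketch of decoding one part, assuming that shape; envelope and decodeEnvelope are hypothetical names, and how the bytes returned by content() end up serialized inside the object (shown here as a JSON string) is an assumption. Keeping Value as json.RawMessage lets the same struct accept an object produced by this or a string produced by content():

```go
package main

import "encoding/json"

// envelope mirrors the ASSUMED object built by the mapping
// `root.headers = @` plus `root.value = ...`. Value stays as raw JSON so
// it can hold an object, a string, or any other JSON value.
type envelope struct {
	Headers map[string]any  `json:"headers"`
	Value   json.RawMessage `json:"value"`
}

// decodeEnvelope parses one archived part into its headers and raw value.
func decodeEnvelope(part []byte) (envelope, error) {
	var e envelope
	err := json.Unmarshal(part, &e)
	return e, err
}
```

Because the headers travel inside every part, they stay bound to their payload even though the archive keeps only the first message's metadata.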

Moving to Discussions as per #2026.

@redpanda-data redpanda-data locked and limited conversation to collaborators Nov 12, 2023
@mihaitodor mihaitodor converted this issue into discussion #2234 Nov 12, 2023
