Hey @ghstahl 👋 It's not entirely clear what kind of errors you're getting, but the only case in which root.value = this will fail is when the message you're getting from Kafka isn't valid JSON. If this is the case, then using root.value = content() should do the trick. Since you're archiving the batches to binary blobs, the metadata of all but the first message gets discarded, so doing root.headers = @ is a good way to store the metadata in each message so that it is preserved. Hope that helps.
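For illustration, the two suggestions above could be combined in a single mapping along these lines. This is only a sketch, not the config from the issue: the `pipeline` / `mapping` processor layout assumes a Benthos v4-style config, the field names `headers` and `value` are taken from the snippets in this reply, and the base64 encoding is an assumption added so that binary or non-UTF-8 payloads can be embedded safely in a JSON document.

```yaml
pipeline:
  processors:
    - mapping: |
        # Copy all metadata (Benthos metadata plus Kafka headers) into the
        # document itself, so it is still there after the batch is archived.
        root.headers = @
        # Read the raw bytes instead of `this`, so a payload that is not
        # valid JSON does not make the mapping fail.
        # Assumption: base64 keeps empty, non-JSON and binary payloads intact.
        root.value = content().encode("base64")
```

With a mapping like this, each message becomes a self-contained JSON document carrying both its headers and its payload, and a custom output would need to base64-decode `value` to get the original bytes back.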
The following input is causing problems for me, because I can't find any other way to get the kafka headers into my custom output plugin;
The reason is that ALL kafka messages are good, but some will be treated by benthos as errors. This is because of the following line;
I would rather have not needed to do this at all because headers CANNOT be separated from the payload.
What I want to get in my Custom Output plugin's Write:
An array of messages in the following format if represented as a Binary Blob.
[headers][payload]
[headers][payload]
[headers][payload]
[headers][payload]
[headers][payload]
The minimum [headers][payload] would be a kafka message that has no headers from me, or data from me.
so [headers == only benthos stuff][payload == empty]
The full one would look like this:
so [headers == only benthos stuff + my kafka headers][payload == my data]
All headers are bound to those payloads. They are every bit as important as the payload itself.
good kafka message
headers == [all benthos metadata + all kafka headers]
payload == [nil, json, notjson, binary], etc
How can I get all the kafka messages in their entirety?