When tombstones are sent to the input topic, they are swallowed by the plugin and never surface in the pipeline.
Kafka tombstones are events with a null value.
They are used to signal the deletion of the entity behind the key of the message.
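To illustrate the semantics, here is a minimal Ruby sketch (keys and values are made up): replaying a keyed event stream into a table, where a nil value deletes the entry, the same way a consumer maintaining a downstream index would treat a tombstone.

```ruby
# Hypothetical keyed event stream; a nil value is a tombstone.
events = [
  { key: 'user-1', value: 'Alice' },
  { key: 'user-2', value: 'Bob' },
  { key: 'user-1', value: nil } # tombstone: user-1 was deleted upstream
]

# Replay the stream into a table, as a consumer maintaining an index would.
table = {}
events.each do |e|
  if e[:value].nil?
    table.delete(e[:key]) # a tombstone unindexes the entity
  else
    table[e[:key]] = e[:value]
  end
end

puts table.inspect # only user-2 remains
```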
Here, we use an Avro schema for the topic; this may be the source of the problem.
See an example pipeline below.
This null handling is crucial because we use Logstash to ingest Kafka topics into Elasticsearch so we can index the entities carried by the events. Since the entities are streamed from a database, they can be deleted, and such deleted entities should be removed from the Elasticsearch index.
We clearly see the warning that is printed by Logstash when it encounters a null value:
public

def decode(data)
  if data.length < 5
    @logger.error('message is too small to decode')
  else
It could be as simple as a test on data.length == 0 to trigger tombstone handling, giving an empty event with one field: [@metadata][kafka][tombstone]=true
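A self-contained sketch of what that handling could look like. The event is stubbed as a plain Hash here (the real codec would build a LogStash::Event), the Avro branch is elided, and the field name follows the suggestion above; everything else is an assumption, not the plugin's actual code.

```ruby
# Sketch of the suggested decode change (event stubbed as a Hash; the real
# codec would construct a LogStash::Event, and Avro decoding is elided).
def decode(data)
  if data.nil? || data.length == 0
    # Tombstone: emit an empty event flagged in @metadata so the pipeline
    # can route it (e.g. to an elasticsearch output with action => "delete").
    yield({ '[@metadata][kafka][tombstone]' => true })
  elsif data.length < 5
    warn 'message is too small to decode'
  else
    yield({ 'payload' => data }) # stand-in for the existing Avro decoding
  end
end

decode('') { |event| p event } # prints the tombstone event
```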
Steps to reproduce:
Publish an event with a key and a null value to a Kafka topic that is used as the input of a Logstash pipeline
Use a stdout output and see that nothing is printed for such an event
Use a filter to add some metadata when a mandatory field is not present (likely a tombstone message) and see that nothing is printed for such an event either
Here is a minimal pipeline showing that null events are not delivered to the pipeline, along with a sample of the use case we would like to achieve:
input {
  kafka {
    bootstrap_servers => ["${KAFKA_BROKER_URL}"]
    topics => ["${TOPIC}"]
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "${SASL_JAAS_CONFIG}"
    ssl_endpoint_identification_algorithm => "https"
    group_id => "${GROUP_ID}"
    auto_offset_reset => "${AUTO_OFFSET_RESET}"
    isolation_level => "read_committed"
    codec => avro_schema_registry {
      endpoint => "${KAFKA_SCHEMA_REGISTRY_URL}"
      username => "${KAFKA_SCHEMA_REGISTRY_USERNAME}"
      password => "${KAFKA_SCHEMA_REGISTRY_PASSWORD}"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    decorate_events => "basic"
  }
}

# Here, it's already too late: the event is not processed, so there is nothing we can filter on...
# filter {
# }

output {
  # Here, we'd like to do this:
  # if [@metadata][kafka][tombstone] {
  #   elasticsearch {
  #     action => "delete"
  #     ...
  #   }
  # } else {
  #   elasticsearch {
  #     action => "index"
  #     ...
  #   }
  # }
  stdout {
    codec => rubydebug { metadata => true }
  }
}
Using Logstash version 8.13.0
It seems like the problem lies in this file:
https://github.com/revpoint/logstash-codec-avro_schema_registry/blob/master/lib/logstash/codecs/avro_schema_registry.rb#L218