Tombstones are swallowed in an Avro topic: please give them to the pipeline, with [@metadata][kafka][tombstone]=true #31

Open
slaout opened this issue May 24, 2024 · 0 comments

slaout commented May 24, 2024

Using Logstash version 8.13.0

When tombstones are sent to the input topic, they are swallowed by the plugin and never surface in the pipeline.
Kafka tombstones are events with a null value.
They are used to signal the deletion of the entity behind the key of the message.
Here, we use an Avro schema for the topic, which may be the source of the problem.
See an example pipeline below.

Handling these null values is crucial for us. We use Logstash to ingest Kafka topics into Elasticsearch so that the entities carried by the events are indexed, and entities can be deleted (they are streamed from a database). Deleted entities must then be removed from the Elasticsearch index.
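
The semantics we expect can be sketched in a few lines of Ruby. This is a hypothetical model, not Logstash code: a plain `Hash` stands in for the Elasticsearch index, and `Record` and `apply_records` are names made up for illustration. A record with a value indexes (or overwrites) the entity under its key; a tombstone removes it.

```ruby
# Hypothetical model: a Hash stands in for the Elasticsearch index,
# and a nil value marks a Kafka tombstone.
Record = Struct.new(:key, :value)

# Replays records in order and returns the resulting index.
def apply_records(records, index = {})
  records.each do |record|
    if record.value.nil?
      index.delete(record.key)         # tombstone: unindex the entity
    else
      index[record.key] = record.value # regular event: index or overwrite
    end
  end
  index
end

records = [
  Record.new("a", { "name" => "first" }),
  Record.new("b", { "name" => "second" }),
  Record.new("a", nil) # tombstone for key "a"
]
p apply_records(records) # only key "b" remains
```

If tombstones never reach the pipeline, the delete step never runs and entity `"a"` stays indexed, which is the situation described in this issue.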

It seems like the problem lies in this file:
https://github.com/revpoint/logstash-codec-avro_schema_registry/blob/master/lib/logstash/codecs/avro_schema_registry.rb#L218

This is where the message is dropped: for a null value, the codec logs an error and emits no event:

  public
  def decode(data)
    if data.length < 5
      @logger.error('message is too small to decode')
    else
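
The 5-byte threshold is consistent with the Confluent Schema Registry wire format (one magic byte, then a 4-byte big-endian schema ID, then the Avro payload); we assume the codec expects this framing. The sketch below is hypothetical (`parse_header` is not a codec method) and also assumes the Kafka input hands the codec the record value as a string, so a null value arrives as `""`. Under those assumptions, a tombstone can only ever land in the error branch:

```ruby
# Sketch of the framing check, assuming the Confluent wire format:
#   byte 0     magic byte (always 0)
#   bytes 1-4  schema ID, big-endian 32-bit integer
#   bytes 5..  Avro binary payload
HEADER_SIZE = 5

# Hypothetical helper mirroring the current logic: anything shorter than the
# header is rejected, so an empty (tombstone) value lands in the error branch.
def parse_header(data)
  data = data.to_s # assumption: a null value reaches the codec as ""
  return :too_small if data.bytesize < HEADER_SIZE

  magic, schema_id = data.unpack("CN")
  raise ArgumentError, "unexpected magic byte #{magic}" unless magic.zero?

  { schema_id: schema_id, payload: data.byteslice(HEADER_SIZE, data.bytesize - HEADER_SIZE) }
end
```

Here `parse_header(nil)` returns `:too_small`, the equivalent of the `'message is too small to decode'` branch, while a framed message such as `[0, 42].pack("CN") + "avro"` yields schema ID 42 and the remaining payload bytes.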

A simple check for data.length == 0 could trigger tombstone handling, yielding an otherwise empty event with a single field: [@metadata][kafka][tombstone]=true
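
A minimal sketch of that proposal, under the same assumption that a tombstone reaches `decode` as an empty string. `MiniEvent` is a hypothetical stand-in for `LogStash::Event` so the sketch runs outside Logstash (the real codec would create a `LogStash::Event` and call `event.set`), and the block passed to `TombstoneAwareDecoder.new` stands in for the existing Avro decoding:

```ruby
# Hypothetical stand-in for LogStash::Event, just enough to run the sketch.
class MiniEvent
  def initialize
    @data = {}
  end

  # Sets a value at a Logstash-style field reference such as "[a][b][c]".
  def set(ref, value)
    *parents, last = ref.scan(/\[([^\]]+)\]/).flatten
    target = parents.reduce(@data) { |hash, key| hash[key] ||= {} }
    target[last] = value
  end

  # Reads a value at a field reference; returns nil if any level is missing.
  def get(ref)
    ref.scan(/\[([^\]]+)\]/).flatten.reduce(@data) do |node, key|
      node.is_a?(Hash) ? node[key] : nil
    end
  end
end

# Sketch of decode with a tombstone branch (assumption: a null Kafka value
# reaches the codec as an empty string).
class TombstoneAwareDecoder
  HEADER_SIZE = 5
  attr_reader :errors

  # avro_decoder: hypothetical block standing in for the real Avro decoding.
  def initialize(&avro_decoder)
    @avro_decoder = avro_decoder
    @errors = []
  end

  def decode(data)
    data = data.to_s
    if data.empty?
      # Tombstone: emit an otherwise empty event flagged for the pipeline.
      event = MiniEvent.new
      event.set("[@metadata][kafka][tombstone]", true)
      yield event
    elsif data.bytesize < HEADER_SIZE
      # The real codec logs this with @logger.error; here we just record it.
      @errors << "message is too small to decode"
    else
      yield @avro_decoder.call(data)
    end
  end
end
```

Assuming the Kafka input decorates tombstone events the same way it decorates regular ones, `decorate_events => "basic"` should also attach the record key under `[@metadata][kafka]`, which is what an Elasticsearch `delete` action needs as its document ID.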

Steps to reproduce:

  1. Publish an event with a key and a null value to a Kafka topic consumed by a Logstash pipeline
  2. Use a stdout output and observe that nothing is printed for that event
  3. Use a filter that adds some metadata when a mandatory field is missing (likely a tombstone message), and observe that nothing is printed for that event either

Here is a minimal pipeline showing that null-valued events never reach the pipeline, along with a sketch of the use case we would like to achieve:

input {
  kafka {
    bootstrap_servers => ["${KAFKA_BROKER_URL}"]
    topics => ["${TOPIC}"]
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "${SASL_JAAS_CONFIG}"
    ssl_endpoint_identification_algorithm => "https"
    group_id => "${GROUP_ID}"
    auto_offset_reset => "${AUTO_OFFSET_RESET}"
    isolation_level => "read_committed"
    codec => avro_schema_registry {
      endpoint => "${KAFKA_SCHEMA_REGISTRY_URL}"
      username => "${KAFKA_SCHEMA_REGISTRY_USERNAME}"
      password => "${KAFKA_SCHEMA_REGISTRY_PASSWORD}"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    decorate_events => "basic"
  }
}

# Here, it's already too late: the event is not processed, so there is nothing we can filter on...
# filter {
# }

output {
  # Here, we'd like to do this:
  # if [@metadata][kafka][tombstone] {
  #   elasticsearch {
  #     action => "delete"
  #     ...
  #   }
  # } else {
  #   elasticsearch {
  #     action => "index"
  #     ...
  #   }
  # }

  stdout {
    codec => rubydebug { metadata => true }
  }
}
slaout changed the title "Tombstones are swallowed in an Avro topic: please give them to the pipeline, with [@medata][kafka][tombstone]=true" to "Tombstones are swallowed in an Avro topic: please give them to the pipeline, with [@metadata][kafka][tombstone]=true" on May 24, 2024