Kafka Tombstone with Reactive Messaging #155

Open
BastianSperrhacke-Otto opened this issue Aug 24, 2022 · 5 comments

@BastianSperrhacke-Otto

BastianSperrhacke-Otto commented Aug 24, 2022

Hi there.
I am currently implementing a Kafka message publisher using MicroProfile (MP) Reactive Messaging.
Behind the scenes I use Quarkus, which ships the SmallRye implementation of the spec.

Our Kafka topic is configured with the "compact" cleanup policy, which generally means that Kafka keeps the most recent message for a given ID (key) instead of deleting it.
Now I need to delete that most recent message, i.e. get rid of the last message for that ID.
For this purpose the Kafka guidelines recommend sending a so-called "tombstone", which is a message with the ID in the metadata and a null payload.
And the null payload is the problem: according to the spec of the Emitter I use, the message must not be null:

public interface Emitter<T> {
    /**
     * Sends a message to the channel.
     *
     * @param <M> the <em>Message</em> type
     * @param msg the <em>Message</em> to send, must not be {@code null}
     * @throws IllegalStateException if the channel has been cancelled or terminated or if an overflow strategy of
     *         {@link OnOverflow.Strategy#THROW_EXCEPTION THROW_EXCEPTION} or {@link OnOverflow.Strategy#BUFFER BUFFER} is
     *         configured and the emitter overflows.
     */
    <M extends Message<? extends T>> void send(M msg);

    // other methods omitted
}

Are there any thoughts or even solutions on that?
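
For readers unfamiliar with compacted topics, the effect of a tombstone described above can be modelled in a few lines of plain Java. This is only a simplified in-memory sketch, not Kafka code: the class `CompactionSketch` and its `Entry` type are hypothetical names made up for illustration, and real compaction runs asynchronously in the broker and keeps tombstones around for a while before dropping them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified in-memory model of log compaction; hypothetical, not Kafka code. */
final class CompactionSketch {

    /** A key/value pair as appended to the log; a null value stands for a tombstone. */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Returns the latest value per key and drops keys whose latest entry is a tombstone. */
    static Map<String, String> compact(List<Entry> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                latest.remove(e.key);       // tombstone: forget everything known for this key
            } else {
                latest.put(e.key, e.value); // a newer value replaces the older one
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("order-1", "created"));
        log.add(new Entry("order-2", "created"));
        log.add(new Entry("order-1", "paid"));
        log.add(new Entry("order-1", null)); // tombstone for order-1
        System.out.println(compact(log));    // prints {order-2=created}
    }
}
```

Without the final tombstone, `order-1=paid` would survive compaction indefinitely, which is exactly why a null value has to reach the topic somehow.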

@cescoffier
Contributor

cescoffier commented Aug 24, 2022

You can do:

@Incoming("in")
@Outgoing("out")
public Record<String, String> process(String in) {
    return Record.of("my-key", in);
}

The same works with an emitter (just pass null as the record value, i.e. in place of `in`).
See: https://smallrye.io/smallrye-reactive-messaging/3.18.0/kafka/writing-kafka-records/#serialization

@BastianSperrhacke-Otto
Author

Hi @cescoffier, thank you for the answer. You mean the non-standard io.smallrye.reactive.messaging.kafka.Record? I'll try this, stay tuned.

@BastianSperrhacke-Otto
Author

Hi @cescoffier, I finally got it working, even though it was a little bit tricky. It works with this snippet:


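import java.net.URI;
import java.time.ZonedDateTime;
import java.util.UUID;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import io.smallrye.reactive.messaging.kafka.Record;

    // CHANNEL_NAME, EVENT_SPEC_VERSION and myId are defined elsewhere in the class.
    @Inject
    @Channel(CHANNEL_NAME)
    Emitter<Record<String, my.app.OutgoingMessage>> kafkaEmitter;

    public void publish(my.app.OutgoingMessage myOutgoingMessage) {

        // metadata from io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata
        Metadata metadata = Metadata.of(
                io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata.builder()
                        .withId(UUID.randomUUID().toString())
                        .withSpecVersion("1.0") // CloudEvents spec version
                        .withType("mytype." + EVENT_SPEC_VERSION)
                        .withTimestamp(ZonedDateTime.now())
                        // URI.create avoids the checked URISyntaxException of new URI(...)
                        .withSource(URI.create("//some.live.server/entity/" + myId))
                        .build());

        // sending message: key = ID, value = payload
        kafkaEmitter.send(Message.of(Record.of(myId.toString(), myOutgoingMessage), metadata));

        // sending tombstone: same key, null value (the Record itself is not null)
        kafkaEmitter.send(Message.of(Record.of(myId.toString(), null), metadata));
    }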

But to me, as a spec-loving guy, this feels like a workaround, because I have to use vendor-specific (SmallRye) classes, especially the Record class, to get a null value into my payload.
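
Why the wrapper gets past the "must not be null" rule can be shown with stand-in types. The following is a minimal sketch under stated assumptions: `StrictEmitter` and `KeyValue` are hypothetical classes invented for this illustration (they are not the MicroProfile `Emitter` or the SmallRye `Record`), and the null check is modelled after the Javadoc quoted earlier in the thread. The point is only that the wrapper object is non-null, so it passes the check, while the value it carries is null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Stand-in types only; these are NOT the MicroProfile or SmallRye classes. */
final class TombstoneWrapperSketch {

    /** Hypothetical key/value wrapper playing the role of a Record; the value may be null. */
    static final class KeyValue<K, V> {
        final K key;
        final V value;

        KeyValue(K key, V value) {
            this.key = Objects.requireNonNull(key, "key");
            this.value = value; // null is allowed here and means "tombstone"
        }
    }

    /** Hypothetical emitter that rejects a null argument, as the quoted Javadoc demands. */
    static final class StrictEmitter<T> {
        final List<T> sent = new ArrayList<>();

        void send(T payload) {
            sent.add(Objects.requireNonNull(payload, "payload must not be null"));
        }
    }

    public static void main(String[] args) {
        StrictEmitter<KeyValue<String, String>> emitter = new StrictEmitter<>();
        emitter.send(new KeyValue<>("id-42", "some value")); // regular record
        emitter.send(new KeyValue<>("id-42", null));         // tombstone: wrapper is non-null
        System.out.println(emitter.sent.size() + " records accepted");
    }
}
```

Calling `emitter.send(null)` on this stand-in throws a `NullPointerException`, which mirrors the restriction that motivated the workaround.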

I think it would be a nice idea to add a way to send a null payload, such as
kafkaEmitter.sendNullRecord(metadata)

or
kafkaEmitter.sendTombstone(id, metadata)

or
kafkaEmitter.send(NullMessage.of(metadata))
where NullMessage behaves like Message, but skips the null checks.

What are your thoughts on this? How could we achieve it?

@cescoffier
Contributor

As you already use Metadata, you can achieve the same using the OutgoingKafkaMetadata.

Note that sendNullRecord and sendTombstone would have to be:

  • Kafka-specific - Reactive Messaging is Kafka-agnostic
  • SmallRye-specific - Emitter comes from the spec, so pushing these methods there would not be without challenges

@BastianSperrhacke-Otto
Author

What I meant is to enhance the MP Reactive Messaging spec in one of the upcoming releases. Of course, that is probably more than just a pull request (I have absolutely no idea what effort is needed for an addition to the spec).
So the idea is to find a way for users to send a null record using only the spec (not a vendor-specific class). Currently there is, IMO, no way to do that, which is why I had to use the Record class from SmallRye.
