Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom header option for consumers #29

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

ilyashtrikul
Copy link

Messenger Serializer requires type key with class FQN. This PR makes it possible with headers option in transport config.

framework:
    messenger:
        transports:
            producer:
               # ...
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # should be unique per consumer
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'
                    # Custom headers for all messages
                    headers:
                      type: Some\Custom\Message\Class

@KonstantinCodes
Copy link
Owner

Hi @ilyashtrikul, thanks for the PR!

I also see you fixed the test :)

The headers field is definitely useful. But you can achieve the same result if you supply a custom serializer. And supplying a custom serializer is standard for symfony messenger.

So I'm wondering if you have considered that option?

@ilyashtrikul
Copy link
Author

You're right, custom serializer is a solution, but it's not just copy-paste with some fixes, it's a bundle with own configuration and etc only for one header field.

@ilyashtrikul
Copy link
Author

If you think this changes are not part of this library or will not make work a bit easy - feel free to close this PR ;)

@KonstantinCodes
Copy link
Owner

@ilyashtrikul I'm not sure what you mean with own bundle with configuration.

You just specify the service ID of your Serializer in the Messenger config

            consumer:
                dsn: '%env(KAFKA_URL)%'
                serializer: App\Infrastructure\Messenger\MySerializer

@ilyashtrikul
Copy link
Author

ilyashtrikul commented Jan 14, 2021

MySerializer don't know which type of message come in (without specific headers), so it should be some hardcode for topic -> class in MySerializer or make serializer with own configuration for topic-class (which is better move to bundle for flexible configuring) and it will be a lot of serializer services. My way is set topic-class config nearby all consumer configuration.

@KonstantinCodes
Copy link
Owner

@ilyashtrikul Are you using Avro? Avro includes the qualified name, that you can use in your serializer to determine the output class

@gayansanjeewa
Copy link

gayansanjeewa commented Mar 24, 2022

Hi @ilyashtrikul, thanks for the PR!

I also see you fixed the test :)

The headers field is definitely useful. But you can achieve the same result if you supply a custom serializer. And supplying a custom serializer is standard for symfony messenger.

So I'm wondering if you have considered that option?

Hi @KonstantinCodes!

I find it really helpful to have the ability to pass the Event class as a header option.

I'm using your package in one of my projects (Huge thanks for creating this package!!) and I consume about 10+ different topics and for each, I had to introduce 10+ serializers.

If we have the ability to mention, okay for this topic this is the Event class by mentioning it in the header, then I can simplify this by just having a single generic serializer

What I mean is this

    order_delayed:
        dsn: '%env(KAFKA_URL)%'
        serializer: App\KafkaSerializer
        retry_strategy:
            max_retries: 0
        options:
            topic:
                name: order.delayed
            kafka_conf:
                enable.auto.offset.store: 'false'
                group.id: tracking
            headers:
                type: App\OrderDelayedEvent

Then in a KafkaSerializer, I would just have

$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], self::FORMAT);

What do you think?


Update: If you have a doubt as to whether the headers is the best place to mention the event class, I'd like to suggest an alternative approach like below

      options:
          topic:
              name: order.delayed
              event: App\OrderDelayedEvent

@KonstantinCodes
Copy link
Owner

@gayansanjeewa May I ask, how the payload is serialized? it it plain json?

@gayansanjeewa
Copy link

Hey @KonstantinCodes

So the decode method of the serializer is like

    /**
     * @param string[] $encodedEnvelope
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body'])) {
            return new Envelope(new EmptyMessageEvent());
        }

        /** @var OrderDelayedEvent $message */
        $message = $this->serializer->deserialize($encodedEnvelope['body'], OrderDelayedEvent::class, 'json');

        return new Envelope($message);
    }

and the payload would be like

{
    "order_id": "3c77d2f1-4b2a-4eac-bef3-a3c4c74fc64a",
    "user_id": 3300317213,
    "user_email": "[email protected]",
    "order_number": "OPL-100001111"
}

@gayansanjeewa
Copy link

Hey @KonstantinCodes ping 😄
I'd like to hear your feedback on my previous comment. Thanks!

@KonstantinCodes
Copy link
Owner

@gayansanjeewa Overriding these properties doesn't really seem like an ideal solution.
If you do it this way, you'll need to configure different transports for each topic you consume.

I'd rather just pass the KafkaMessage to the Serializer so you can map the topic name to a class.

@gayansanjeewa
Copy link

Okay, thanks for your feedback @KonstantinCodes!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants