[feature] Add AMQP (RabbitMQ) support in async consumers API #761

Open

deefdragon opened this issue Jan 10, 2024 · 17 comments

@deefdragon

Is your feature request related to a problem? Please describe.

This is a re-opening of #373 (and a few others) to add RabbitMQ as an async consumer input, now that several asynchronous consumers were added in v5.2.0.

I created this as a new ticket specifically because I am looking into writing a PR for it, and want an open ticket to collect my questions in.

Describe the solution you'd like

Addition of RabbitMQ

Design Questions

I have been looking at the code for the existing consumers, and it appears that a consumer has to do three things:

  1. Be added as a consumer that can be set up in consuming.go
  2. Implement the Service Interface
  3. Be able to listen for data and call Dispatch

Everything else appears to be consumer-specific. Am I missing anything?
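
To make sure I'm reading it right, here is the rough shape I'm picturing for the RabbitMQ consumer – just a sketch, and the Dispatcher/Service signatures below are my assumptions from skimming the code rather than the actual Centrifugo types:

// Rough sketch only. Dispatcher/Service signatures are assumptions, not the
// actual Centrifugo interfaces.
package consuming

import "context"

// Dispatcher is assumed to take an API method name plus its raw payload.
type Dispatcher interface {
	Dispatch(ctx context.Context, method string, payload []byte) error
}

// RabbitMQConsumer would be registered in consuming.go and implement the
// Service interface (run until the context is cancelled).
type RabbitMQConsumer struct {
	dispatcher Dispatcher
	// ... connection/queue configuration ...
}

// Run listens for deliveries and dispatches each one as an API command.
func (c *RabbitMQConsumer) Run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
			// case d := <-deliveries:
			//     _ = c.dispatcher.Dispatch(ctx, method, payload)
		}
	}
}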

Consumer Multiplexing

Is a new consumer created for each specified input? I.e. if I am reading data in from two tables in PostgreSQL (or two queues in RabbitMQ), would I be calling the setup function twice, or is the consumer client set up once and expected to handle the multiple inputs itself?

Multiple instances of Centrifugo

When Centrifugo is scaled out horizontally, it appears that PostgreSQL divides the work into partitions to make sure each piece of data is handled by only one instance. Does the dispatch call properly send data between the instances so that the clients are all properly informed?

If so, RabbitMQ would be able to have each client in the Centrifugo instance connect to the same queue, and the data would be processed in a partitioned manner automatically. I just wish to make sure that is acceptable.

Updates Mid-Execution

Is the expectation that the user restarts Centrifugo to apply updates to async consumers? Or must they be able to adjust on the fly to changes in config?

Testing expectations

I will be able to write some unit tests for the RabbitMQ consumer of course, but I also saw some things that make me think there are integration tests. Is that the case, and if so, what are the expectations around integration tests (and where might I look for more information/examples on those)?

@FZambia
Member

FZambia commented Jan 10, 2024

Hello @deefdragon

I knew that adding the async consumers feature might quickly result in requests for other native integrations :) I don't want to add integrations with every possible queueing system to Centrifugo core, so I need to think a bit about a good strategy here. My first thought is that it should be based on the popularity of the system we integrate with – I think RabbitMQ is popular enough to be one such system. And one more criterion – I think we need a real use case before writing a new async consumer. Do you have a production system where you want to use Centrifugo with async consuming from Rabbit? If yes – could you describe the use case? If not – I'd say we better leave this issue open for some time first, discuss the questions you asked, and then wait for feedback from the community.

Now to the questions – yep, I think you understood correctly how the code is structured now: an async consumer is just a Service which consumes some queue and calls Dispatch. In the message from the queue system we expect an API command to execute – method and payload.
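
For example (an illustrative sketch only – exact field names differ per consumer), a message carrying a publish command boils down to a method name plus the publish API payload:

// Illustrative sketch: a queue message carries an API command – the method
// name plus the payload for that method (here a publish into channel "news").
package main

import (
	"encoding/json"
	"fmt"
)

type apiCommand struct { // assumed shape, for illustration only
	Method  string          `json:"method"`
	Payload json.RawMessage `json:"payload"`
}

func main() {
	cmd := apiCommand{
		Method:  "publish",
		Payload: json.RawMessage(`{"channel":"news","data":{"text":"hello"}}`),
	}
	b, _ := json.Marshal(cmd)
	fmt.Println(string(b))
	// {"method":"publish","payload":{"channel":"news","data":{"text":"hello"}}}
}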

The important property of the current consumers (PostgreSQL table and Kafka topics) is that they both can maintain message ordering in channels if properly partitioned. RabbitMQ can't do this in a basic setup as far as I know. It seems that it's possible to achieve with https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/ and this article additionally describes how to achieve partitioning with it for increased throughput.

Is a new consumer created for each specified input? I.e. if I am reading data in from two tables in PostgreSQL (or two queues in RabbitMQ), would I be calling the setup function twice, or is the consumer client set up once and expected to handle the multiple inputs itself?

I'd say consuming from many queues is nice if possible; for Kafka it's possible to consume from many topics – but that was almost free to implement, and I'm not sure how this may be in the RabbitMQ case. If it's simple to use many queues – why not, and users can decide whether to configure one or many.

If so, RabbitMQ would be able to have each client in the Centrifugo instance connect to the same queue, and the data would be processed in a partitioned manner automatically. I just wish to make sure that is acceptable.

Yep, just a note about the ordering of processing – I believe most Centrifugo users want to keep the order of messages in channels. Of course there could be cases where order is not important. Would be nice to provide an option to keep ordering right from the start. But in general this is why a real use case is needed here, to have some ground for decisions.

Is the expectation that the user restarts Centrifugo to apply updates to async consumers? Or must they be able to adjust on the fly to changes in config?

I think applying changes upon restart is sufficient for now. Applying changes on the fly is nice to have – but I'm afraid it can require non-trivial changes in configuration handling; I don't have enough understanding at this point of how difficult this could be.

I will be able to write some unit tests for the RabbitMQ consumer of course, but I also saw some things that make me think there are integration tests. Is that the case, and if so, what are the expectations around integration tests (and where might I look for more information/examples on those)?

Yep, currently we test consumers in CI, using docker-compose.yaml to start external services. Locally, just docker compose up and then go test ./... -tags integration -v

Overall, I'd approach this carefully, and it would be nice to understand that it may be really useful for some Centrifugo user with a production app.

@deefdragon
Author

Production Use Case

To answer the question around current production use with one of a few examples: I have a system that authenticates with Twitch for user login (among others). When a user changes their password on Twitch, Twitch sends out a de-auth message to all 3rd party integrations. On getting this message, I de-auth any active keys to prevent them from refreshing, but before the JWT expires naturally (<5m), I want to be able to inform any active clients to log the user out. (Of the front end specifically, so they can not send new HTTP requests – not Centrifugo. I am aware PRO does support de-authing tokens.)

My intent was that users would be able to listen on an appropriate channel on the front end, and when I send a message to the RabbitMQ queue that gets these messages, Centrifugo then passes that message on to the appropriate user. (My very initial planning so far has options to substitute headers into the channel so you can target a given channel in a namespace or a given user.)

In general, that could be handled by an HTTP call, but that also applies to most cases with async consumers, I think. As a general case, RabbitMQ is automatically integrated into all of my services due to how I initialize my infra (I have a common framework during startup), and not having to deploy another service to get messaging directly to users would be ideal.

Integration

I'm wondering if it would be possible to use Go's plugin system to separate out the different supported async consumers. There are some very specific considerations when it comes to version matching that may complicate that, however.
Something to consider researching later, I think. It would be easy enough to migrate any existing implementation to a plugin system if that is the path taken.

As is, I'm going to start by mimicking the existing code to get a feel for how well RabbitMQ can integrate, to begin with.

Ordering

To me, the most important part of Centrifugo is how it centralizes sending users events on demand, and the time between events on the same channel for all of my use-cases is normally on the order of several seconds at the smallest, and several minutes under normal circumstances.

As such, I missed the importance of tight ordering for other users and considered it more an optional feature. Useful, but not required.

I will do what I can to take ordering into consideration when working on this.

@deefdragon
Author

I have a basic AMQP consumer written up (I always forget that AMQP is the generic protocol behind RabbitMQ queues; I've updated the title), though I've not gone through and done super rigorous testing as of yet.

I now have a better understanding of the complexity in the way the consumers are currently written, and what you mean about strategy. I misinterpreted "in the message from the queue system we expect an API command to execute – method and payload" to mean that the existing standard has the required data located somewhere in the message as a whole (i.e. it could be parsed from the headers of the message), not that all required data would be encoded in the body of the message itself.

The version I currently have written follows the pattern of the Kafka consumer and encodes both the payload and the method into a single JSON message.

To me however, the ideal end form for an async AMQP consumer would be to only have data/b64data be part of the body and have all the other data pulled from the headers/config.

My reasoning for this is to allow setting up the AMQP consumer to listen to the same, already existing messages I'm sending.

(Attached diagram: Untitled Diagram.drawio)

So in this example, I already have Service 1 and Service 2 parsing data from RabbitMQ queues. I'd like to be able to create a queue that attaches to the existing exchange, and have Centrifugo be able to get/build the channel and other options from the headers and config. In this way, the only change that would have to be made to pass any events to the front end would be to change the Centrifugo config and the RabbitMQ queues, requiring no code changes.
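
Very roughly, this is the kind of consumption loop I have in mind – only a sketch, using github.com/rabbitmq/amqp091-go, with a hard-coded channel pattern and a placeholder publish function standing in for the real dispatch:

// Sketch only: header-driven channel construction for an AMQP consumer.
// Assumptions: amqp091-go client, queue name "centrifugo", and a placeholder
// publish function instead of the real Centrifugo dispatch.
package main

import (
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	deliveries, err := ch.Consume("centrifugo", "", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	for d := range deliveries {
		// Build the channel from message headers according to config,
		// e.g. "token.deauth:<token>#<user>"; the body is published as-is.
		channel := fmt.Sprintf("token.deauth:%v#%v", d.Headers["Token"], d.Headers["User"])
		publish(channel, d.Body)
		_ = d.Ack(false)
	}
}

func publish(channel string, data []byte) { /* placeholder for the real dispatch */ }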

The code to do this, however, is going to be quite a bit more complicated due to the list of options that need to be supported. As I have an active use-case for this, I'm going to split off what I currently have, and try to come up with a proof of concept to allow the more complex formatting use-cases. I will circle back with my findings if I think it can be done reasonably and stably.

(Apologies if I'm getting pushy here. The use case I've described is somewhat high priority given it's security-related, but I also have some other stuff undergoing an active migration that this would simplify by leaps and bounds if I can get it functioning.)

@deefdragon deefdragon changed the title [feature] Add RabbitMQ to async consumers API [feature] Add AMQP (RabbitMQ) support in async consumers API Jan 12, 2024
@FZambia
Member

FZambia commented Jan 12, 2024

Thanks for sharing more details!

I'm wondering if it would be possible to use Go's plugin system to separate out the different supported async consumers. There are some very specific considerations when it comes to version matching that may complicate that, however.

Currently the answer to this is that we are not ready to include Go plugin support in Centrifugo. If you look at the Go ecosystem, most systems that added Go plugin support consider it experimental, and the fact that a plugin requires exact versions of libraries makes this quite unmanageable.

As such, I missed the importance of tight ordering for other users and considered it more an optional feature. Useful, but not required.

If ordering is not important for your use case, I suppose it's OK to not have it, but think about possible solutions. We can emphasize this in the docs if ordering is hard to achieve; it would just be nice to understand whether it's hard or impossible.

to mean that the existing standard has the required data located somewhere in the message as a whole (i.e. it could be parsed from the headers of the message), not that all required data would be encoded in the body of the message itself.

I think this is the most difficult question here. Yes – consumers in Centrifugo expect messages to be Centrifugo API commands. If you want to have some rules for how to extract data from RabbitMQ messages – then I suppose it's out of scope for Centrifugo at the moment. Also, I think it would be quite strange/non-obvious if all your messages travelling to various destinations contain a Centrifugo-specific payload in headers. I think the correct solution here is to write a service which transforms RabbitMQ messages to Centrifugo format – in that case you can simply use the HTTP API, I suppose.

@deefdragon
Author

Implementations

Here is my initial implementation that reads data in from AMQP and dispatches via the existing method used by Kafka (I actually used literally the same struct to parse the data).

I also have a more advanced variant that allows substituting information in as needed for the channel, method, and the other parameters (this is optional, as it defaults to the simple variant). It is somewhat complex to get this method to work, however, as it uses Go templates to accomplish the substitution, and some really messy JSON encoding/decoding to build the payload structure.

I'm going to be testing the more advanced version in my production environment to see if I encounter any particular issues/complexities with it, but it's been stable so far. If I do get heavy use out of the complex form, I'm going to see if I can clean it up & optimize it so it doesn't require converting to/from JSON half a dozen times and is actually reasonable to submit a PR for. I'd also like to see if I can make it usable on more than just the AMQP consumer so it's somewhat standardized.
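
To give a feel for what the templating in the advanced variant does, here is a stripped-down sketch (the field names are illustrative; the real version carries more context):

// Stripped-down sketch of channel construction via text/template in the
// advanced variant; the messageContext fields are illustrative.
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

type messageContext struct {
	Headers map[string]string // values extracted from the AMQP message headers
	Body    string            // raw message body
}

func main() {
	tmpl := template.Must(template.New("channel").Parse("token.deauth:{{.Headers.Token}}#{{.Headers.User}}"))

	msg := messageContext{
		Headers: map[string]string{"Token": "abc123", "User": "42"},
		Body:    `{"reason":"password_change"}`,
	}

	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, msg); err != nil {
		panic(err)
	}
	fmt.Println(buf.String()) // token.deauth:abc123#42
}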

Extra headers

I think it would be quite strange/non-obvious if all your messages travelling to various destinations contain a Centrifugo-specific payload in headers

As is, I pass different headers for different destinations, so in several cases I'm already passing extraneous data from the perspective of one program vs another. I also do tracing that is informed by the headers I include in the messages, so for most cases I would arguably not be including any extra data. This is a fair comment however, as that will not necessarily be the case for most users.

Overall I think I would prefer having the extra headers as opposed to having to send messages into two separate queues, but that's part of why I'm going to do the testing: to verify whether this is the case & actually useful.

Ordering

As a side note, I am still researching & testing whether ordering is possible. To my understanding, the method RabbitMQ uses to build the single-active-consumer streams means that it SHOULD just be a drop-in replacement, and you just point the consumer at the appropriate queue. I don't know if the user needs to be able to set any headers for the connection, however.

@FZambia
Member

FZambia commented Jan 17, 2024

Hello, I took a quick look and am still thinking that templating to extract Centrifugo-specific data is a very controversial approach. One of the goals of Centrifugo is that we generally try to translate good practices to users. Having a large message for all systems seems like an antipattern for most use cases. I understand it reduces the work and the number of components, but it may become a limitation or security concern in the future.

@FZambia
Member

FZambia commented Jan 18, 2024

Looks like RabbitMQ Super Streams provide the exact semantics we want in Centrifugo regarding throughput scalability and ordering: https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams/

@deefdragon
Author

Regarding streams, that's setup that can actually be done completely on the user's side when they are configuring the queues, so if the user wants to implement the highest level of strictness and semantics, they can choose to do so. (I'll write the documentation for this if you are willing to accept the RabbitMQ consumer, and recommend & show how to enable streams.)

I've been using this (the advanced version specifically) for the last several months, and it's been very stable, but running full templates for everything was indeed overkill. Everything I used it for just used {{.Body}} as the data sent out, which is just the body passed in the RabbitMQ message. I have been using the templates to determine the exact channel quite a bit, however – e.g. token.deauth:{{.Headers.Token}}#{{.Headers.User}} for the example diagram case I had above.

Would it alleviate enough of the worries you have regarding it being an anti-pattern/security issue if the main data was fed directly in as the content? Having only the channel be adjustable, and only through the request headers?

This would also allow for largely simplifying the method used to specify the channel to, e.g., token.deauth:{{Token}}#{{User}}, pulling from the headers automatically, and would let me replace the template engine, which is quite heavy-handed.

@FZambia
Member

FZambia commented Aug 9, 2024

@deefdragon hello, just letting you know that I've noticed your comment, unfortunately I lost the context – so will try to refresh my understanding about the implementation you have and come back with thoughts.

@FZambia
Member

FZambia commented Aug 29, 2024

Well, I went through the issue and refreshed the context. Current thoughts:

Probably we should introduce a new mode of async consumers. By default, async consumers should be written in a way that they expect a method and an API command payload inside the incoming message from the external system – i.e. work the way consumers are implemented currently. In the PG case method and payload are different columns; in Kafka we expect:

type KafkaJSONEvent struct {
	Method  string          `json:"method"`
	Payload JSONRawOrString `json:"payload"`
}

But in the different mode (let's call it publication_data for now) we expect ONLY the publication data in the external message payload. And the method in this case is publish only, without the possibility to change it – because it's mostly impossible, in my opinion, to make a flexible solution for all other API commands in this case. The flexible solution already exists with the current implementation. And it's always possible to define different consumers in different modes.

Each consumer that implements publication_data mode will provide some context extracted from headers to be used to construct the channel to publish into. This context should not depend on a specific library – if we change the underlying library used for the consumer implementation, Centrifugo users should not notice that.
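
For illustration only (not a final design), the extracted context could be something library-agnostic along these lines:

// Illustration only, not a final design: a library-agnostic context a consumer
// in publication_data mode could extract from message headers.
type PublicationContext struct {
	Channels       []string // channels to publish into, e.g. taken from a header
	IdempotencyKey string   // optional Publication idempotency key
	Delta          bool     // whether delta compression applies to the message
	Data           []byte   // the message payload, published as-is
}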

Also some other thoughts:

  • I think we'd better call the consumer rabbitmq instead of amqp – and concentrate on RabbitMQ only, even if AMQP is a generic protocol. I don't know of other popular solutions and don't want to promise that the consumer will work for all AMQP versions/implementations.
  • I still want to make sure it's really possible to configure the system to use super streams. Maybe you can show the configuration? How will message ack-ing change in this case, BTW? Or is it fully compatible with standard queues API-wise? I have not looked deeply myself yet.

What do you think @deefdragon ? Will the design I described above work for you? I still do not want to rush the implementation of this, trying to validate the approach first, but it seems to me that with the reduced scope it may be possible to add this to Centrifugo itself and benefit the existing Kafka consumer also.

UPD. Probably we should also try to support broadcast in the new mode, because technically the payload for the broadcast is the same – we just need to pass/extract a list of channels instead of one channel.

@FZambia
Member

FZambia commented Sep 23, 2024

@deefdragon – a kind reminder, it would be nice to hear your opinion ^^

@deefdragon
Author

deefdragon commented Sep 23, 2024

Apologies for the slow response; family issues have taken my time. I'm on leave for a few weeks now, and tackling this is on my to-do list.

Will have a longer form response later today on your thoughts.

@deefdragon
Author

I think I follow everything you've brought up, but I have some clarifying questions.

  1. When you say add a different mode, the user would have to configure that on the consumer, right? Would that option be provided in the top-level object, like this

        {
            "name": "xxx",
            "mode": "publication_data",
            "type": "postgresql",
            "postgresql": {...}
        },

    or in the service-specific object like this?

        {
            "name": "xxx",
            "type": "postgresql",
            "postgresql": {
                "mode": "publication_data",
                ...
            }
        },
  2. When you said

    should not depend on a specific library

    You meant that if we migrated the RabbitMQ library from https://pkg.go.dev/github.com/streadway/amqp to https://pkg.go.dev/github.com/rabbitmq/amqp091-go, the user should not have to change any settings & it should be seamless? (Mind, streadway is already deprecated, so not the best example.)

  3. Would the configuration for the publication_data mode then be in the service-specific object (i.e. in the postgres block)? I feel like it would not make sense to have it anywhere else, as each service is likely going to require different configuration options. But there's also likely going to be a lot of overlap in options for this method (such as what channel to send to, or the template to construct the channel from) anyway.

    I feel like this might just be something that requires messing with the code to see what feels better to work with.

  4. Overall I think having something that benefits all of the services that can be consumed from would be ideal, yeah.

  5. I'm good on calling it rabbitmq instead of amqp.

  6. When I'm testing, I'll make sure that the super-streams are as drop-in as I believe they are.

  7. Can you confirm what you mean by message ack-ing? Just so I'm answering the right question.

Is there any chance you could query some of your users to see how they are using the existing Kafka/Postgres async consumers? See if there is anything that could inform how we are implementing this?

I also don't want to rush this. Any implementation needs to be good enough that others are willing to use it.

@FZambia
Member

FZambia commented Sep 29, 2024

the user would have to configure that on the consumer, right? Would that option be provided in the top-level object, like this

Would the configuration for the publication_data mode then be in the service-specific object (i.e. in the postgres block)? I feel like it would not make sense to have it anywhere else, as each service is likely going to require different configuration options. But there's also likely going to be a lot of overlap in options for this method (such as what channel to send to, or the template to construct the channel from) anyway.

Very good question. After thinking a bit – I do not see any major benefit of having it on one level or another, though for some reason I'm biased towards the upper level (near the type of the consumer) – as an attempt to standardize the mode. Whether different consumers support it or not may be documented and validated on start. At the same time, the options for how to extract the list of channels from the external system message will stay inside the individual consumer config – it seems impossible to make it the same for everything. In the PostgreSQL case channels may be sent comma-separated through the method column, for example, which won't require any additional configuration. In the Kafka/Rabbit/NATS case they'd come from headers – which will require additional options.

You meant that if we migrated the RabbitMQ library from https://pkg.go.dev/github.com/streadway/amqp to https://pkg.go.dev/github.com/rabbitmq/amqp091-go, the user should not have to change any settings & it should be seamless?

Yeah, I mean we can't expose templating based on the streadway/amqp.Message type as input, for example – we need to abstract it somehow. BTW, does your use case require templating? Will it work for you if you just pass a list of Centrifugo channels in one of the headers, i.e. prepared at the producer level? Or should the producer not know about Centrifugo? If the producer can construct channels – we can start with something simple, like an option with the name of the header to get the list of channels from, without templating. If not – then templating seems useful from the start.

Can you confirm what you mean by message ack-ing? Just so I'm answering the right question.

I mean whether the approach for how the consumer acks processed messages in the super streams scenario differs somehow from basic queue consuming. Or if the API is fully the same – and no differences in the consumer code are really required.

Is there any chance you could query some of your users to see how they are using the existing Kafka/Postgres async consumers? See if there is anything that could inform how we are implementing this?

I see the usage stats of the async consuming feature; it was introduced relatively recently, so only about 12 setups are using it – 10 use Kafka, 2 use PostgreSQL. I will try asking in the community channel for some details from users of the feature – though I doubt I'll find much feedback this way.

@deefdragon
Author

I will mess with the config layout to see what feels right, though I suspect the top level will be it. And yeah, it makes sense that there will be some different configuration for each async consumer.

I can make the channel just the raw header, but I'd really like to have at least some simple template support (full Go templates are definitely too much after experimenting with them) so that I don't have to add a header for Centrifugo specifically. It would be nice if I could get it set up so that there's some standard to work with for all of the implementations.

The way RabbitMQ designed their APIs for super-streams and the like, it's the exact same API. Changing the settings on the queue side to enable super-streams requires no additional work on the consumer side. (Again, I will test this to confirm, but I'm pretty sure in this case.)

I'm going to try to take a crack at this this week, but I can't guarantee anything.

@FZambia
Member

FZambia commented Oct 6, 2024

I can make the channel just the raw header, but I'd really like to have at least some simple template support (full Go templates are definitely too much after experimenting with them) so that I don't have to add a header for Centrifugo specifically. It would be nice if I could get it set up so that there's some standard to work with for all of the implementations.

Centrifugo already depends on github.com/valyala/fasttemplate v1.2.2 for some features (dynamic JWKs) – it can probably be a good fit. At the same time, templating may be the next step, when really required. Starting with a special header with channels – especially since it's viable in your use case – seems good to me also, and may be even more explicit in the end.
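
For example, a minimal sketch of what fasttemplate-based channel construction could look like (the pattern and header names are purely illustrative):

// Minimal sketch of fasttemplate-based channel construction; the pattern and
// header names are illustrative, not a committed design.
package main

import (
	"fmt"

	"github.com/valyala/fasttemplate"
)

func main() {
	// The pattern would come from consumer config, the values from message headers.
	t := fasttemplate.New("token.deauth:{{Token}}#{{User}}", "{{", "}}")
	channel := t.ExecuteString(map[string]interface{}{
		"Token": "abc123",
		"User":  "42",
	})
	fmt.Println(channel) // token.deauth:abc123#42
}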

@FZambia
Member

FZambia commented Nov 13, 2024

I wanted to prepare the barebones for this while working on #832 – and here are some outcomes:

  • the new mode seems not a good fit for the PostgreSQL consumer, and it actually does not make a lot of sense for the outbox table, which is specific to Centrifugo
  • for Kafka it makes sense, since re-using existing topics sounds beneficial. But since it seems not really possible to implement the mode for all the async consumers (due to the PG concerns above), I now think a better idea would be to make this an implementation detail of the specific consumer.

Did an MVP for Kafka in the v6_dev branch, it looks like this:

// KafkaConsumerConfig is a configuration for Kafka async consumer.
type KafkaConsumerConfig struct {
        ...

	// PublicationDataMode is a configuration for the mode where message payload already
	// contains data ready to publish into channels, instead of API command.
	PublicationDataMode KafkaPublicationDataModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"`
}

// KafkaPublicationDataModeConfig is a configuration for Kafka publication data mode.
// In this mode we expect Kafka message payload to contain data ready to publish into
// channels, instead of API command. All other fields used to build channel Publication
// can be passed in Kafka message headers – thus it's possible to integrate existing
// topics with Centrifugo.
type KafkaPublicationDataModeConfig struct {
	// Enabled enables Kafka publication data mode for the Kafka consumer.
	Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"`
	// ChannelsHeader is a header name to extract channels to publish data into
	// (channels must be comma-separated). Ex. of value: "channel1,channel2".
	ChannelsHeader string `mapstructure:"channels_header" json:"channels_header" envconfig:"channels_header" yaml:"channels_header" toml:"channels_header"`
	// IdempotencyKeyHeader is a header name to extract Publication idempotency key from
	// Kafka message. See https://centrifugal.dev/docs/server/server_api#publishrequest.
	IdempotencyKeyHeader string `mapstructure:"idempotency_key_header" json:"idempotency_key_header" envconfig:"idempotency_key_header" yaml:"idempotency_key_header" toml:"idempotency_key_header"`
	// DeltaHeader is a header name to extract Publication delta flag from Kafka message
	// which tells Centrifugo whether to use delta compression for message or not.
	// See https://centrifugal.dev/docs/server/delta_compression and
	// https://centrifugal.dev/docs/server/server_api#publishrequest.
	DeltaHeader string `mapstructure:"delta_header" json:"delta_header" envconfig:"delta_header" yaml:"delta_header" toml:"delta_header"`
}

For the previous discussion it means the following:

  • the consumer's default config still needs to consume API commands from the queue – to keep things consistent with current functionality
  • it's possible that some consumers may provide a publication_data_mode config which is specific to each consumer.

I just prototyped this all yesterday, maybe some better ideas will come up as time goes on...
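
Roughly, applying that config on the consumer side could look like this (a simplified sketch, not the exact v6_dev code; the package name and function are hypothetical):

// Simplified sketch, not the exact v6_dev code: building publication fields
// from a Kafka record using the KafkaPublicationDataModeConfig shown above.
package consuming

import "strings"

func publicationFromRecord(cfg KafkaPublicationDataModeConfig, headers map[string]string, value []byte) (channels []string, idempotencyKey string, delta bool, data []byte) {
	channels = strings.Split(headers[cfg.ChannelsHeader], ",") // comma-separated list of channels
	idempotencyKey = headers[cfg.IdempotencyKeyHeader]
	delta = headers[cfg.DeltaHeader] == "true"
	return channels, idempotencyKey, delta, value // payload is published as-is
}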
