-
Notifications
You must be signed in to change notification settings - Fork 162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
not able to access message timestamp #261
Comments
I think that #245 covers this. Of course, as usual with Open Source, the quickest way to get things done is to implement it and submit a PR ;-) |
Hi @dams, it does look like this is because we are using the v0 format. I am currently working may way up to the new formats, but it may take a while because I have to do it in a way that will (hopefully) work for everyone. If you need this urgently, you might be better off forking the repo and making the changes you need :/ Of course if you come up with a good strategy for supporting the various formats, please do submit a PR! Regardless, this is really useful feedback because it lets us know what the community's needs are. |
I have implemented a great deal of the Perl Kafka driver, and the way I did it was to implement support for the api that returns what the broker is capable of, then choose the highest supported version for each api endpoint. Then each time a request is done, the apiversion is set, and kept around when we receive the reply. Then data decoding is done consistently.
I’ll fork and patch for my need and try to do a useful PR.
… Le 6 déc. 2017 à 17:52, Dan Swain ***@***.***> a écrit :
Hi @dams, it does look like this is because we are using the v0 format. I am currently working may way up to the new formats, but it may take a while because I have to do it in a way that will (hopefully) work for everyone. If you need this urgently, you might be better off forking the repo and making the changes you need :/ Of course if you come up with a good strategy for supporting the various formats, please do submit a PR!
Regardless, this is really useful feedback because it lets us know what the community's needs are.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub, or mute the thread.
|
We could do that if:
|
|
🤔 That's a good idea, but we'd need to drop 0.8 support at least (#260) It sounds easy to just pick an API version per broker version, but the code wasn't laid out in a way that makes that easy; that's the main part of the problem. |
Indeed, I didn't check how hard it would be to lay out the code again. about 0.8 support, in the Perl client, here is what happens, from higher to lower priority: |
I actually think it shouldn't be difficult to implement, the protocols are completely decoupled from the server/worker implementations, and the logic for differing logic between 0.8.0, 0.8.2 and 0.9.0 are also decoupled. If I was going to implement this I'll start off implementing the wire protocol, which is most likely just making slight modification after copying the base implementations and then wire it up verify it works from the console and then add logic to switch in the servers (I understand I might be speaking from a position of familiarity) |
I'm working on some refactoring of the server implementations right now and I had planned to revisit the protocols after that. I'm traveling this week, though, so progress is a bit slow right now. Anyone else is welcome to take it on but let me know so we don't duplicate efforts. |
For anyone that needs the timestamp information can use my fork (https://github.com/chen116/kafka_ex/tree/cleantime), the branch is based on tag 0.10.0. I have not tested it extensively, and pretty sure I am not following any good code structure, but it is enough for my use such as stream the changes I made are: creats an extra function that use api_version=2 https://github.com/chen116/kafka_ex/blob/b213e378759eab549311e6982dcb8197afa6fde4/lib/kafka_ex/protocol.ex#L42 and have that function called in https://github.com/chen116/kafka_ex/blob/b213e378759eab549311e6982dcb8197afa6fde4/lib/kafka_ex/protocol/fetch.ex#L62 The rest is just following around the chain of functions and adjust them according to https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets |
@chen116 the PR here: https://github.com/kafkaex/kafka_ex/pull/364/files is adding timestamp to the message_set.... you should take a look at the PR. |
no wayyyy, nice, will try it out, I assume is the same usage, thanks for letting me know |
ok, can't wait! thanks again |
Hi,
I'm consuming messages from a topic where brokers automatically set timestamps to messages when they reach them.
I'd like to access this timestamp when I'm consuming the messages, but I can't find a way to access it.
For reference, it 's the Timestamp field in the kafka message protocol, here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
Is it because kafkaex doesn't support v1 MEssage format (so kafka 0.10.x) ?
If yes, any plan to support it, or any pointer to only implement support for this new message format, for a quick hack (as this feature is critical for my usage)
Thanks!
The text was updated successfully, but these errors were encountered: