Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common.socket): Allow parallel parsing with a pool of workers #15891

Merged
merged 10 commits into from
Oct 9, 2024

Conversation

LarsStegman
Copy link
Contributor

Summary

This is a proof of concept for the issue I raised in #15884. I feel like there might be a better way to implement this, than my way. I was thinking of something like the Options pattern, but I am not sure about this.

I am very open to discuss this! @srebhan do you have time to discuss this? I am not even sure if it is something you're open to add to Telegraf, but I think it is worth it considering the increased performance we see in our application.

Checklist

  • No AI generated code was used in this PR

Related issues

resolves #15884

@srebhan srebhan marked this pull request as draft September 16, 2024 13:40
@srebhan
Copy link
Member

srebhan commented Sep 16, 2024

Hey! Thanks for being so active and contributing your findings and fixes! We recently added something similar to the kafka consumer plugin allowing to override the timestamp provided by parsing.
I see two options:

  1. Implementing the override in the input plugin using metric.SetTime() to overwrite the timestamp after parsing.
  2. Implementing the override as a general option for inputs in the running_input model.

Option two was discussed a while ago but we never found the time to implement it. Let me know if you are interested in this and I will detail more on what needs to be done.

@srebhan srebhan self-assigned this Sep 16, 2024
@LarsStegman
Copy link
Contributor Author

You're welcome, it helps me too, since I now only need to maintain some proprietary plugins. Maintaining a fork that touches a lot of the internals is very time consuming. And most importantly, I enjoy contributing. I am also glad that my company allows me to.

I'm up to working on it. I wasn't really aware of how the internals for Telegraf worked, since I worked mostly on plugins. This weekend of profiling and optimizing really made things a lot clearer. I think I see how it would be implemented on the running_input, but please elaborate.

Regarding moving parsing for inputs.socket_listener to a separate goroutine. Should I make that a separate PR after the override has been added to the running_input? Is that something that stands a chance of getting merged?

@srebhan
Copy link
Member

srebhan commented Sep 19, 2024

I think I see how it would be implemented on the running_input, but please elaborate.

The idea is to add a general timesource option with the choices

  • metric: use the timestamp of the metric as provided by the input plugin e.g. through parsing (default)
  • collection-start: use the timestamp when gathering started for all metrics
  • collection-end: use the timestamp when gathering finished for all metrics

to all inputs by adding a new config option (and parse it) to the inputs.

Then you adapt the metric using the start time (stored in the model struct) or end time (stored in the model struct) and call SetTime() on all metrics, or do nothing dependent on the config option.

Does this make things more clear?

Regarding moving parsing for inputs.socket_listener to a separate goroutine. Should I make that a separate PR after the override has been added to the running_input? Is that something that stands a chance of getting merged?

I think we might accept this if there is a compelling argumentation why this is needed and what the benefit will be. I guess you are already having those numbers as you are asking... ;-)
One thing, please use a thread pool like github.com/alitto/pond (see e.g. the snmp_lookup processor) to allow the user to keep control over the number of threads spawned as otherwise it might be easily possible to DoS machines...

@LarsStegman
Copy link
Contributor Author

Yes, that makes it clear and is about what I expected. I'll try to work on it tomorrow!

I'll also work on the async parsing.

@srebhan
Copy link
Member

srebhan commented Sep 19, 2024

Just to be clear, async parsing should be an own PR! Please also double check that parser calls are thread-safe! I remember that we had this a while back with Avro but I want to urge you to double check! :-)

@LarsStegman
Copy link
Contributor Author

LarsStegman commented Sep 19, 2024

Yes, that was my plan as well :) Easier to get things merged when the changes are small and isolated.

I thought the "contract" requires calls to Parse to be thread safe, right?

@LarsStegman
Copy link
Contributor Author

The idea is to add a general timesource option with the choices

  • metric: use the timestamp of the metric as provided by the input plugin e.g. through parsing (default)
  • collection-start: use the timestamp when gathering started for all metrics
  • collection-end: use the timestamp when gathering finished for all metrics

I am working on implementing it, but I am a little uncertain how this would work for Service plugins. Those often don't do anything in Gather, so overriding it with a collection-start/end time is useless.

I was thinking of maybe adding the same feature to the RunningParser, but even then there is no guarantee this is the correct time. The time when the Parse function is called, does not need to be the same as the time the original data comes in. For example, I read the UDP buffer, and then send the data into a goroutine to be parsed. The goroutine is not guaranteed to be started immediately.

@srebhan
Copy link
Member

srebhan commented Sep 19, 2024

@LarsStegman right, service plugins will need to take care of timestamp overriding themselves...

@LarsStegman LarsStegman force-pushed the feat/parser-with-timestamp branch from 8f7aa5d to 538a885 Compare September 20, 2024 12:43
@LarsStegman LarsStegman changed the title WIP: feat(parsers): support passing receive timestamp to parser feat(inputs.socket_listener): parallel parsing Sep 20, 2024
@telegraf-tiger telegraf-tiger bot added feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins labels Sep 20, 2024
@LarsStegman LarsStegman marked this pull request as ready for review September 20, 2024 12:47
@LarsStegman LarsStegman force-pushed the feat/parser-with-timestamp branch 2 times, most recently from c41db77 to 9eaa313 Compare September 21, 2024 06:12
Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @LarsStegman for your contribution! A few comments from my side.

plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/socket.go Outdated Show resolved Hide resolved
plugins/common/socket/socket.go Outdated Show resolved Hide resolved
plugins/common/socket/stream.go Show resolved Hide resolved
plugins/inputs/socket_listener/socket_listener.go Outdated Show resolved Hide resolved
Copy link
Contributor Author

@LarsStegman LarsStegman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I answered some questions and gave some explanations. I have not yet implemented any changes, I will do that tomorrow.

plugins/common/socket/datagram.go Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/stream.go Show resolved Hide resolved
plugins/inputs/socket_listener/socket_listener.go Outdated Show resolved Hide resolved
plugins/common/socket/socket.go Outdated Show resolved Hide resolved
plugins/inputs/socket_listener/socket_listener.go Outdated Show resolved Hide resolved
@LarsStegman LarsStegman requested a review from srebhan October 1, 2024 08:35
@srebhan
Copy link
Member

srebhan commented Oct 2, 2024

Please move the time-source part to another PR and keep the parallelization in here!

@LarsStegman LarsStegman force-pushed the feat/parser-with-timestamp branch from bad5458 to a22481d Compare October 3, 2024 07:20
@LarsStegman LarsStegman changed the title feat(inputs.socket_listener): parallel parsing feat(common.socket): parallel parsing Oct 3, 2024
@LarsStegman LarsStegman force-pushed the feat/parser-with-timestamp branch from 814740d to 6d44ec7 Compare October 7, 2024 11:23
@telegraf-tiger
Copy link
Contributor

telegraf-tiger bot commented Oct 7, 2024

@srebhan srebhan changed the title feat(common.socket): parallel parsing feat(common.socket): Allow parallel parsing with a pool of workers Oct 7, 2024
@srebhan
Copy link
Member

srebhan commented Oct 7, 2024

@LarsStegman one more question because we had issues in the past... Did you test if taking down telegraf still works reliably with parallel workers?

@LarsStegman
Copy link
Contributor Author

LarsStegman commented Oct 7, 2024

With the first implementation with just creating new goroutines, it did not. I am not 100% sure with the pool implementation. I don't remember seeing issues, but I will test it tomorrow. I can imagine some things breaking, because the channel is already closed when parsing is finished or something. I should probably add some wait group things or something.

Edit: looking at the code again, there shouldn't be any problems, because I already wait until the pool is done. I will test it tomorrow to be sure, though.

@LarsStegman
Copy link
Contributor Author

@srebhan yeah, no problems. It stops gracefully.

Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your effort @LarsStegman!

@srebhan srebhan added the ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review. label Oct 8, 2024
@srebhan srebhan assigned DStrand1 and unassigned srebhan Oct 8, 2024
@DStrand1 DStrand1 merged commit c0a3656 into influxdata:master Oct 9, 2024
28 of 29 checks passed
@github-actions github-actions bot added this to the v1.33.0 milestone Oct 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

perf: Async parsing
3 participants