Added to_kafka directly from a Dask worker #279
base: master
Conversation
Codecov Report
@@ Coverage Diff @@
## master #279 +/- ##
==========================================
- Coverage 94.69% 93.61% -1.08%
==========================================
Files 13 13
Lines 1620 1644 +24
==========================================
+ Hits 1534 1539 +5
- Misses 86 105 +19
Continue to review full report at Codecov.
I'll add tests to get coverage now.
@jsmaupin, did you get a chance to write some tests? @martindurant, @CJ-Wright, any thoughts on this?
@chinmaychandak I suspect this can be done with the existing to_kafka method. We just need to figure out the difference between this implementation and the existing one and add an option for it.
Okay, let me take a look.
@jsmaupin Hence, I am thinking that this current implementation of yours would be the best way out?
```python
client = default_client()
result = client.submit(produce, self.topic, x, self.producer_config)
self._emit(result)
```
Should this call `self.emit`? That way it integrates with the async support?
Agreed. Anything else that comes to mind that needs changing, @CJ-Wright?
@martindurant Could you also please take a look at this?
Circling back to this. After becoming more familiar with everything here, I'm thinking that this should support back pressure like the existing `to_kafka` implementation does.
This PR got left behind. Does anyone remember the status here?
I proposed a solution here: https://stackoverflow.com/questions/60764361/write-to-kafka-from-a-dask-worker that I felt was a bit of a hack. I haven't found a better solution. I would be happy to follow through with this implementation if there are no objections.
I think that approach is totally fine, but maybe I would improve it by having a dict of producers, with the key being a hash of the connection kwargs, because you could have different Kafka jobs live in a cluster at the same time. Also, the attribute could just as easily be a global variable in a module, especially if it's mutable like the dict I'm suggesting above. This seems cleaner to my mind, but I can't think of any technical reason that it's different (there is only one worker instance).
Currently, we must `gather()` all the results from a Dask stream back to the master script and then push the results to Kafka. This removes all the benefits of the parallel processing we get with Dask and Kafka. It would be much more efficient if we could push data directly from the Dask workers into Kafka.

One issue I had getting this to work is that the Producer class from the Confluent Kafka Python library is not pickle-able. The workaround is to hide the Producer from the pickle function by creating it on the worker side using "reflection" methods. However, I believe this adds a requirement that the confluent_kafka library must be installed on the workers.
Also, this implementation is serial, but Dask itself is parallel.