Using partition "breaks" program logic #427
Comments
Found a closely related SO post, which unfortunately does not provide a clear answer:

Is there none, or is the person just "not aware of it"?
I made some progress. The following seems to achieve what I was hoping for. However, I am not 100% sure whether it only works in this pathological toy example. I am mainly unsure if the call to …

```python
import streamz
import threading
from functools import partial


def main():
    # Event used to signal when processing is done
    signal_done = threading.Event()
    state = {
        "cnt": 0,
    }
    done_cb = partial(_signal_done_cb, evnt=signal_done)
    # Create a RefCounter keeping track of the number of items in the stream
    ref_c = streamz.RefCounter(cb=done_cb)
    source = streamz.Stream()
    parted = source.partition(10001, timeout=2)  # <= PARTITION HERE
    cntd = parted.accumulate(cnt,
                             returns_state=True,
                             start=state)
    cntd.sink(dev_null)
    with open("many_lines.txt", "r") as fh:
        signal_done.clear()
        for line in fh:
            # Attach the RefCounter so the stream can track in-flight items
            source.emit(line, metadata=[{"ref": ref_c}])
        # Block until the callback fires (or give up after 5 seconds)
        signal_done.wait(timeout=5)
    print(f"found {state.get('cnt')} lines")


def cnt(state, itm):
    state["cnt"] += 1
    return state, itm


def dev_null(itm):
    return None


def _signal_done_cb(evnt=None):
    evnt.set()


if __name__ == "__main__":
    main()
```

This now returns … after about 2 seconds, which is expected since …
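A small stdlib-only model can help show why the `RefCounter` + `Event` version behaves this way. It assumes (as the symptoms suggest, but not verified against the streamz source) that the partition's timeout fires on an event loop running in another thread. Under that assumption, `emit` returns right away and the main thread reaches `print` before anything is flushed. `TimedPartition`, `InFlightCounter` and `count_lines` are hypothetical names, not streamz classes. The model waits on a `threading.Condition` until the in-flight count is zero. Unlike a callback that sets an `Event`, this re-checks the count, so an early zero crossing (for example after a full partition flushes mid-file) cannot release the waiter too soon. The same consideration may apply to the version above if a file has more than `n` lines. Also note that after a partition every downstream item is a tuple of lines, so the model's sink adds `len(batch)` rather than 1.

```python
import threading


class InFlightCounter:
    """Hypothetical stand-in for a reference counter: tracks items that were
    emitted but have not yet left the pipeline."""

    def __init__(self):
        self._count = 0
        self._cond = threading.Condition()

    def retain(self, k=1):
        with self._cond:
            self._count += k

    def release(self, k=1):
        with self._cond:
            self._count -= k
            if self._count == 0:
                self._cond.notify_all()

    def wait_until_zero(self, timeout=None):
        # Re-checks the count after every wake-up; returns False on timeout
        with self._cond:
            return self._cond.wait_for(lambda: self._count == 0, timeout)


class TimedPartition:
    """Hypothetical size-or-timeout batcher (not streamz's `partition`).

    A full buffer is flushed immediately on the emitting thread; a partial
    one is flushed by a threading.Timer on a background thread."""

    def __init__(self, n, timeout, downstream):
        self.n, self.timeout, self.downstream = n, timeout, downstream
        self._buffer = []
        self._timer = None
        self._lock = threading.Lock()

    def emit(self, item):
        with self._lock:
            self._buffer.append(item)
            if len(self._buffer) >= self.n:
                self._flush()
            elif self._timer is None:
                self._timer = threading.Timer(self.timeout, self._on_timeout)
                self._timer.daemon = True
                self._timer.start()

    def _on_timeout(self):
        with self._lock:
            self._flush()

    def _flush(self):
        # Caller holds the lock, so downstream calls never overlap
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self._buffer:
            batch, self._buffer = tuple(self._buffer), []
            self.downstream(batch)


def count_lines(lines, n, timeout, wait=True):
    state = {"cnt": 0}
    in_flight = InFlightCounter()

    def sink(batch):
        state["cnt"] += len(batch)       # count lines, not batches
        in_flight.release(len(batch))    # these lines have left the pipeline

    part = TimedPartition(n, timeout, sink)
    for line in lines:
        in_flight.retain()               # one reference per emitted line
        part.emit(line)
    if wait:
        # Plays the role of signal_done.wait(timeout=5) above
        in_flight.wait_until_zero(timeout=5)
    return state["cnt"]


if __name__ == "__main__":
    lines = [f"line {i}\n" for i in range(10)]
    print(count_lines(lines, n=10001, timeout=0.5, wait=False))  # tail not flushed yet
    print(count_lines(lines, n=10001, timeout=0.5))              # waits for the timeout
```

Without the wait, `count_lines` returns before the timer thread has flushed anything. With it, the main thread blocks until every emitted line has been released.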
I am struggling to use `partition` in a pipeline because it "breaks" the logic of my program, presumably because it introduces asynchronous processing. As a simplified example, I have something that works along the lines of this:
This basically runs through all the lines in the file `many_lines.txt`, counts and prints them, and then reports the result. So far so good.
When I now introduce `partition`, like this:

I would expect to see basically the same result. Instead, I see nothing for some time and then …
I know there are only 10,000 lines in `many_lines.txt`, so the partition will never fill up. But it should hit the timeout at some point and "release" the data, no?

I suspect that the program terminates before the partition hits the timeout, so I tried (many variations of) `await ingstream.emit(line)`. That was inspired by the `async def process_file(fn):` function in Processing Time and Back Pressure. For example, like this:
But this (obviously) does not work (`SyntaxError: 'await' outside async function`), and I did not find a way to make it work.

(How) Can I make sure the `for` loop terminates before the `print` statement (or any remaining code, for that matter) is executed? Or am I getting this completely wrong?

My use case is to read all lines of some pretty big files (which I cannot load into memory at once), send them through a streamz pipeline, and then continue with my program. "Then" means after all lines are processed, including those that might be "stuck" in a partition when no more lines are emitted because we reached EOF. This is why I need the `timeout`, I believe.