unexpected behaviour regarding item count #694
Comments
Hi @jonathan-dev, thank you for submitting this issue. I've modified your example slightly, as below; I believe it achieves the same:

    def test_issue_694_fail():
        import time
        from datetime import timedelta

        from reactivex import operators as ops
        from reactivex.subject import Subject

        su = Subject()
        v = 0

        def set_checker(x):
            nonlocal v
            v = x

        def wait_a_while(*_):
            time.sleep(10 / 1000)  # <-- the "10" here will be roughly how many subject emissions are missed

        su.pipe(
            ops.window_with_time_or_count(
                count=1_000, timespan=timedelta(milliseconds=1_000)
            ),
            ops.flat_map(
                lambda window: window.pipe(
                    ops.count(),
                )
            ),
            ops.do_action(wait_a_while),  # <-- cause a delay when a window completes and `count` emits on_next
            ops.scan(lambda acc, x: acc + x, 0),  # <-- use scan for an easier check of what I think you meant when you said "sums to 1999 or 2000"
        ).subscribe(on_next=set_checker)

        for i in range(2000):
            time.sleep(1 / 1000)
            su.on_next(i)

        time.sleep(2)
        assert v == 2_000  # <-- will fail with something like 1980 != 2000; since we sleep 10 ticks two times, 20 are missing

The "section" you commented out in your code is replaced with [...]

This only happens for a window that is closed due to time (not count), because that happens on a separate thread, whereas for count it happens on the same thread. So while the window is being closed by [...]

I do not feel that this is a bug, but rather a threading issue. Happy to hear your thoughts.

Below is a version using a lock, which I believe works, though it might not be suitable for your use case:

    def test_issue_694():
        import threading  # needed for threading.Lock below
        import time
        from datetime import timedelta

        from reactivex import operators as ops
        from reactivex.subject import Subject

        su = Subject()
        lock = threading.Lock()
        v = 0

        def set_checker(x):
            nonlocal v
            v = x
            lock.release()  # <-- unlock only at the very end of the function executions caused by window's on_completed() call

        def wait_a_while(*_):
            time.sleep(10 / 1000)

        su.pipe(
            ops.window_with_time_or_count(
                count=1_000, timespan=timedelta(milliseconds=1_000)
            ),
            ops.flat_map(
                lambda window: window.pipe(
                    ops.do_action(on_completed=lock.acquire),  # <-- lock as soon as window is completed
                    ops.count(),
                )
            ),
            ops.do_action(wait_a_while),
            ops.scan(lambda acc, x: acc + x, 0),
            # end section
        ).subscribe(on_next=set_checker)

        for i in range(2000):
            time.sleep(1 / 1000)
            while lock.locked():
                pass  # <-- wait until the lock is released on the other thread
            su.on_next(i)

        time.sleep(2)
        assert v == 2_000

Hope this helps!
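As a side note on the `while lock.locked(): pass` loop above: it spins the CPU while waiting. Below is a minimal sketch of the same gating idea using `threading.Event`, written with the standard library only (no reactivex) so it can be run in isolation. The names `run_gated`, `gate`, `close_requested` and `slow_close` are hypothetical and only stand in for the producer loop and the thread that closes the window; this is an illustration under those assumptions, not a drop-in replacement for the test above.

```python
import threading
import time


def run_gated(n_items=50, close_at=20, close_seconds=0.05):
    """Emit n_items, pausing the producer while a second thread does slow 'close' work."""
    gate = threading.Event()
    gate.set()  # gate open: the producer may emit
    close_requested = threading.Event()
    received = []
    emitted_during_close = None

    def slow_close():
        # Stands in for the slow work done when a window completes (hypothetical).
        nonlocal emitted_during_close
        close_requested.wait()
        before = len(received)
        time.sleep(close_seconds)
        emitted_during_close = len(received) - before
        gate.set()  # reopen the gate, like lock.release() in the test above

    closer = threading.Thread(target=slow_close)
    closer.start()

    for i in range(n_items):
        if i == close_at:
            gate.clear()           # close the gate, like lock.acquire()
            close_requested.set()  # hand over to the closing thread
        gate.wait()                # blocks without spinning while the gate is closed
        received.append(i)

    close_requested.set()  # make sure the closer thread always wakes up
    closer.join()
    return received, emitted_during_close
```

Calling `run_gated()` returns the received items together with the number of items that slipped through while the gate was closed. In this sketch the producer closes the gate itself, which keeps the run deterministic; with a real timer thread, the check and the emission would still need to be made atomic.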
Hi there, just as a follow-up: your case got me diving a bit more into schedulers. I believe that using an EventLoopScheduler [...]

    def test_issue_694_event_loop():
        import time
        from datetime import timedelta

        from reactivex import operators as ops
        from reactivex.scheduler import EventLoopScheduler
        from reactivex.subject import Subject

        su = Subject()
        event_loop = EventLoopScheduler()
        v = 0

        def set_checker(x):
            nonlocal v
            v = x

        def wait_a_while(*_):
            time.sleep(10 / 1000)

        su.pipe(
            ops.window_with_time_or_count(
                count=1_000, timespan=timedelta(milliseconds=1_000)
            ),
            ops.flat_map(
                lambda window: window.pipe(
                    ops.observe_on(event_loop),  # <- event loop
                    ops.count(),
                )
            ),
            ops.do_action(wait_a_while),
            ops.scan(lambda acc, x: acc + x, 0),
            # end section
        ).subscribe(on_next=set_checker)

        for i in range(2000):
            time.sleep(1 / 1000)
            su.on_next(i)

        time.sleep(2)
        assert v == 2_000

Simply commenting out the [...]
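To illustrate the general idea behind handing notifications to a single event loop, here is a rough standard-library sketch. It is not how reactivex implements `EventLoopScheduler`; `TinyEventLoop`, `schedule`, `stop` and `demo` are hypothetical names made up for this example. The point it tries to show is that when every notification is queued and handled by one worker thread, a slow handler delays later items instead of racing with them.

```python
import queue
import threading
import time


class TinyEventLoop:
    """Hypothetical stand-in for an event loop: one worker thread draining a FIFO queue."""

    def __init__(self):
        self._tasks = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            task = self._tasks.get()
            if task is None:  # sentinel: stop the loop
                break
            task()

    def schedule(self, task):
        self._tasks.put(task)

    def stop(self):
        # Everything queued before the sentinel is processed first.
        self._tasks.put(None)
        self._worker.join()


def demo(n_items=200):
    loop = TinyEventLoop()
    total = 0

    def handle(_item):
        nonlocal total
        total += 1  # only ever touched from the worker thread

    def slow_window_close():
        time.sleep(0.01)  # stands in for the delay added by wait_a_while

    for i in range(n_items):
        if i == n_items // 2:
            loop.schedule(slow_window_close)
        loop.schedule(lambda i=i: handle(i))

    loop.stop()
    return total
```

In this sketch `demo(200)` counts all 200 items even though a slow task sits in the middle of the queue, because the producer never waits on the handler and the handler never runs concurrently with itself.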
Describe the bug
How is it plausible that, in my example below, the numbers printed before the marked section always sum to 2000 when that section is commented out, but sum to 1999 or 2000 when it is uncommented? I am really interested in the cause of that, or whether it might be a bug in the rx library (using version 4.0.4).