Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unexpected behaviour regarding item count #694

Open
jonathan-dev opened this issue Jun 5, 2023 · 2 comments
Open

unexpected behaviour regarding item count #694

jonathan-dev opened this issue Jun 5, 2023 · 2 comments

Comments

@jonathan-dev
Copy link

Describe the bug
How is is plausible in my example below that with the marked section commented out the numbers that are printed before the section always sum to 2000 but when uncommenting that section it sums to 1999 or 2000. I am really interested in the cause for that or if that might be a bug in the rx library (using version 4.0.4)

To Reproduce

import time
from datetime import datetime, timedelta

import reactivex as rx
from reactivex import operators as ops, Observable
from reactivex.scheduler import ThreadPoolScheduler
from reactivex.subject import Subject

su: Observable[int] = Subject()
su = Subject()

thread = ThreadPoolScheduler(max_workers=1)

su.pipe(
    ops.window_with_time_or_count(count=1_000, timespan=timedelta(milliseconds=1_000)),
    ops.flat_map(lambda window: window.pipe(
        ops.count(),
    )),
    ops.do_action(lambda x: print(f"x:{x}")),
    # start section
    ops.map(lambda x: rx.of(range(x)).pipe(
        ops.map(lambda x: f"test: {x}"),
    )),
    # end section
).subscribe()


for i in range(2000):
    time.sleep(1/1000)
    t = datetime.now()
    su.on_next(i)

time.sleep(1)

Expected behavior
A clear and concise description of what you expected to happen.

Code or Screenshots
If applicable, add a minimal and self contained code example or screenshots to help explain your problem.

def foo(self) -> str:
    return 3
  • OS [ubuntu]
  • RxPY version [4.0.4]
  • Python version [3.10.2]
@matiboy
Copy link
Collaborator

matiboy commented Jun 10, 2023

Hi @jonathan-dev thank you for submitting this issue;

I've modified your example slightly as below, I do believe it achieves the same:

def test_issue_694_fail():
    import time
    from datetime import timedelta
    from reactivex import operators as ops
    from reactivex.subject import Subject

    su = Subject()
    v = 0

    def set_checker(x):
        nonlocal v
        v = x

    def wait_a_while(*_):
        time.sleep(10 / 1000)  # <-- the "10" here will be roughly how many subject emissions are missed

    su.pipe(
        ops.window_with_time_or_count(
            count=1_000, timespan=timedelta(milliseconds=1_000)
        ),
        ops.flat_map(
            lambda window: window.pipe(
                ops.count(),
            )
        ),
        ops.do_action(wait_a_while),  # <-- cause a delay when a window completes and `count` emits on_next
        ops.scan(lambda acc, x: acc + x, 0), # <-- use scan for easier check of what I think you meant when you said "sums to 1999 or 2000"
    ).subscribe(on_next=set_checker)

    for i in range(2000):
        time.sleep(1 / 1000)
        su.on_next(i)

    time.sleep(2)
    assert v == 2_000  # <-- will fail with something like 1980 != 2000 ; since we sleep 10 ticks two times, 20 are missing

The "section" you commented out in your code is replaced with wait_a_while because that's what causes the problem: the computation time makes the window miss emissions.

This only happens for a window that is closed due to time (not count) because that is happening on a separate thread - whereas for count, it's on the same thread. So while the window is being closed by s.on_completed() in the create_timer window, the main thread continues to emit and that's not caught by the next window.

I do not feel that this is a bug, but a threading issue. Happy to hear your thoughts.

Below is a version using a lock which I believe works, though it might not be suitable for your use case:

def test_issue_694():
    import time
    from datetime import timedelta
    from reactivex import operators as ops
    from reactivex.subject import Subject

    su = Subject()

    lock = threading.Lock()
    v = 0

    def set_checker(x):
        nonlocal v
        v = x
        lock.release() # <-- unlock only at the very end of the function executions caused by window's on_completed() call

    def wait_a_while(*_):
        time.sleep(10 / 1000)

    su.pipe(
        ops.window_with_time_or_count(
            count=1_000, timespan=timedelta(milliseconds=1_000)
        ),
        ops.flat_map(
            lambda window: window.pipe(
                ops.do_action(on_completed=lock.acquire), # <-- lock as soon as window is completed
                ops.count(),
            )
        ),
        ops.do_action(wait_a_while),
        ops.scan(lambda acc, x: acc + x, 0),
        # end section
    ).subscribe(on_next=set_checker)

    for i in range(2000):
        time.sleep(1 / 1000)
        while lock.locked():
            pass # <-- wait until the lock is released on the other thread
        su.on_next(i)

    time.sleep(2)
    assert v == 2_000

Hope this helps!

@matiboy
Copy link
Collaborator

matiboy commented Jun 27, 2023

Hi there, just as a follow up, your case got me diving a bit more into schedulers; I believe that using an EventLoopScheduler would likely be the right way: the version below also works:

def test_issue_694_event_loop():
    import time
    from datetime import timedelta
    from reactivex import operators as ops
    from reactivex.subject import Subject
    from reactivex.scheduler import EventLoopScheduler

    su = Subject()

    event_loop = EventLoopScheduler()
    v = 0

    def set_checker(x):
        nonlocal v
        v = x

    def wait_a_while(*_):
        time.sleep(10 / 1000)

    su.pipe(
        ops.window_with_time_or_count(
            count=1_000, timespan=timedelta(milliseconds=1_000)
        ),
        ops.flat_map(
            lambda window: window.pipe(
                ops.observe_on(event_loop),  # <- event loop
                ops.count(),
            )
        ),
        ops.do_action(wait_a_while),
        ops.scan(lambda acc, x: acc + x, 0),
        # end section
    ).subscribe(on_next=set_checker)

    for i in range(2000):
        time.sleep(1 / 1000)
        su.on_next(i)

    time.sleep(2)
    assert v == 2_000

Simply commenting out the ops.observer_on line shows the initial issue again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants