
Concurrent Execution Not Working as Expected with RxPY ThreadPoolScheduler #713

Open
vjcspy opened this issue Mar 24, 2024 · 1 comment

vjcspy commented Mar 24, 2024

Hello,

I'm running into an issue with RxPY: I'm trying to execute multiple tasks concurrently using ThreadPoolScheduler, but they run sequentially instead. I want the tasks for "Alpha", "Beta", and "Gamma" to start almost simultaneously, but there is a 2-second delay between each start, which is not what I expected. Below is the relevant part of my code:

```python
import threading
import time

import reactivex
from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler

from metastock.modules.core.logging.logger import Logger  # my project's own logging helper

pool_scheduler = ThreadPoolScheduler(5)

def intense_calculation(value):
    Logger().info(f"Start {value}")
    time.sleep(2)  # Simulate an intensive calculation
    return f"Result {value}"

# Create an Observable
source = reactivex.from_(["Alpha", "Beta", "Gamma"])
source.pipe(
    ops.observe_on(ThreadPoolScheduler(5)),
    ops.map(intense_calculation),
).subscribe(
    on_next=lambda s: Logger().info(
        f"Processed {s} on {threading.current_thread().name}"
    ),
    on_error=lambda e: Logger().info(e),
    on_completed=lambda: Logger().info("Process complete!"),
)

# Wait until all tasks are completed
input("Press any key to exit\n")
```

The logs show the tasks starting and finishing sequentially, each 2 seconds apart:

```text
Press any key to exit
[03/24/24 12:05:31] INFO     Start Alpha                           rx_test.py:16
[03/24/24 12:05:33] INFO     Processed Result Alpha on             rx_test.py:27
                             ThreadPoolExecutor-1_0
                    INFO     Start Beta                            rx_test.py:16
[03/24/24 12:05:35] INFO     Processed Result Beta on              rx_test.py:27
                             ThreadPoolExecutor-1_1
                    INFO     Start Gamma                           rx_test.py:16
[03/24/24 12:05:37] INFO     Processed Result Gamma on             rx_test.py:27
                             ThreadPoolExecutor-1_1
                    INFO     Process complete!                     rx_test.py:31
```

I assumed that a ThreadPoolScheduler with a pool size of 5 would let these tasks run in parallel, but apparently it does not. I expected "Start Alpha", "Start Beta", and "Start Gamma" to be logged at nearly the same time, with the work running concurrently on different threads.
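As a baseline for that expectation, here is a minimal stdlib-only sketch (no reactivex involved) showing what a 5-worker pool does when each value is submitted to it as an independent task: the three sleeps overlap. The sleep is shortened to 0.5 s for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def intense_calculation(value: str) -> str:
    time.sleep(0.5)  # shortened stand-in for the 2-second "intensive calculation"
    return f"Result {value}"


start = time.monotonic()
with ThreadPoolExecutor(max_workers=5) as executor:
    # Each submit() hands an independent task to the pool, so all three can sleep at once.
    futures = [executor.submit(intense_calculation, v) for v in ["Alpha", "Beta", "Gamma"]]
    results = [f.result() for f in futures]  # result() preserves submission order
elapsed = time.monotonic() - start

print(results)            # ['Result Alpha', 'Result Beta', 'Result Gamma']
print(f"{elapsed:.1f}s")  # expected: close to one task's duration (~0.5 s), not ~1.5 s
```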

Can anyone help identify what causes this sequential execution, and how I can adjust my code to get the expected parallelism?

Thanks in advance for any insights or suggestions!

Collaborator

matiboy commented Mar 24, 2024

Hi @vjcspy

For more background, see @dbrattli's reply on an older issue.

Since that issue dates from the v3 days, here is sample code that should give you the expected parallelism:

```python
def main():
    import threading
    import time

    import reactivex
    from reactivex import operators as ops
    from reactivex.scheduler import ThreadPoolScheduler

    start = time.time()

    pool_scheduler = ThreadPoolScheduler(5)
    messages = []
    # One shared lock: creating a new Lock() on every call would protect nothing.
    lock = threading.Lock()

    def log(message):
        with lock:
            messages.append(f"{round(time.time() - start, 1)}: {message}")

    def intense_calculation(value):
        time.sleep(2)
        return f"Computed for {value}"

    # Create an Observable
    source = reactivex.from_(["Alpha", "Beta", "Gamma"])

    source.pipe(
        # Submit each value to the scheduler's underlying executor as its own task,
        # wrap the returned Future as an Observable, and merge the results.
        ops.flat_map(
            lambda s: reactivex.from_future(
                pool_scheduler.executor.submit(intense_calculation, s)
            )
        ),
    ).subscribe(
        on_next=lambda s: log(f"Processed {s} on {threading.current_thread().name}"),
        on_error=lambda e: log(f"ERROR {e}"),
        on_completed=lambda: log("Process complete!"),
    )

    # The three 2-second tasks overlap, so 3 seconds is enough for all of them to finish.
    time.sleep(3)
    log("End")
    print(messages)


if __name__ == "__main__":
    main()
```

Another issue that discusses this: #67

Hope this helps
