I'm using RxPY for an event-driven application, and after some stress tests I noticed strange behaviour in memory usage.
I've created a simple script to reproduce the issue:
```python
from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject
import rx.operators as ops
from threading import current_thread
import multiprocessing
import rx
import time

# One worker thread per CPU core
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

# Square each item, then hand it off to the thread pool
pipeline = [ops.map(lambda x: x**2)]
pipeline.append(ops.observe_on(pool_scheduler))

s = Subject()
s.pipe(*pipeline).subscribe(on_next=lambda x: print("{}, thread: {}".format(x, current_thread().name)))

# Push items as fast as the main thread can
for i in range(100000000000):
    s.on_next(i)

input()
```
I'm using the memory-profiler library to trace the memory usage of this script. After 30 seconds of running, the profile shows memory growing linearly.
If I place a `time.sleep(0.1)` inside the for loop, memory usage stays steady:
```python
from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject
import rx.operators as ops
from threading import current_thread
import multiprocessing
import rx
import time

# One worker thread per CPU core
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

# Square each item, then hand it off to the thread pool
pipeline = [ops.map(lambda x: x**2)]
pipeline.append(ops.observe_on(pool_scheduler))

s = Subject()
s.pipe(*pipeline).subscribe(on_next=lambda x: print("{}, thread: {}".format(x, current_thread().name)))

for i in range(100000000000):
    s.on_next(i)
    time.sleep(0.1)  # throttle the producer

input()
```
I don't know whether the problem is related to Python / RxPY or whether I'm using the library incorrectly.
PS: memory usage is steady if I remove the scheduler and everything runs on the main thread.
damianoct changed the title from "Memory increases with subject and threadpool scheduler" to "Memory usage increases with subject and threadpool scheduler" on Sep 1, 2021.
If you wait until the program completes, does the memory go down?
It looks like the code generates items faster than they are processed in the thread pool. Between the source and the threads of the thread-based schedulers (such as ThreadPoolScheduler) there is an unbounded queue holding the scheduled items, so memory grows until the source completes and the threads can catch up.
When you add the sleep statement, you slow down production enough that items are processed faster than they are produced.
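To make the producer/consumer imbalance concrete, here is a standard-library-only model of the situation (a sketch, not RxPY code). It assumes that RxPY's `ThreadPoolScheduler` behaves like `concurrent.futures.ThreadPoolExecutor`, whose `submit()` never blocks and appends work to an unbounded internal queue. The function `simulate_unbounded` and its parameters are hypothetical, and `time.sleep(work_seconds)` stands in for the real processing cost.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def simulate_unbounded(num_items=1000, workers=2, work_seconds=0.001):
    """Submit work faster than it is processed; return (backlog, leftover)."""
    lock = threading.Lock()
    pending = 0  # items submitted but not yet finished

    def work(x):
        nonlocal pending
        time.sleep(work_seconds)  # stand-in for the real processing cost
        with lock:
            pending -= 1
        return x ** 2

    executor = ThreadPoolExecutor(max_workers=workers)
    for i in range(num_items):
        with lock:
            pending += 1
        # submit() returns immediately; the item waits in an unbounded queue
        executor.submit(work, i)
    with lock:
        backlog = pending  # items still waiting or running

    executor.shutdown(wait=True)  # producer is done: workers drain the queue
    return backlog, pending


backlog, remaining = simulate_unbounded()
print(f"pending right after submitting: {backlog}")
print(f"pending after draining: {remaining}")
```

Almost every item is still queued when the producer finishes, and the backlog only falls to zero once the workers drain it, which matches the observation that memory is released after the source completes.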
> If you wait until the program completes, does the memory go down?
Yes. I ran a script that processes 1 million source items without the sleep, and the memory is released over time.
My question is this: I'm using the same setup (subject + thread pool) in a Python application, where items may be emitted and processed at a very high rate.
How can I avoid this memory growth when the source produces items faster than they can be processed?
Should I perhaps use multiple observables (multiple subjects)?
You need to handle backpressure in your application. Unfortunately, there is no built-in solution for this in RxPY.
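One blocking form of backpressure is to cap the number of items in flight with a semaphore: the producer acquires a slot before handing off an item, and the worker releases it when finished, so the producer stalls instead of the queue growing. The sketch below uses only the standard library; `run_with_backpressure` and `max_in_flight` are hypothetical names. Applied to the script in the issue, the same idea would be to acquire before `s.on_next(i)` and release at the end of the subscriber's `on_next`; that RxPY adaptation is an untested assumption.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_backpressure(num_items=200, workers=4, max_in_flight=8,
                          work_seconds=0.001):
    """Process items while never letting more than max_in_flight pile up."""
    slots = threading.BoundedSemaphore(max_in_flight)
    lock = threading.Lock()
    in_flight = 0
    peak = 0
    results = []

    def work(x):
        nonlocal in_flight
        try:
            time.sleep(work_seconds)  # stand-in for the real processing cost
            with lock:
                results.append(x ** 2)
        finally:
            with lock:
                in_flight -= 1
            slots.release()  # free a slot so the producer can continue

    with ThreadPoolExecutor(max_workers=workers) as executor:
        for i in range(num_items):
            slots.acquire()  # blocks while max_in_flight items are pending
            with lock:
                in_flight += 1
                peak = max(peak, in_flight)
            executor.submit(work, i)
    # Leaving the with-block waits for every submitted item to finish.
    return peak, sorted(results)


peak, results = run_with_backpressure()
print(f"peak items in flight: {peak}")
print(f"processed {len(results)} items")
```

The trade-off is that the producer now waits whenever the pool is saturated, so memory stays bounded at the cost of slowing the source, much like the `time.sleep(0.1)` experiment but adapting automatically to the actual processing speed.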
Depending on the structure of your application, you can use different techniques.
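If losing some items is acceptable, a lossy alternative keeps memory bounded without ever blocking the producer: put items into a fixed-size buffer and discard new ones while it is full. This is a standard-library sketch with hypothetical names (`run_lossy`, `maxsize`); RxPY operators such as `ops.sample` or `ops.throttle_first` may serve a similar purpose, but check their semantics against your requirements.

```python
import queue
import threading
import time


def run_lossy(num_items=1000, maxsize=16, work_seconds=0.001):
    """Never block the producer; drop items while the buffer is full."""
    buffer = queue.Queue(maxsize=maxsize)  # memory is capped at maxsize items
    processed = []
    done = object()  # sentinel telling the consumer to stop

    def consumer():
        while True:
            item = buffer.get()
            if item is done:
                break
            time.sleep(work_seconds)  # stand-in for the real processing cost
            processed.append(item ** 2)

    worker = threading.Thread(target=consumer)
    worker.start()
    dropped = 0
    for i in range(num_items):
        try:
            buffer.put_nowait(i)  # never waits for the consumer
        except queue.Full:
            dropped += 1  # discard the newest item instead of queueing it
    buffer.put(done)  # blocking put: the stop signal must not be dropped
    worker.join()
    return len(processed), dropped


processed_count, dropped_count = run_lossy()
print(f"processed: {processed_count}, dropped: {dropped_count}")
```

Every item that makes it into the buffer is processed, and everything else is counted as dropped, so the two counts always add up to the number of items produced.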