First, great little library!

I am often in the situation where I want to compute something like `(f(x) for x in xs)` using multiple cores, where `xs` is a generator yielding a very large number of objects (say, billions), so I don't want to materialize `xs` all at once in memory. Unfortunately, `multiprocessing.Pool()`'s methods do exactly that when they populate the underlying work queue.
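To illustrate the problem (a minimal sketch with a made-up `f`, not parmap code): `Pool.map` checks whether its iterable has a `__len__` and, if it doesn't, converts it to a list before dispatching any work, so a huge generator gets buffered up front.

```python
import multiprocessing

def f(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool() as p:
        # Pool.map turns the generator into a list before any worker
        # starts, so all 10**9 inputs end up in memory first.
        results = p.map(f, (i for i in range(10**9)))
```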
To get around this, I've been using a little helper function like the following to manually break the iterator into slices and call `Pool.map` on each slice:
```python
import itertools
import multiprocessing

from tqdm import tqdm

def lazymap(f, xs, chunksize=1000):
    # Use len() for the progress bar when it's cheap; counting a
    # generator (e.g. via tee) would buffer every item, defeating
    # the point, so fall back to an unsized bar instead.
    try:
        n = len(xs)
    except TypeError:
        n = None
    # Make sure xs is an iterator, so repeated islice() calls advance
    # through it rather than re-reading a sequence's first chunk forever.
    xs = iter(xs)
    pbar = tqdm(total=n)
    with multiprocessing.Pool() as p:
        while True:
            # Pull the next chunk off the iterator and farm it out.
            rs = p.map(f, itertools.islice(xs, chunksize))
            if not rs:
                break
            pbar.update(len(rs))
            yield from rs
    pbar.close()
```
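For what it's worth, here's how I call it (`square` and the range generator are just stand-ins; the function has to be module-level so it pickles):

```python
def square(x):
    return x * x

if __name__ == "__main__":
    # Stream results for a billion inputs without ever holding
    # the whole input (or output) in memory at once.
    for r in lazymap(square, (i for i in range(10**9)), chunksize=10000):
        pass  # consume results as they stream out
```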
But this is pretty kludgy. It would be great if parmap could do something similar, so that it doesn't consume the entire input iterator before `multiprocessing` starts doing work!

It might be as simple as avoiding the `len` call on line 239 and refactoring the way `pool.map_async` is used, but I admit I don't understand the code all that well.
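In case it helps, here's one rough sketch of the kind of `map_async` refactor I mean (generic code, not parmap's actual internals): keep exactly one chunk in flight while yielding the previous chunk's results, so the pool stays busy between chunks and the input is still consumed lazily.

```python
import itertools
import multiprocessing

def lazymap_pipelined(f, xs, chunksize=1000):
    xs = iter(xs)
    with multiprocessing.Pool() as p:
        # Submit the first chunk before entering the loop.
        pending = p.map_async(f, list(itertools.islice(xs, chunksize)))
        while True:
            rs = pending.get()
            if not rs:
                break
            # Queue the next chunk before yielding this one, so workers
            # keep computing while the consumer processes results.
            pending = p.map_async(f, list(itertools.islice(xs, chunksize)))
            yield from rs
```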
Thoughts?