Skip to content

Commit

Permalink
Fix IndexError when submitting buffered futures
Browse files Browse the repository at this point in the history
  • Loading branch information
puigru committed Aug 31, 2020
1 parent 8c3361d commit 62864a1
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@ def __init__(self, submit_size, *args, **kwargs):
def submit(self, fn, *args, **kwargs):
self._buffer.append((fn, args, kwargs))

def __submit_from_buffer(self):
fn, args, kwargs = self._buffer.pop(0)
return self._executor.submit(fn, *args, **kwargs)
def __submit_from_buffer(self, count=1):
futures = list()
while count and self._buffer:
fn, args, kwargs = self._buffer.pop(0)
futures.append(self._executor.submit(fn, *args, **kwargs))
count -= 1
return futures

def as_completed(self):
submitted = [self.__submit_from_buffer() for _ in range(self._submit_size)]
submitted = self.__submit_from_buffer(self._submit_size)
while self._buffer and not self._shutdown:
done, _ = concurrent.futures.wait(submitted, return_when=concurrent.futures.FIRST_COMPLETED)
for future in done:
submitted.remove(future)
submitted.append(self.__submit_from_buffer())
submitted += self.__submit_from_buffer()
yield future

def shutdown(self, wait=True):
Expand Down

0 comments on commit 62864a1

Please sign in to comment.