-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement pre-fetching in map() and gen() #521
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -325,6 +325,7 @@ | |
parallel=None, | ||
workers=None, | ||
min_task_size=None, | ||
prefetch: Optional[int] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. q: |
||
sys: Optional[bool] = None, | ||
) -> "Self": | ||
"""Change settings for chain. | ||
|
@@ -351,7 +352,7 @@ | |
if sys is None: | ||
sys = self._sys | ||
settings = copy.copy(self._settings) | ||
settings.add(Settings(cache, parallel, workers, min_task_size)) | ||
settings.add(Settings(cache, parallel, workers, min_task_size, prefetch)) | ||
return self._evolve(settings=settings, _sys=sys) | ||
|
||
def reset_settings(self, settings: Optional[Settings] = None) -> "Self": | ||
|
@@ -801,6 +802,8 @@ | |
``` | ||
""" | ||
udf_obj = self._udf_to_obj(Mapper, func, params, output, signal_map) | ||
if (prefetch := self._settings.prefetch) is not None: | ||
udf_obj.prefetch = prefetch | ||
|
||
return self._evolve( | ||
query=self._query.add_signals( | ||
|
@@ -838,6 +841,8 @@ | |
``` | ||
""" | ||
udf_obj = self._udf_to_obj(Generator, func, params, output, signal_map) | ||
if (prefetch := self._settings.prefetch) is not None: | ||
udf_obj.prefetch = prefetch | ||
return self._evolve( | ||
query=self._query.generate( | ||
udf_obj.to_udf_wrapper(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,11 +7,19 @@ def __init__(self, msg): | |
|
||
|
||
class Settings: | ||
def __init__(self, cache=None, parallel=None, workers=None, min_task_size=None): | ||
def __init__( | ||
self, | ||
cache=None, | ||
parallel=None, | ||
workers=None, | ||
min_task_size=None, | ||
prefetch=None, | ||
): | ||
self._cache = cache | ||
self.parallel = parallel | ||
self._workers = workers | ||
self.min_task_size = min_task_size | ||
self.prefetch = prefetch | ||
|
||
if not isinstance(cache, bool) and cache is not None: | ||
raise SettingsError( | ||
|
@@ -66,3 +74,5 @@ def add(self, settings: "Settings"): | |
self.parallel = settings.parallel or self.parallel | ||
self._workers = settings._workers or self._workers | ||
self.min_task_size = settings.min_task_size or self.min_task_size | ||
if settings.prefetch is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason to have a mix of styles here — some protected vars, some not? |
||
self.prefetch = settings.prefetch |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
import functools | ||
from collections import Counter | ||
from contextlib import contextmanager | ||
from queue import Queue | ||
|
||
import pytest | ||
from fsspec.asyn import sync | ||
|
@@ -111,6 +112,37 @@ async def process(row): | |
list(mapper.iterate(timeout=4)) | ||
|
||
|
||
@pytest.mark.parametrize("create_mapper", [AsyncMapper, OrderedMapper]) | ||
def test_mapper_deadlock(create_mapper): | ||
shcheklein marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will it deadlock if we don't wrap the producer into a thread? Is that what you were trying to test (i.e., make sure that the producer is wrapped)? |
||
queue = Queue() | ||
inputs = range(50) | ||
|
||
def as_iter(queue): | ||
while (item := queue.get()) is not None: | ||
yield item | ||
|
||
async def process(x): | ||
return x | ||
|
||
mapper = create_mapper(process, as_iter(queue), workers=10, loop=get_loop()) | ||
it = mapper.iterate(timeout=4) | ||
for i in inputs: | ||
queue.put(i) | ||
|
||
# Check that we can get as many objects out as we put in, without deadlock | ||
result = [] | ||
for _ in range(len(inputs)): | ||
result.append(next(it)) | ||
if mapper.order_preserving: | ||
assert result == list(inputs) | ||
else: | ||
assert set(result) == set(inputs) | ||
|
||
# Check that iteration terminates cleanly | ||
queue.put(None) | ||
assert list(it) == [] | ||
|
||
|
||
@pytest.mark.parametrize("create_mapper", [AsyncMapper, OrderedMapper]) | ||
@settings(deadline=None) | ||
@given( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Why do they have to be thread-safe (I mean cursor results)? Is it because we now run the producer in a separate thread in the async mapper?
Q: Are there any implications in terms of memory usage for this?