Remove dumps_task #8067
@@ -24,7 +24,7 @@
 import dask
 from dask import delayed
 from dask.highlevelgraph import HighLevelGraph, MaterializedLayer
-from dask.utils import apply, parse_timedelta, stringify, tmpfile, typename
+from dask.utils import parse_timedelta, stringify, tmpfile, typename

 from distributed import (
     CancelledError,
@@ -74,7 +74,7 @@
     varying,
     wait_for_state,
 )
-from distributed.worker import dumps_function, dumps_task, get_worker, secede
+from distributed.worker import dumps_function, get_worker, secede

 pytestmark = pytest.mark.ci1
@@ -345,7 +345,26 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a):
     await wait(xs + ys)


-@pytest.mark.slow
+from distributed import WorkerPlugin
+
+
+class CountData(WorkerPlugin):
+    def __init__(self, keys):
+        self.keys = keys
+        self.worker = None
+        self.count = 0
+
+    def setup(self, worker):
+        self.worker = worker
+
+    def transition(self, start, finish, *args, **kwargs):
+        count = 0
+        for k in self.worker.data:
+            if k in self.keys:
+                count += 1
+        self.count = max(self.count, count)
+
+
 @gen_cluster(
     nthreads=[("", 2)] * 4,
     client=True,
@@ -359,33 +378,18 @@ async def test_graph_execution_width(c, s, *workers):
     The number of parallel work streams match the number of threads.
     """

-    class Refcount:
-        "Track how many instances of this class exist; logs the count at creation and deletion"
-
-        count = 0
-        lock = dask.utils.SerializableLock()
-        log = []
-
-        def __init__(self):
-            with self.lock:
-                type(self).count += 1
-                self.log.append(self.count)
-
-        def __del__(self):
-            with self.lock:
-                self.log.append(self.count)
-                type(self).count -= 1
Comment on lines -362 to -377:

This test is interesting. This PR does not change anything in terms of scheduling, ordering, etc., but this test was still failing quite reliably. It seems as if Refcount relies on explicit garbage collection. That is something I want to look into a little more, since we have been seeing a lot of GC warnings recently. For the sake of this PR, however, I rewrote the test to count keys in the worker's data instead of relying on GC. Eventually, I think both tests would make sense.

This is a really weird case, somehow connected to how this object is defined in a local context.
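To illustrate the GC dependence described in the comment above, here is a small standalone sketch (not from the PR; `Tracked` is a hypothetical stand-in for `Refcount`): an instance caught in a reference cycle is only finalized when the cyclic collector runs, so any count maintained in `__del__` lags until that point.

```python
import gc


class Tracked:
    """Hypothetical stand-in for Refcount: counts live instances via __del__."""

    alive = 0

    def __init__(self):
        type(self).alive += 1
        self.cycle = self  # reference cycle: plain refcounting cannot free this

    def __del__(self):
        type(self).alive -= 1


def make_one():
    Tracked()  # instance becomes unreachable when the function returns


make_one()
print(Tracked.alive)  # 1 -- __del__ has not run yet; the object sits in a cycle
gc.collect()          # the cyclic collector finalizes the instance
print(Tracked.alive)  # 0 -- only now does the count drop
```

Counting keys in `worker.data` from a plugin, as the rewritten test does, avoids this timing dependence entirely.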
-    roots = [delayed(Refcount)() for _ in range(32)]
+    roots = [delayed(inc)(ix) for ix in range(32)]
     passthrough1 = [delayed(slowidentity)(r, delay=0) for r in roots]
     passthrough2 = [delayed(slowidentity)(r, delay=0) for r in passthrough1]
     done = [delayed(lambda r: None)(r) for r in passthrough2]

+    await c.register_worker_plugin(
+        CountData(keys=[f.key for f in roots]), name="count-roots"
+    )
     fs = c.compute(done)
     await wait(fs)
-    # NOTE: the max should normally equal `total_nthreads`. But some macOS CI machines
-    # are slow enough that they aren't able to reach the full parallelism of 8 threads.
-    assert max(Refcount.log) <= s.total_nthreads
+
+    res = await c.run(lambda dask_worker: dask_worker.plugins["count-roots"].count)
+    assert all(0 < count <= 2 for count in res.values())


 @gen_cluster(client=True, nthreads=[("", 1)])
@@ -953,24 +957,6 @@ def test_dumps_function():
     assert a != c


-def test_dumps_task():
-    d = dumps_task((inc, 1))
-    assert set(d) == {"function", "args"}
-
-    def f(x, y=2):
-        return x + y
-
-    d = dumps_task((apply, f, (1,), {"y": 10}))
-    assert cloudpickle.loads(d["function"])(1, 2) == 3
-    assert cloudpickle.loads(d["args"]) == (1,)
-    assert cloudpickle.loads(d["kwargs"]) == {"y": 10}
-
-    d = dumps_task((apply, f, (1,)))
-    assert cloudpickle.loads(d["function"])(1, 2) == 3
-    assert cloudpickle.loads(d["args"]) == (1,)
-    assert set(d) == {"function", "args"}
-
-
 @pytest.mark.parametrize("worker_saturation", [1.0, float("inf")])
 @gen_cluster(client=True)
 async def test_ready_remove_worker(c, s, a, b, worker_saturation):
@@ -1357,9 +1343,9 @@ async def test_update_graph_culls(s, a, b):
         layers={
             "foo": MaterializedLayer(
                 {
-                    "x": dumps_task((inc, 1)),
-                    "y": dumps_task((inc, "x")),
-                    "z": dumps_task((inc, 2)),
+                    "x": (inc, 1),
+                    "y": (inc, "x"),
+                    "z": (inc, 2),
                 }
             )
         },
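For reference, a minimal sketch of what the updated test now constructs, using the `HighLevelGraph` and `MaterializedLayer` imports shown in the first hunk (the `inc` helper is assumed, as in the test suite): task definitions are plain tuples, with no `dumps_task` serialization step.

```python
import dask
from dask.highlevelgraph import HighLevelGraph, MaterializedLayer


def inc(x):
    return x + 1


# A single materialized layer holding raw task tuples, as in the updated test.
hlg = HighLevelGraph(
    layers={
        "foo": MaterializedLayer({"x": (inc, 1), "y": (inc, "x"), "z": (inc, 2)})
    },
    dependencies={"foo": set()},
)

# HighLevelGraph behaves like a task-graph mapping, so the synchronous
# scheduler can execute it directly.
assert dask.get(hlg, "y") == 3  # inc(inc(1))
```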
cc @madsbk - It looks like we were using this function in dask-cuda (rapidsai/dask-cuda#1219)
Yes, we use it for its caching feature, but I don't think it is needed.
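For downstream projects such as dask-cuda that relied on `dumps_task`, the removed test above documents its output format. A rough standalone equivalent can be sketched from that test (this mirrors the documented output, not the internal distributed implementation, and `dumps_task_sketch` is a hypothetical name; the caching the reply mentions is not reproduced here):

```python
import cloudpickle
from dask.utils import apply


def dumps_task_sketch(task):
    """Serialize a task tuple into cloudpickled parts.

    Mirrors the format checked by the removed test:
      (func, *args)                -> {"function", "args"}
      (apply, func, args)          -> {"function", "args"}
      (apply, func, args, kwargs)  -> {"function", "args", "kwargs"}
    """
    if task[0] is apply:
        d = {
            "function": cloudpickle.dumps(task[1]),
            "args": cloudpickle.dumps(task[2]),
        }
        if len(task) == 4:
            d["kwargs"] = cloudpickle.dumps(task[3])
        return d
    return {
        "function": cloudpickle.dumps(task[0]),
        "args": cloudpickle.dumps(task[1:]),
    }
```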