Benchmark amount of intermediate data stored #10

Open: wants to merge 2 commits into main
tests/benchmarks/test_array.py (1 change: 1 addition & 0 deletions)

@@ -70,6 +70,7 @@ def test_quadratic_means_xarray(tmp_path, runtime, benchmark_all, t_length):
    computed_result = run(
        result,
        executor=spec.executor,
        spec=spec,
Member Author (on lines 72 to +73):
Seems redundant

        benchmarks=benchmark_all,
        optimize_function=opt_fn,
        compute_arrays_in_parallel=True,
tests/conftest.py (42 changes: 40 additions & 2 deletions)

@@ -18,7 +18,7 @@
import cubed_xarray
import lithops

-from .utils import spec_from_config_file
+from .utils import spec_from_config_file, get_directory_size

from benchmark_schema import TestRun

@@ -244,10 +244,46 @@ def _benchmark_memory(history):
    yield _benchmark_memory


@pytest.fixture(scope="function")
def benchmark_storage(test_run_benchmark):
    """Benchmark the total amount of intermediate data written to storage.

    Yields
    ------
    Context manager factory function which takes the cubed working directory
    as input. The context manager records the total size of the intermediate
    data written to storage while executing the ``with`` statement if run as
    part of a benchmark, or does nothing otherwise.

    Example
    -------
    .. code-block:: python

        def test_something(benchmark_storage):
            with benchmark_storage(work_dir):
                cubed.compute(*arrs)
    """

    @contextlib.contextmanager
    def _benchmark_storage(work_dir):
        if not test_run_benchmark:
            yield
        else:
            yield

            stored_data_size = get_directory_size(work_dir)

            test_run_benchmark.intermediate_data_stored = stored_data_size

    yield _benchmark_storage


@pytest.fixture(scope="function")
def benchmark_all(
    benchmark_memory,
    benchmark_time,
    benchmark_storage,
):
    """Benchmark all available metrics and extract cluster information

@@ -271,13 +307,15 @@ def test_something(benchmark_all):
    --------
    benchmark_memory
    benchmark_time
    benchmark_storage
    """

    @contextlib.contextmanager
-    def _benchmark_all(history):
+    def _benchmark_all(history, work_dir):
        with (
            benchmark_memory(history),
            benchmark_time,
+            benchmark_storage(work_dir)
        ):
            yield

tests/utils.py (26 changes: 25 additions & 1 deletion)

@@ -21,9 +21,33 @@ def spec_from_config_file(filepath: str) -> cubed.Spec:
    return spec_from_config(config)


def get_directory_size(work_dir: str) -> float:
    """Get the size of any data written to the given directory (local or remote)."""

    import fsspec

    total_size = 0

    if work_dir.startswith('s3://'):
        fs = fsspec.filesystem('s3')
Member: A simpler way to do this (that I only discovered yesterday!) is

    fs, _, _ = fsspec.get_fs_token_paths(work_dir)

    else:
        fs = fsspec.filesystem('file')

    # List all files and subdirectories in the directory
    contents = fs.glob(f'{work_dir}/**', detail=True)
Member Author (on lines +36 to +37): This is currently failing to discover any of the contents


    for item in contents:
        if item['type'] == 'file':
            # If it's a file, add its size to the total
            total_size += item['size']

    return total_size
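Beyond the review suggestion above, note that `fs.glob(..., detail=True)` returns a dict keyed by path, so `for item in contents` iterates over path strings rather than info dicts. A hedged alternative sketch that avoids `glob` entirely, using fsspec's `get_fs_token_paths` (as suggested in the comment above) together with `du`, which recursively sums file sizes; this rewrite is untested against this repository:

```python
import fsspec


def get_directory_size(work_dir: str) -> int:
    """Total bytes of data stored under work_dir (local path or remote URL)."""
    # Infer the filesystem (local, s3, ...) from the URL scheme
    fs, _, _ = fsspec.get_fs_token_paths(work_dir)
    # du() recursively sums the sizes of all files below the path
    return fs.du(work_dir)
```

`du` returns the total in bytes by default (`total=True`), counting files only, which matches what the loop above was trying to compute.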


def run(
    result,
    executor,
    spec,
    benchmarks,
    **kwargs
):
@@ -34,7 +58,7 @@ def test_something(benchmark_all):
    callbacks.append(history)
    kwargs['callbacks'] = callbacks

-    with benchmarks(history):
+    with benchmarks(history, spec.work_dir):
Member Author: The spec.work_dir isn't the right thing to pass - it's not specific enough. I want the directory containing only the intermediate results written during the last cubed run.

I can see that each such directory is uniquely labeled, which is nice, but I'm not sure exactly how to get this label from the computation. Should I import cubed.core.plan.CONTEXT_ID, or is there a simpler way?

Member: The event object passed to on_compute_start (and on_compute_end) in the callback contains the context_id (see https://github.com/cubed-dev/cubed/blob/fda2f1e7bedec5389cae00ee751a54076ccf28e3/cubed/runtime/types.py#L28-L49). The HistoryCallback doesn't capture this - perhaps it could be changed, or maybe it's simpler to write a custom callback?
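A custom callback along those lines could be quite small. The sketch below is a plain duck-typed object rather than a subclass of Cubed's real Callback base class, and the hook names and the event's compute_id attribute are assumptions taken from the types.py file linked above:

```python
class ComputeIdCallback:
    """Records the compute_id of the most recent computation.

    Hypothetical sketch: hook names (on_compute_start/on_compute_end) and
    the event's compute_id attribute follow the linked types.py.
    """

    def __init__(self):
        self.compute_id = None

    def on_compute_start(self, event):
        # Remember which compute call this was, so the benchmark can later
        # locate the directory holding its intermediate data
        self.compute_id = event.compute_id

    def on_compute_end(self, event):
        pass
```

It would then be passed alongside the HistoryCallback, e.g. `callbacks=[history, ComputeIdCallback()]`.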

TomNicholas (Member Author), Mar 5, 2024: In cubed.core.plan there is a CONTEXT_ID and a compute_id. The former seems to be used to make temporary directories, whilst the latter is what's stored in the event object. What's the difference between them?

Member: CONTEXT_ID is the directory name created for the duration of the client Python process. The compute_id is a recent addition used for creating directories to store history data in that is unique to the call to compute.

So you're right - you want CONTEXT_ID, but the intermediate data for all computations run in the pytest session will be there, so it's hard to pick out the arrays for a particular computation.

We should probably make Cubed store its intermediate data in a directory named {CONTEXT_ID}/{compute_id}, but that's a bit more work.
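Under that proposed layout, locating one run's intermediate data would reduce to joining the two identifiers onto the work directory. A tiny illustration (the layout itself is only a proposal from this thread, and posixpath is used because work_dir may be a URL rather than a local path):

```python
import posixpath


def intermediate_dir(work_dir: str, context_id: str, compute_id: str) -> str:
    # Proposed layout from the discussion: {work_dir}/{CONTEXT_ID}/{compute_id}
    return posixpath.join(work_dir, context_id, compute_id)
```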

Member Author: Thanks for the explanation!

> We should probably make Cubed store its intermediate data in a directory named {CONTEXT_ID}/{compute_id}, but that's a bit more work.

I would like to do that - I can make a PR to cubed.

Member Author: (Continuing discussion from cubed-dev/cubed#413)

> I think the easier way to solve the original problem in #10 would be to just get the intermediate array paths from the DAG

So then the benchmark context managers need to know about the plan object, right? Or can we add it to what's saved in history.plan?

I tried looking at the plan object, but if this information is stored in there I can't seem to find it. I thought looking at target would give me what I need, but for the smallest quad means example I just get None for each node, when clearly something was written.

I'm also wondering if there is any subtlety here with the optimized vs unoptimized plan?

Member: The plan only has the op names now. Optimized vs unoptimized shouldn't make a difference. Try something like (untested)

    array_names = [name for name in dag if name.startswith("array-")]

where dag comes from the callback object.
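Combined with a path convention, that suggestion might look like the following sketch. The "array-" prefix and the assumption that each such node maps to a directory under the work directory come from this thread, not from a confirmed Cubed API; dag here stands for any iterable of node names:

```python
def intermediate_array_paths(dag, work_dir):
    # Filter the DAG node names down to intermediate arrays, then map each
    # to its candidate storage location under the work directory
    array_names = [name for name in dag if name.startswith("array-")]
    return [f"{work_dir}/{name}" for name in array_names]
```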

Member Author: I can't see a way to avoid passing extra information to the benchmarks context manager - the history does not contain this information.


    computed_result = result.compute(
        executor=executor,