From af11fe25d77623564c49e05131c4aa53ea5fc979 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Mon, 4 Mar 2024 18:37:26 -0500
Subject: [PATCH 1/2] fixture to benchmark amount of intermediate data stored

---
 tests/conftest.py | 42 ++++++++++++++++++++++++++++++++++++++++--
 tests/utils.py    | 26 +++++++++++++++++++++++++-
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 55cd2f7..eac220b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -18,7 +18,7 @@
 import cubed_xarray
 import lithops
 
-from .utils import spec_from_config_file
+from .utils import spec_from_config_file, get_directory_size
 from benchmark_schema import TestRun
 
 
@@ -244,10 +244,46 @@ def _benchmark_memory(history):
     yield _benchmark_memory
 
 
+@pytest.fixture(scope="function")
+def benchmark_storage(test_run_benchmark):
+    """Benchmark the total amount of intermediate data written to storage.
+
+    Yields
+    ------
+    Context manager factory function which takes the path of the work directory
+    (local or remote) as input. The context manager records the total size of the
+    files found under that directory once the ``with`` block has finished if run
+    as part of a benchmark, or does nothing otherwise.
+
+    Example
+    -------
+    .. code-block:: python
+
+        def test_something(benchmark_storage):
+            # ``spec`` is the cubed.Spec the arrays were created with
+            with benchmark_storage(spec.work_dir):
+                cubed.compute(*arrs)
+    """
+
+    @contextlib.contextmanager
+    def _benchmark_storage(work_dir):
+        if not test_run_benchmark:
+            yield
+        else:
+            yield
+
+            stored_data_size = get_directory_size(work_dir)
+
+            test_run_benchmark.intermediate_data_stored = stored_data_size
+
+    yield _benchmark_storage
+
+
 @pytest.fixture(scope="function")
 def benchmark_all(
     benchmark_memory,
     benchmark_time,
+    benchmark_storage,
 ):
     """Benchmark all available metrics and extracts cluster information
 
@@ -271,13 +307,15 @@ def test_something(benchmark_all):
     --------
     benchmark_memory
     benchmark_time
+    benchmark_storage
     """
 
     @contextlib.contextmanager
-    def _benchmark_all(history):
+    def _benchmark_all(history, work_dir):
         with (
             benchmark_memory(history),
             benchmark_time,
+            benchmark_storage(work_dir)
         ):
             yield
 
diff --git a/tests/utils.py b/tests/utils.py
index 6192d95..0de8864 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -21,9 +21,33 @@ def spec_from_config_file(filepath: str) -> cubed.Spec:
     return spec_from_config(config)
 
 
+def get_directory_size(work_dir: str) -> int:
+    """Get the size in bytes of any data written to the given directory (local or remote)."""
+
+    import fsspec
+
+    total_size = 0
+
+    if work_dir.startswith('s3://'):
+        fs = fsspec.filesystem('s3')
+    else:
+        fs = fsspec.filesystem('file')
+
+    # With detail=True, glob returns a dict mapping each path to its info dict
+    contents = fs.glob(f'{work_dir}/**', detail=True)
+
+    for item in contents.values():
+        if item['type'] == 'file':
+            # If it's a file, add its size to the total
+            total_size += item['size']
+
+    return total_size
+
+
 def run(
     result,
     executor,
+    spec,
     benchmarks,
     **kwargs
 ):
@@ -34,6 +58,6 @@ def run(
     callbacks.append(history)
     kwargs['callbacks'] = callbacks
 
-    with benchmarks(history):
+    with benchmarks(history, spec.work_dir):
         computed_result = result.compute(
             executor=executor,
From 4630591df24e4f2d0620e28f42363cf48a4c4239 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 4 Mar 2024 18:51:19 -0500 Subject: [PATCH 2/2] pass spec to run so it can know the work_dir --- tests/benchmarks/test_array.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/benchmarks/test_array.py b/tests/benchmarks/test_array.py index 8f62e88..0802b5f 100644 --- a/tests/benchmarks/test_array.py +++ b/tests/benchmarks/test_array.py @@ -70,6 +70,7 @@ def test_quadratic_means_xarray(tmp_path, runtime, benchmark_all, t_length): computed_result = run( result, executor=spec.executor, + spec=spec, benchmarks=benchmark_all, optimize_function=opt_fn, compute_arrays_in_parallel=True,