diff --git a/zetta_utils/layer/volumetric/cloudvol/backend.py b/zetta_utils/layer/volumetric/cloudvol/backend.py
index 5ae06bbe3..7f451fbc7 100644
--- a/zetta_utils/layer/volumetric/cloudvol/backend.py
+++ b/zetta_utils/layer/volumetric/cloudvol/backend.py
@@ -23,6 +23,7 @@
 
 IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2
 
+
 # To avoid reloading info file - note that an empty provenance is passed
 # since otherwise the CloudVolume's __new__ will download the provenance
 # TODO: Use `assume_metadata` off of the cached info, using `get_info`.
@@ -101,6 +102,7 @@ class CVBackend(VolumetricBackend):  # pylint: disable=too-few-public-methods
     info_spec: PrecomputedInfoSpec | None = None
     info_overwrite: bool = False
     info_keep_existing_scales: bool = True
+    cache_bytes_limit: Optional[int] = None
 
     def __attrs_post_init__(self):
         if "mip" in self.cv_kwargs:
@@ -147,13 +149,17 @@ def name(self, name: str) -> None:  # pragma: no cover
 
     @property
     def dtype(self) -> np.dtype:
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return np.dtype(result.data_type)
 
     @property
     def num_channels(self) -> int:  # pragma: no cover
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return result.num_channels
 
     @property
@@ -197,14 +203,21 @@ def clear_disk_cache(self) -> None:  # pragma: no cover
         info = get_info(self.path)
         for scale in info["scales"]:
             res = Vec3D[float](*scale["resolution"])
-            _get_cv_cached(self.path, resolution=res, **self.cv_kwargs).cache.flush()
+            _get_cv_cached(
+                self.path,
+                resolution=res,
+                cache_bytes_limit=self.cache_bytes_limit,
+                **self.cv_kwargs,
+            ).cache.flush()
 
     def clear_cache(self) -> None:  # pragma: no cover
         _clear_cv_cache(self.path)
 
     def read(self, idx: VolumetricIndex) -> npt.NDArray:
         # Data out: cxyz
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         data_raw = cvol[idx.to_slices()]
 
         result = np.transpose(data_raw, (3, 0, 1, 2))
@@ -226,7 +239,9 @@ def write(self, idx: VolumetricIndex, data: npt.NDArray):
                 f"but got a tensor of with ndim == {data.ndim}"
             )
 
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         slices = idx.to_slices()
         # Enable autocrop for writes only
         cvol.autocrop = True
@@ -301,15 +316,30 @@ def with_changes(self, **kwargs) -> CVBackend:
         return result
 
     def get_voxel_offset(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.voxel_offset)
 
     def get_chunk_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.chunk_size)
 
     def get_dataset_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.volume_size)
 
     def get_bounds(self, resolution: Vec3D) -> VolumetricIndex:  # pragma: no cover
diff --git a/zetta_utils/layer/volumetric/cloudvol/build.py b/zetta_utils/layer/volumetric/cloudvol/build.py
index 31086a58d..2bdb250ce 100644
--- a/zetta_utils/layer/volumetric/cloudvol/build.py
+++ b/zetta_utils/layer/volumetric/cloudvol/build.py
@@ -54,6 +54,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
             JointIndexDataProcessor[npt.NDArray | torch.Tensor, VolumetricIndex],
         ]
     ] = (),
+    cache_bytes_limit: int | None = None,
 ) -> VolumetricLayer:  # pragma: no cover # trivial conditional, delegation only
     """Build a CloudVolume layer.
 
@@ -94,6 +95,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
         returning it to the user.
     :param write_procs: List of processors that will be applied to the data given by
         the user before writing it to the backend.
+    :param cache_bytes_limit: Size limit for the in-memory CloudVolume cache, in bytes.
     :return: Layer built according to the spec.
     """
     if cv_kwargs is None:
@@ -143,6 +145,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
         info_overwrite=info_overwrite,
         info_keep_existing_scales=info_keep_existing_scales,
         info_spec=info_spec,
+        cache_bytes_limit=cache_bytes_limit,
     )
 
     result = build_volumetric_layer(
diff --git a/zetta_utils/layer/volumetric/cloudvol/deprecated/backend.py b/zetta_utils/layer/volumetric/cloudvol/deprecated/backend.py
index a5bff7895..a3a5989e3 100644
--- a/zetta_utils/layer/volumetric/cloudvol/deprecated/backend.py
+++ b/zetta_utils/layer/volumetric/cloudvol/deprecated/backend.py
@@ -23,6 +23,7 @@
 
 IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2
 
+
 # To avoid reloading info file - note that an empty provenance is passed
 # since otherwise the CloudVolume's __new__ will download the provenance
 # TODO: Use `assume_metadata` off of the cached info, using `get_info`.
@@ -100,6 +101,7 @@ class CVBackend(VolumetricBackend):  # pylint: disable=too-few-public-methods
     cv_kwargs: Dict[str, Any] = attrs.field(factory=dict)
     info_spec: Optional[PrecomputedInfoSpec] = None
     on_info_exists: InfoExistsModes = "expect_same"
+    cache_bytes_limit: Optional[int] = None
 
     def __attrs_post_init__(self):
         if "mip" in self.cv_kwargs:
@@ -142,13 +144,17 @@ def name(self, name: str) -> None:  # pragma: no cover
 
     @property
     def dtype(self) -> np.dtype:
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return np.dtype(result.data_type)
 
     @property
     def num_channels(self) -> int:  # pragma: no cover
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return result.num_channels
 
     @property
@@ -192,14 +198,21 @@ def clear_disk_cache(self) -> None:  # pragma: no cover
         info = get_info(self.path)
         for scale in info["scales"]:
             res = Vec3D[float](*scale["resolution"])
-            _get_cv_cached(self.path, resolution=res, **self.cv_kwargs).cache.flush()
+            _get_cv_cached(
+                self.path,
+                resolution=res,
+                cache_bytes_limit=self.cache_bytes_limit,
+                **self.cv_kwargs,
+            ).cache.flush()
 
     def clear_cache(self) -> None:  # pragma: no cover
         _clear_cv_cache(self.path)
 
     def read(self, idx: VolumetricIndex) -> npt.NDArray:
         # Data out: cxyz
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         data_raw = cvol[idx.to_slices()]
 
         result = np.transpose(data_raw, (3, 0, 1, 2))
@@ -221,7 +234,9 @@ def write(self, idx: VolumetricIndex, data: npt.NDArray):
                 f"but got a tensor of with ndim == {data.ndim}"
             )
 
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         slices = idx.to_slices()
         # Enable autocrop for writes only
         cvol.autocrop = True
@@ -294,15 +309,30 @@ def with_changes(self, **kwargs) -> CVBackend:
         return result
 
     def get_voxel_offset(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.voxel_offset)
 
     def get_chunk_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.chunk_size)
 
     def get_dataset_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.volume_size)
 
     def get_bounds(self, resolution: Vec3D) -> VolumetricIndex:  # pragma: no cover
diff --git a/zetta_utils/layer/volumetric/cloudvol/deprecated/build.py b/zetta_utils/layer/volumetric/cloudvol/deprecated/build.py
index 79553d54a..6babbead5 100644
--- a/zetta_utils/layer/volumetric/cloudvol/deprecated/build.py
+++ b/zetta_utils/layer/volumetric/cloudvol/deprecated/build.py
@@ -61,6 +61,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
             JointIndexDataProcessor[npt.NDArray | torch.Tensor, VolumetricIndex],
         ]
     ] = (),
+    cache_bytes_limit: int | None = None,
 ) -> VolumetricLayer:  # pragma: no cover # trivial conditional, delegation only
     """Build a CloudVolume layer.
 
@@ -115,6 +116,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
         returning it to the user.
     :param write_procs: List of processors that will be applied to the data given by
         the user before writing it to the backend.
+    :param cache_bytes_limit: Size limit for the in-memory CloudVolume cache, in bytes.
     :return: Layer built according to the spec.
 
     """
@@ -145,6 +147,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
             add_scales_exclude_fields=info_add_scales_exclude_fields,
             only_retain_scales=info_only_retain_scales,
         ),
+        cache_bytes_limit=cache_bytes_limit,
     )
 
     result = build_volumetric_layer(
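
For reviewers, a minimal usage sketch of the new `cache_bytes_limit` parameter follows; it is not part of the patch. The bucket path is hypothetical, the import assumes `build_cv_layer` is re-exported from the `cloudvol` package shown above, and it presumes (not shown in this diff) that a `None` limit falls back to the existing `IN_MEM_CACHE_NUM_BYTES_PER_CV` default of 128 MiB.

```python
from zetta_utils.layer.volumetric.cloudvol import build_cv_layer

# Cap the per-CloudVolume in-memory cache at 256 MiB instead of the
# default IN_MEM_CACHE_NUM_BYTES_PER_CV (128 MiB). Path is hypothetical.
layer = build_cv_layer(
    path="gs://example-bucket/example-dataset",
    cache_bytes_limit=256 * 1024 ** 2,
)
```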