feat(cv): expose cache_bytes_limit

trivoldus28 committed Feb 14, 2025
1 parent 163b4fb commit fa1dd08
Showing 4 changed files with 82 additions and 16 deletions.
46 changes: 38 additions & 8 deletions zetta_utils/layer/volumetric/cloudvol/backend.py
@@ -23,6 +23,7 @@
 
 IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2
 
+
 # To avoid reloading info file - note that an empty provenance is passed
 # since otherwise the CloudVolume's __new__ will download the provenance
 # TODO: Use `assume_metadata` off of the cached info, using `get_info`.
@@ -101,6 +102,7 @@ class CVBackend(VolumetricBackend): # pylint: disable=too-few-public-methods
     info_spec: PrecomputedInfoSpec | None = None
     info_overwrite: bool = False
     info_keep_existing_scales: bool = True
+    cache_bytes_limit: Optional[int] = None
 
     def __attrs_post_init__(self):
         if "mip" in self.cv_kwargs:
@@ -147,13 +149,17 @@ def name(self, name: str) -> None: # pragma: no cover
 
     @property
     def dtype(self) -> np.dtype:
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
 
         return np.dtype(result.data_type)
 
     @property
     def num_channels(self) -> int: # pragma: no cover
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return result.num_channels
 
     @property
@@ -197,14 +203,21 @@ def clear_disk_cache(self) -> None: # pragma: no cover
         info = get_info(self.path)
         for scale in info["scales"]:
             res = Vec3D[float](*scale["resolution"])
-            _get_cv_cached(self.path, resolution=res, **self.cv_kwargs).cache.flush()
+            _get_cv_cached(
+                self.path,
+                resolution=res,
+                cache_bytes_limit=self.cache_bytes_limit,
+                **self.cv_kwargs,
+            ).cache.flush()
 
     def clear_cache(self) -> None: # pragma: no cover
         _clear_cv_cache(self.path)
 
     def read(self, idx: VolumetricIndex) -> npt.NDArray:
         # Data out: cxyz
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         data_raw = cvol[idx.to_slices()]
 
         result = np.transpose(data_raw, (3, 0, 1, 2))
@@ -226,7 +239,9 @@ def write(self, idx: VolumetricIndex, data: npt.NDArray):
                 f"but got a tensor of with ndim == {data.ndim}"
             )
 
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         slices = idx.to_slices()
         # Enable autocrop for writes only
         cvol.autocrop = True
@@ -301,15 +316,30 @@ def with_changes(self, **kwargs) -> CVBackend:
         return result
 
     def get_voxel_offset(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.voxel_offset)
 
     def get_chunk_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.chunk_size)
 
     def get_dataset_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.volume_size)
 
     def get_bounds(self, resolution: Vec3D) -> VolumetricIndex: # pragma: no cover
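Note: this diff touches only the call sites; the memoized constructor _get_cv_cached is defined earlier in backend.py and is not shown here. For orientation, a minimal sketch of how the new keyword plausibly flows through it — the helper name and cache plumbing below are illustrative, not the repository's actual implementation, and CloudVolume's lru_bytes in-memory LRU option is an assumption:

from typing import Any, Dict, Optional, Tuple

from cloudvolume import CloudVolume

IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2  # 128 MiB default per volume

_cv_cache: Dict[Tuple, CloudVolume] = {}

def _get_cv_cached_sketch(
    path: str, cache_bytes_limit: Optional[int] = None, **kwargs: Any
) -> CloudVolume:
    # Fall back to the module-level default when no explicit limit is given.
    if cache_bytes_limit is None:
        cache_bytes_limit = IN_MEM_CACHE_NUM_BYTES_PER_CV
    # Memoize one handle per (path, limit, kwargs) combination.
    key = (path, cache_bytes_limit, tuple(sorted(kwargs.items())))
    if key not in _cv_cache:
        # `lru_bytes` caps the in-memory chunk cache held by this handle.
        _cv_cache[key] = CloudVolume(path, lru_bytes=cache_bytes_limit, **kwargs)
    return _cv_cache[key]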
3 changes: 3 additions & 0 deletions zetta_utils/layer/volumetric/cloudvol/build.py
@@ -54,6 +54,7 @@ def build_cv_layer( # pylint: disable=too-many-locals
             JointIndexDataProcessor[npt.NDArray | torch.Tensor, VolumetricIndex],
         ]
     ] = (),
+    cache_bytes_limit: int | None = None,
 ) -> VolumetricLayer: # pragma: no cover # trivial conditional, delegation only
     """Build a CloudVolume layer.
@@ -94,6 +95,7 @@ def build_cv_layer( # pylint: disable=too-many-locals
         returning it to the user.
     :param write_procs: List of processors that will be applied to the data given by
         the user before writing it to the backend.
+    :param cache_bytes_limit: Cache size limit in bytes.
     :return: Layer built according to the spec.
     """
     if cv_kwargs is None:
@@ -143,6 +145,7 @@ def build_cv_layer( # pylint: disable=too-many-locals
         info_overwrite=info_overwrite,
         info_keep_existing_scales=info_keep_existing_scales,
         info_spec=info_spec,
+        cache_bytes_limit=cache_bytes_limit,
     )
 
     result = build_volumetric_layer(
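From the caller's side, the limit is now a single keyword on the builder. A hedged usage sketch — the bucket path is a placeholder, and the import assumes build_cv_layer is exported from the cloudvol package as shown in the diff:

from zetta_utils.layer.volumetric.cloudvol import build_cv_layer

# Cap the per-volume in-memory cache at 512 MiB instead of the 128 MiB default.
layer = build_cv_layer(
    path="gs://my-bucket/my-dataset",  # placeholder path
    cache_bytes_limit=512 * 1024 ** 2,
)
# Reads and writes through `layer` now use size-limited cached CV handles.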
46 changes: 38 additions & 8 deletions zetta_utils/layer/volumetric/cloudvol/deprecated/backend.py
@@ -23,6 +23,7 @@
 
 IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2
 
+
 # To avoid reloading info file - note that an empty provenance is passed
 # since otherwise the CloudVolume's __new__ will download the provenance
 # TODO: Use `assume_metadata` off of the cached info, using `get_info`.
@@ -100,6 +101,7 @@ class CVBackend(VolumetricBackend): # pylint: disable=too-few-public-methods
     cv_kwargs: Dict[str, Any] = attrs.field(factory=dict)
     info_spec: Optional[PrecomputedInfoSpec] = None
     on_info_exists: InfoExistsModes = "expect_same"
+    cache_bytes_limit: Optional[int] = None
 
     def __attrs_post_init__(self):
         if "mip" in self.cv_kwargs:
@@ -142,13 +144,17 @@ def name(self, name: str) -> None: # pragma: no cover
 
     @property
     def dtype(self) -> np.dtype:
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
 
         return np.dtype(result.data_type)
 
     @property
     def num_channels(self) -> int: # pragma: no cover
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return result.num_channels
 
     @property
@@ -192,14 +198,21 @@ def clear_disk_cache(self) -> None: # pragma: no cover
         info = get_info(self.path)
         for scale in info["scales"]:
             res = Vec3D[float](*scale["resolution"])
-            _get_cv_cached(self.path, resolution=res, **self.cv_kwargs).cache.flush()
+            _get_cv_cached(
+                self.path,
+                resolution=res,
+                cache_bytes_limit=self.cache_bytes_limit,
+                **self.cv_kwargs,
+            ).cache.flush()
 
     def clear_cache(self) -> None: # pragma: no cover
         _clear_cv_cache(self.path)
 
     def read(self, idx: VolumetricIndex) -> npt.NDArray:
         # Data out: cxyz
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         data_raw = cvol[idx.to_slices()]
 
         result = np.transpose(data_raw, (3, 0, 1, 2))
@@ -221,7 +234,9 @@ def write(self, idx: VolumetricIndex, data: npt.NDArray):
                 f"but got a tensor of with ndim == {data.ndim}"
             )
 
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         slices = idx.to_slices()
         # Enable autocrop for writes only
         cvol.autocrop = True
@@ -294,15 +309,30 @@ def with_changes(self, **kwargs) -> CVBackend:
         return result
 
     def get_voxel_offset(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.voxel_offset)
 
     def get_chunk_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.chunk_size)
 
     def get_dataset_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.volume_size)
 
     def get_bounds(self, resolution: Vec3D) -> VolumetricIndex: # pragma: no cover
3 changes: 3 additions & 0 deletions zetta_utils/layer/volumetric/cloudvol/deprecated/build.py
@@ -61,6 +61,7 @@ def build_cv_layer( # pylint: disable=too-many-locals
             JointIndexDataProcessor[npt.NDArray | torch.Tensor, VolumetricIndex],
         ]
     ] = (),
+    cache_bytes_limit: int | None = None,
 ) -> VolumetricLayer: # pragma: no cover # trivial conditional, delegation only
     """Build a CloudVolume layer.
@@ -115,6 +116,7 @@ def build_cv_layer( # pylint: disable=too-many-locals
         returning it to the user.
     :param write_procs: List of processors that will be applied to the data given by
         the user before writing it to the backend.
+    :param cache_bytes_limit: Cache size limit in bytes.
     :return: Layer built according to the spec.
     """
@@ -145,6 +147,7 @@ def build_cv_layer( # pylint: disable=too-many-locals
             add_scales_exclude_fields=info_add_scales_exclude_fields,
             only_retain_scales=info_only_retain_scales,
         ),
+        cache_bytes_limit=cache_bytes_limit,
     )
 
     result = build_volumetric_layer(
