feat(cv): expose cache_bytes_limit #905

Open · wants to merge 1 commit into base: main
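This PR adds a `cache_bytes_limit` field to `CVBackend`, threads it through every `_get_cv_cached` call, and exposes it as a new `build_cv_layer` parameter; the same change is applied to the deprecated copies of `backend.py` and `build.py`. When left unset (`None`), behavior should be unchanged: the module default `IN_MEM_CACHE_NUM_BYTES_PER_CV` presumably still applies inside `_get_cv_cached`, which this diff does not touch.

A hypothetical usage sketch; the bucket path and byte values are illustrative, not taken from this PR, and the import path assumes `build_cv_layer` is re-exported from the package shown in the diff:

```python
from zetta_utils.layer.volumetric.cloudvol import build_cv_layer

layer = build_cv_layer(
    path="gs://my-bucket/my-layer",
    cache_bytes_limit=256 * 1024 ** 2,  # cap the in-memory cache at 256 MiB
)

# Omitting cache_bytes_limit (or passing None) keeps the previous behavior.
```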
46 changes: 38 additions & 8 deletions zetta_utils/layer/volumetric/cloudvol/backend.py
@@ -23,6 +23,7 @@

 IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2

+
 # To avoid reloading info file - note that an empty provenance is passed
 # since otherwise the CloudVolume's __new__ will download the provenance
 # TODO: Use `assume_metadata` off of the cached info, using `get_info`.
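For reference, the module default above works out to 128 * 1024 ** 2 = 134,217,728 bytes, i.e. 128 MiB per cached CloudVolume; the new `cache_bytes_limit` introduced below presumably overrides this default inside `_get_cv_cached` when set.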
@@ -101,6 +102,7 @@ class CVBackend(VolumetricBackend):  # pylint: disable=too-few-public-methods
     info_spec: PrecomputedInfoSpec | None = None
     info_overwrite: bool = False
     info_keep_existing_scales: bool = True
+    cache_bytes_limit: Optional[int] = None

     def __attrs_post_init__(self):
         if "mip" in self.cv_kwargs:
@@ -147,13 +149,17 @@ def name(self, name: str) -> None:  # pragma: no cover

     @property
     def dtype(self) -> np.dtype:
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )

         return np.dtype(result.data_type)

     @property
     def num_channels(self) -> int:  # pragma: no cover
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return result.num_channels

     @property
@@ -197,14 +203,21 @@ def clear_disk_cache(self) -> None:  # pragma: no cover
         info = get_info(self.path)
         for scale in info["scales"]:
             res = Vec3D[float](*scale["resolution"])
-            _get_cv_cached(self.path, resolution=res, **self.cv_kwargs).cache.flush()
+            _get_cv_cached(
+                self.path,
+                resolution=res,
+                cache_bytes_limit=self.cache_bytes_limit,
+                **self.cv_kwargs,
+            ).cache.flush()

     def clear_cache(self) -> None:  # pragma: no cover
         _clear_cv_cache(self.path)

     def read(self, idx: VolumetricIndex) -> npt.NDArray:
         # Data out: cxyz
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         data_raw = cvol[idx.to_slices()]

         result = np.transpose(data_raw, (3, 0, 1, 2))
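The `read` path above returns channel-first data: CloudVolume yields xyzc-ordered arrays, and the transpose reorders them to cxyz. A minimal sketch of that reordering (shapes are illustrative):

```python
import numpy as np

data_raw = np.zeros((64, 64, 32, 1))           # CloudVolume order: x, y, z, c
result = np.transpose(data_raw, (3, 0, 1, 2))  # backend order: c, x, y, z
assert result.shape == (1, 64, 64, 32)
```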
@@ -226,7 +239,9 @@ def write(self, idx: VolumetricIndex, data: npt.NDArray):
                 f"but got a tensor of with ndim == {data.ndim}"
             )

-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         slices = idx.to_slices()
         # Enable autocrop for writes only
         cvol.autocrop = True
@@ -301,15 +316,30 @@ def with_changes(self, **kwargs) -> CVBackend:
         return result

     def get_voxel_offset(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.voxel_offset)

     def get_chunk_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.chunk_size)

     def get_dataset_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.volume_size)

     def get_bounds(self, resolution: Vec3D) -> VolumetricIndex:  # pragma: no cover
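Note that `_get_cv_cached` itself is not modified by this diff, so that helper presumably already accepts a `cache_bytes_limit` keyword and handles the `None` case. A minimal sketch of the fallback pattern assumed here (`_resolve_cache_bytes_limit` is a hypothetical name, not part of this PR):

```python
from typing import Optional

IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2  # module default, as above

def _resolve_cache_bytes_limit(cache_bytes_limit: Optional[int]) -> int:
    # None means "no explicit limit configured": fall back to the default.
    if cache_bytes_limit is None:
        return IN_MEM_CACHE_NUM_BYTES_PER_CV
    return cache_bytes_limit
```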
3 changes: 3 additions & 0 deletions zetta_utils/layer/volumetric/cloudvol/build.py
@@ -54,6 +54,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
             JointIndexDataProcessor[npt.NDArray | torch.Tensor, VolumetricIndex],
         ]
     ] = (),
+    cache_bytes_limit: int | None = None,
 ) -> VolumetricLayer:  # pragma: no cover # trivial conditional, delegation only
     """Build a CloudVolume layer.

@@ -94,6 +95,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
         returning it to the user.
     :param write_procs: List of processors that will be applied to the data given by
         the user before writing it to the backend.
+    :param cache_bytes_limit: Cache size limit in bytes.
     :return: Layer built according to the spec.
     """
     if cv_kwargs is None:
@@ -143,6 +145,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
         info_overwrite=info_overwrite,
         info_keep_existing_scales=info_keep_existing_scales,
         info_spec=info_spec,
+        cache_bytes_limit=cache_bytes_limit,
     )

     result = build_volumetric_layer(
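Since `build_cv_layer` only forwards the new argument to `CVBackend`, the backend can also be configured directly. A sketch, assuming the remaining `CVBackend` defaults suffice (the path value is illustrative):

```python
backend = CVBackend(
    path="precomputed://gs://my-bucket/my-layer",
    cache_bytes_limit=32 * 1024 ** 2,  # 32 MiB
)
```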
46 changes: 38 additions & 8 deletions zetta_utils/layer/volumetric/cloudvol/deprecated/backend.py
@@ -23,6 +23,7 @@

 IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2

+
 # To avoid reloading info file - note that an empty provenance is passed
 # since otherwise the CloudVolume's __new__ will download the provenance
 # TODO: Use `assume_metadata` off of the cached info, using `get_info`.
@@ -100,6 +101,7 @@ class CVBackend(VolumetricBackend):  # pylint: disable=too-few-public-methods
     cv_kwargs: Dict[str, Any] = attrs.field(factory=dict)
     info_spec: Optional[PrecomputedInfoSpec] = None
     on_info_exists: InfoExistsModes = "expect_same"
+    cache_bytes_limit: Optional[int] = None

     def __attrs_post_init__(self):
         if "mip" in self.cv_kwargs:
@@ -142,13 +144,17 @@ def name(self, name: str) -> None:  # pragma: no cover

     @property
     def dtype(self) -> np.dtype:
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )

         return np.dtype(result.data_type)

     @property
     def num_channels(self) -> int:  # pragma: no cover
-        result = _get_cv_cached(self.path, **self.cv_kwargs)
+        result = _get_cv_cached(
+            self.path, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         return result.num_channels

     @property
@@ -192,14 +198,21 @@ def clear_disk_cache(self) -> None:  # pragma: no cover
         info = get_info(self.path)
         for scale in info["scales"]:
             res = Vec3D[float](*scale["resolution"])
-            _get_cv_cached(self.path, resolution=res, **self.cv_kwargs).cache.flush()
+            _get_cv_cached(
+                self.path,
+                resolution=res,
+                cache_bytes_limit=self.cache_bytes_limit,
+                **self.cv_kwargs,
+            ).cache.flush()

     def clear_cache(self) -> None:  # pragma: no cover
         _clear_cv_cache(self.path)

     def read(self, idx: VolumetricIndex) -> npt.NDArray:
         # Data out: cxyz
-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         data_raw = cvol[idx.to_slices()]

         result = np.transpose(data_raw, (3, 0, 1, 2))
@@ -221,7 +234,9 @@ def write(self, idx: VolumetricIndex, data: npt.NDArray):
                 f"but got a tensor of with ndim == {data.ndim}"
             )

-        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path, idx.resolution, cache_bytes_limit=self.cache_bytes_limit, **self.cv_kwargs
+        )
         slices = idx.to_slices()
         # Enable autocrop for writes only
         cvol.autocrop = True
@@ -294,15 +309,30 @@ def with_changes(self, **kwargs) -> CVBackend:
         return result

     def get_voxel_offset(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.voxel_offset)

     def get_chunk_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.chunk_size)

     def get_dataset_size(self, resolution: Vec3D) -> Vec3D[int]:
-        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
+        cvol = _get_cv_cached(
+            self.path,
+            resolution=resolution,
+            cache_bytes_limit=self.cache_bytes_limit,
+            **self.cv_kwargs,
+        )
         return Vec3D[int](*cvol.volume_size)

     def get_bounds(self, resolution: Vec3D) -> VolumetricIndex:  # pragma: no cover
3 changes: 3 additions & 0 deletions zetta_utils/layer/volumetric/cloudvol/deprecated/build.py
@@ -61,6 +61,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
             JointIndexDataProcessor[npt.NDArray | torch.Tensor, VolumetricIndex],
         ]
     ] = (),
+    cache_bytes_limit: int | None = None,
 ) -> VolumetricLayer:  # pragma: no cover # trivial conditional, delegation only
     """Build a CloudVolume layer.

@@ -115,6 +116,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
         returning it to the user.
     :param write_procs: List of processors that will be applied to the data given by
         the user before writing it to the backend.
+    :param cache_bytes_limit: Cache size limit in bytes.
     :return: Layer built according to the spec.

     """
@@ -145,6 +147,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
             add_scales_exclude_fields=info_add_scales_exclude_fields,
             only_retain_scales=info_only_retain_scales,
         ),
+        cache_bytes_limit=cache_bytes_limit,
     )

     result = build_volumetric_layer(