Skip to content

Commit

Permalink
TensorStoreVolumeService: Add option for out-of-bounds-access
Browse files Browse the repository at this point in the history
  • Loading branch information
stuarteberg committed Jul 7, 2022
1 parent 232c279 commit fd83e21
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 11 deletions.
91 changes: 81 additions & 10 deletions flyemflows/volumes/tensorstore_volume_service.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import copy
import pickle
import logging
import numpy as np

from confiddler import validate, emit_defaults
from dvid_resource_manager.client import ResourceManagerClient
from neuclease.util import box_to_slicing
from neuclease.util import box_to_slicing, box_intersection

from ..util import auto_retry, replace_default_entries
from . import VolumeServiceWriter, GeometrySchema, SegmentationAdapters

logger = logging.getLogger(__name__)

TensorStoreSpecSchema = \
{
"description": "TensorStore Spec",
Expand Down Expand Up @@ -136,6 +139,15 @@
"properties": {
"spec": TensorStoreSpecSchema,
"context": TensorStoreContextSchema,
"out-of-bounds-access": {
"description": "If 'forbid', any out-of-bounds read/write is an error.\n"
"If 'permit', out-of-bounds reads are permitted, and filled with zeros, and out-of-bounds writes are merely ignored.\n"
"If 'permit-empty', out-of-bounds writes are permitted (and ignored) only if the out-of-bounds voxels are zero.\n"
"Otherwise, they're an error.",
"type": "string",
"enum": ["forbid", "permit", "permit-empty"],
"default": "permit-empty"
},
"reinitialize-via": {
"type": "string",
"enum": ["unpickle", "reopen"],
Expand Down Expand Up @@ -314,6 +326,7 @@ def __init__(self, volume_config, resource_manager_client=None):
self._preferred_message_shape_zyx = preferred_message_shape_zyx
self._available_scales = available_scales
self._reinitialize_via = volume_config["tensorstore"]["reinitialize-via"]
self._out_of_bounds_access = volume_config["tensorstore"]["out-of-bounds-access"]

# Overwrite config entries that we might have modified
volume_config["geometry"]["block-width"] = self._block_width
Expand Down Expand Up @@ -418,6 +431,35 @@ def resource_manager_client(self):
@auto_retry(3, pause_between_tries=30.0, logging_name=__name__,
predicate=lambda ex: '503' not in str(ex.args[0]) and '504' not in str(ex.args[0]))
def get_subvolume(self, box_zyx, scale=0):
box_zyx = np.asarray(box_zyx)
full_shape_xyzc = self.store(scale).shape
full_shape_zyx = full_shape_xyzc[-2::-1]
clipped_box = box_intersection(box_zyx, [(0,0,0), full_shape_zyx])
if (clipped_box == box_zyx).all():
return self._get_subvolume(box_zyx, scale)

# Note that this message shows the true zarr storage bounds,
# and doesn't show the logical bounds according to global_offset (if any).
msg = (f"TensorStore Request is out-of-bounds (XYZ): {box_zyx[:, ::-1].tolist()}"
" relative to volume extents (XYZC): {full_shape_xyzc.tolist()}")
if self._out_of_bounds_access in ("permit", "permit-empty"):
logger.warning(msg)
else:
msg += "\nAdd 'out-of-bounds-access' to your config to allow such requests"
raise RuntimeError(msg)

if (clipped_box[1] - clipped_box[0] <= 0).any():
# request is completely out-of-bounds; just return zeros
return np.zeros(box_zyx[1] - box_zyx[0], self.dtype)

# Request is partially out-of-bounds; read what we can, zero-fill for the rest.
clipped_vol = self._get_subvolume(clipped_box, scale)
result = np.zeros(box_zyx[1] - box_zyx[0], self.dtype)
localbox = clipped_box - box_zyx[0]
result[box_to_slicing(*localbox)] = clipped_vol
return result

def _get_subvolume(self, box_zyx, scale):
box_zyx = np.asarray(box_zyx)
req_bytes = 8 * np.prod(box_zyx[1] - box_zyx[0])
try:
Expand Down Expand Up @@ -456,18 +498,47 @@ def write_subvolume(self, subvolume, offset_zyx, scale=0):
...: dset.chunk_layout.write_chunk.shape
Out[1]: (2048, 2048, 2048, 1)
"""
# TODO: We may need to add extra logic here to handle (or ignore) out-of-bounds writes,
# similar to what is implemented in ZarrVolumeService.write_subvolume()
store = self.store(scale)

subvolume_czyx = subvolume[None, ...]
offset_zyx = np.array(offset_zyx)
box_zyx = np.array([offset_zyx, offset_zyx+subvolume.shape])

full_shape_xyzc = self.store(scale).shape
full_shape_zyx = full_shape_xyzc[-2::-1]
clipped_box = box_intersection(box_zyx, [(0,0,0), full_shape_zyx])
if (clipped_box == box_zyx).all():
# Box is fully contained within the Zarr volume bounding box.
self._write_subvolume(subvolume, box_zyx[0], scale)
return

msg = (f"Box (XYZ): {box_zyx[:, ::-1].tolist()}"
f" exceeds full scale-{scale} extents (XYZC) {full_shape_xyzc.tolist()}")

if self._out_of_bounds_access == 'forbid':
msg = "Cannot write subvolume. " + msg
msg += "\nAdd permit-out-of-bounds to your config to allow such writes,"
msg += " assuming the out-of-bounds portion is completely empty."
raise RuntimeError(msg)

# If any of the out-of-bounds portion is non-empty, that's an error.
subvol_copy = subvolume.copy()
subvol_copy[box_to_slicing(*(clipped_box - box_zyx[0]))] = 0
if self._out_of_bounds_access == 'permit-empty' and subvol_copy.any():
msg = ("Cannot write subvolume. Box extends beyond volume storage bounds (XYZ): "
f"{box_zyx[:, ::-1].tolist()} exceeds {full_shape_zyx.tolist()}\n"
"and the out-of-bounds portion is not empty (contains non-zero values).\n")
raise RuntimeError(msg)

logger.warning(msg)
clipped_subvolume = subvolume[box_to_slicing(*clipped_box - box_zyx[0])]
self._write_subvolume(clipped_subvolume, clipped_box[0], scale)

def _write_subvolume(self, subvolume, offset_zyx, scale):
offset_czyx = np.array((0, *offset_zyx))
shape_czyx = np.array((1, *subvolume.shape))
box_czyx = np.array([offset_czyx, offset_czyx + shape_czyx])

# Tensorstore and neuroglancer_precomputed use X,Y,Z conventions,
# so it's best to send a Fortran array.
vol_xyzc = subvolume_czyx.transpose()
# With neuroglancer_precomputed, tensorstore uses X,Y,Z conventions,
# regardless of the actual memory order. So it's best to send a Fortran array.
vol_xyzc = subvolume[None, ...].transpose()
box_xyzc = box_czyx[:, ::-1]
fut = store[box_to_slicing(*box_xyzc)].write(vol_xyzc)
fut = self.store(scale)[box_to_slicing(*box_xyzc)].write(vol_xyzc)
fut.result()
3 changes: 2 additions & 1 deletion tests/volumes/test_tensorstore_volume_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def check_vol(box_zyx, scale):
reason="Skipping TensorStore test: tensorstore isn't installed")
def test_write_seg(disable_auto_retry):
tmpdir = tempfile.mkdtemp()
print(tmpdir)
#print(tmpdir)

# Modeled on examples from this page:
# https://google.github.io/tensorstore/driver/neuroglancer_precomputed/index.html#id12
Expand Down Expand Up @@ -136,6 +136,7 @@ def test_write_seg(disable_auto_retry):
"data_copy_concurrency": {"limit": 8},
'file_io_concurrency': {'limit': 1}
},
"out-of-bounds-access": "permit-empty",
},
"geometry": {}
}
Expand Down

0 comments on commit fd83e21

Please sign in to comment.