From b5e7a3be78c8cea0e69b07e56e26c68af0e381ac Mon Sep 17 00:00:00 2001 From: Stuart Berg Date: Wed, 18 Sep 2024 15:34:13 -0400 Subject: [PATCH] GridMeshes: Now skip-existing is permitted with directory-of-tarfiles --- flyemflows/workflow/gridmeshes.py | 65 ++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/flyemflows/workflow/gridmeshes.py b/flyemflows/workflow/gridmeshes.py index b9765cb5..a367e226 100644 --- a/flyemflows/workflow/gridmeshes.py +++ b/flyemflows/workflow/gridmeshes.py @@ -1,8 +1,11 @@ import os import copy +import glob import logging import pickle -from functools import partial +from pathlib import Path +from tarfile import TarFile +from functools import partial, cache import numpy as np import pandas as pd @@ -302,9 +305,6 @@ def _sanitize_config(self): if not self.config['input']['dvid']['supervoxels']: raise RuntimeError("Your input dvid source should specify supervoxels: true") - if self.config['gridmeshes']['skip-existing'] and 'directory-of-tarfiles' in self.config['output']: - raise RuntimeError("Can't use skip-existing with the 'directory-of-tarfiles output format. (Not yet implemented.)") - def _init_input_service(self): """ Initialize the input and output services, @@ -480,7 +480,7 @@ def _prepare_output(self): create_tarsupervoxel_instance(server, uuid, instance, sync_instance, output_fmt) -def _determine_existing(resource_mgr, config, all_svs): +def _determine_existing(resource_mgr, config, all_svs, box): """ Determine which of the given supervoxels already have meshes stored in the configured destination. @@ -488,31 +488,51 @@ def _determine_existing(resource_mgr, config, all_svs): fmt = config["gridmeshes"]["format"] destination = config["output"] (destination_type,) = destination.keys() - assert destination_type in ('directory', 'keyvalue', 'tarsupervoxels') + assert destination_type in ('directory', 'keyvalue', 'tarsupervoxels', 'directory-of-tarfiles') if destination_type == 'directory': - d = config["output"]["directory"] - existing_svs = [] - for sv in all_svs: - # FIXME: glob would probably be faster here... - if os.path.exists(f"{d}/{sv}.{fmt}"): - existing_svs.append(sv) + existing_svs = _existing_svs_in_dir(config["output"]["directory"], fmt) + existing_svs = existing_svs[existing_svs.isin(all_svs)] + return existing_svs - elif destination_type == 'tarsupervoxels': + if destination_type == 'tarsupervoxels': tsv_instance = [destination['tarsupervoxels'][k] for k in ('server', 'uuid', 'instance')] with resource_mgr.access_context(tsv_instance[0], False, 1, len(all_svs)): exists = fetch_exists(*tsv_instance, all_svs, batch_size=10_000, processes=0, show_progress=False) - existing_svs = exists[exists].index + return exists[exists].index.values - elif destination_type == 'keyvalue': + if destination_type == 'keyvalue': logger.warning("Using skip-existing with a keyvalue output. This might take a LONG time.") kv_instance = [destination['keyvalue'][k] for k in ('server', 'uuid', 'instance')] existing_svs = [] for sv in all_svs: if fetch_key(*kv_instance, f"{sv}.{fmt}", check_head=True): existing_svs.append(sv) + return np.array(existing_svs) + + if destination_type == 'directory-of-tarfiles': + d = config["output"]["directory-of-tarfiles"] + z, y = box[0, (0,1)] + brick_dir = f"{d}/z{z}/y{y}" + + existing_sv_sets = [] + for p in glob.glob(f"{brick_dir}/*.tar"): + existing_files = TarFile(p).getnames() + svs = pd.Series(existing_files).str.extract(rf'(\d+)\.{fmt}')[0].astype(int) + existing_sv_sets.append(svs) + + existing_svs = pd.concat(existing_sv_sets, ignore_index=True) + existing_svs = existing_svs[existing_svs.isin(all_svs)] + return existing_svs.values + + raise AssertionError("Unreachable") + - return np.asarray(existing_svs) +@cache +def _existing_svs_in_dir(d, fmt): + existing_files = glob.glob(f"{d}/*.{fmt}") + existing_svs = pd.Series(existing_files).str.extract(r'[^/]+/(\d+)\.+')[0].astype(int) + return existing_svs def _write_meshes(config, resource_mgr, meshes, box): @@ -535,7 +555,16 @@ def _write_meshes(config, resource_mgr, meshes, box): brick_dir = f"{destination['directory-of-tarfiles']}/z{box[0,0]}/y{box[0,1]}" brick_name = "brick-z{:06d}-y{:06d}-x{:06d}-z{:06d}-y{:06d}-x{:06d}".format(*box[0], *box[1]) os.makedirs(brick_dir, exist_ok=True) - create_tar_from_dict(binary_meshes, f"{brick_dir}/{brick_name}.tar") + tar_path = Path(f"{brick_dir}/{brick_name}.tar") + + if options['skip-existing']: + # Pick a filename that doesn't already exist + base_tar_path = tar_path + i = 0 + while os.path.exists(tar_path): + i += 1 + tar_path = base_tar_path.with_stem(f"{base_tar_path.stem}.{i}") + create_tar_from_dict(binary_meshes, tar_path) else: total_bytes = sum(map(len, binary_meshes.values())) instance = [destination[destination_type][k] for k in ('server', 'uuid', 'instance')] @@ -575,7 +604,7 @@ def _process_brick(config, resource_mgr, input_service, brick, *, subset_supervo if len(subset_supervoxels) == 0: subset_supervoxels = all_svs - existing_svs = _determine_existing(resource_mgr, config, subset_supervoxels) + existing_svs = _determine_existing(resource_mgr, config, subset_supervoxels, brick.physical_box) if len(existing_svs): existing_svs = pd.Series(existing_svs, name='sv') path = f'{d}/brick-existing-svs-x{x}-y{y}-z{z}.feather'