Skip to content

Commit

Permalink
GridMeshes: Now skip-existing is permitted with directory-of-tarfiles
Browse files Browse the repository at this point in the history
  • Loading branch information
stuarteberg committed Sep 18, 2024
1 parent ee0fc5b commit b5e7a3b
Showing 1 changed file with 47 additions and 18 deletions.
65 changes: 47 additions & 18 deletions flyemflows/workflow/gridmeshes.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
import copy
import glob
import logging
import pickle
from functools import partial
from pathlib import Path
from tarfile import TarFile
from functools import partial, cache

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -302,9 +305,6 @@ def _sanitize_config(self):
if not self.config['input']['dvid']['supervoxels']:
raise RuntimeError("Your input dvid source should specify supervoxels: true")

if self.config['gridmeshes']['skip-existing'] and 'directory-of-tarfiles' in self.config['output']:
raise RuntimeError("Can't use skip-existing with the 'directory-of-tarfiles output format. (Not yet implemented.)")

def _init_input_service(self):
"""
Initialize the input and output services,
Expand Down Expand Up @@ -480,39 +480,59 @@ def _prepare_output(self):
create_tarsupervoxel_instance(server, uuid, instance, sync_instance, output_fmt)


def _determine_existing(resource_mgr, config, all_svs, box=None):
    """
    Determine which of the given supervoxels already have
    meshes stored in the configured destination.

    Args:
        resource_mgr:
            Only used for 'tarsupervoxels' output, where its
            access_context() wraps the existence query.
        config:
            Workflow config.  Reads config["gridmeshes"]["format"] and
            config["output"], which must contain exactly one key naming
            the destination type.
        all_svs:
            The supervoxel IDs to check.
        box:
            The brick's box, indexed as box[0, 0] (z) and box[0, 1] (y).
            Only required for 'directory-of-tarfiles' output, where it
            selects the brick's subdirectory.

    Returns:
        Array of the supervoxel IDs from all_svs which already
        have a mesh in the destination.
    """
    fmt = config["gridmeshes"]["format"]
    destination = config["output"]
    (destination_type,) = destination.keys()
    assert destination_type in ('directory', 'keyvalue', 'tarsupervoxels', 'directory-of-tarfiles')

    if destination_type == 'directory':
        existing_svs = _existing_svs_in_dir(destination["directory"], fmt)
        existing_svs = existing_svs[existing_svs.isin(all_svs)]
        # Return a plain array, like every other branch does.
        return existing_svs.values

    if destination_type == 'tarsupervoxels':
        tsv_instance = [destination['tarsupervoxels'][k] for k in ('server', 'uuid', 'instance')]
        with resource_mgr.access_context(tsv_instance[0], False, 1, len(all_svs)):
            exists = fetch_exists(*tsv_instance, all_svs, batch_size=10_000, processes=0, show_progress=False)
        return exists[exists].index.values

    if destination_type == 'keyvalue':
        logger.warning("Using skip-existing with a keyvalue output. This might take a LONG time.")
        kv_instance = [destination['keyvalue'][k] for k in ('server', 'uuid', 'instance')]
        existing_svs = [
            sv for sv in all_svs
            if fetch_key(*kv_instance, f"{sv}.{fmt}", check_head=True)
        ]
        return np.array(existing_svs)

    if destination_type == 'directory-of-tarfiles':
        if box is None:
            raise ValueError("A brick box is required to check for existing meshes in directory-of-tarfiles output.")

        d = destination["directory-of-tarfiles"]
        brick_dir = f"{d}/z{box[0, 0]}/y{box[0, 1]}"

        # Gather the member names from every tarfile in the brick's directory.
        # The context manager ensures each tarfile handle is closed promptly.
        existing_files = []
        for p in glob.glob(f"{brick_dir}/*.tar"):
            with TarFile(p) as tf:
                existing_files.extend(tf.getnames())

        # No tarfiles (or only empty ones) is the normal case for
        # a brick that has not been processed before.
        if not existing_files:
            return np.array([], dtype=int)

        # Explicit object dtype keeps the .str accessor usable in all cases.
        names = pd.Series(existing_files, dtype=object)

        # Members which don't look like '<sv>.<fmt>' yield NaN; discard
        # them rather than letting the int conversion fail.
        existing_svs = names.str.extract(rf'(\d+)\.{fmt}')[0].dropna().astype(int)
        existing_svs = existing_svs[existing_svs.isin(all_svs)]
        return existing_svs.values

    raise AssertionError("Unreachable")
@cache
def _existing_svs_in_dir(d, fmt):
    """
    List the supervoxel IDs which have a mesh file in directory d.

    A file counts only if its name is exactly '<digits>.<fmt>'.
    Other files which happen to end in '.<fmt>' are ignored.

    The result is cached per (d, fmt) pair, so the directory is only
    scanned the first time; files written afterwards are not reflected
    in subsequent calls.  The cached Series is shared between callers,
    so it must not be modified in place.

    Args:
        d:
            Directory to scan (not recursive).
        fmt:
            Mesh file extension, without the leading dot.

    Returns:
        pd.Series of supervoxel IDs (int).
    """
    suffix = f".{fmt}"
    existing_svs = []
    for path in glob.glob(f"{d}/*.{fmt}"):
        # Parse the filename only: the directory portion of the path may
        # itself contain digits and dots (e.g. '/data/123.v2/456.drc').
        stem = os.path.basename(path)[:-len(suffix)]
        if stem.isascii() and stem.isdigit():
            existing_svs.append(int(stem))

    return pd.Series(existing_svs, dtype=int)


def _write_meshes(config, resource_mgr, meshes, box):
Expand All @@ -535,7 +555,16 @@ def _write_meshes(config, resource_mgr, meshes, box):
brick_dir = f"{destination['directory-of-tarfiles']}/z{box[0,0]}/y{box[0,1]}"
brick_name = "brick-z{:06d}-y{:06d}-x{:06d}-z{:06d}-y{:06d}-x{:06d}".format(*box[0], *box[1])
os.makedirs(brick_dir, exist_ok=True)
create_tar_from_dict(binary_meshes, f"{brick_dir}/{brick_name}.tar")
tar_path = Path(f"{brick_dir}/{brick_name}.tar")

if options['skip-existing']:
# Pick a filename that doesn't already exist
base_tar_path = tar_path
i = 0
while os.path.exists(tar_path):
i += 1
tar_path = base_tar_path.with_stem(f"{base_tar_path.stem}.{i}")
create_tar_from_dict(binary_meshes, tar_path)
else:
total_bytes = sum(map(len, binary_meshes.values()))
instance = [destination[destination_type][k] for k in ('server', 'uuid', 'instance')]
Expand Down Expand Up @@ -575,7 +604,7 @@ def _process_brick(config, resource_mgr, input_service, brick, *, subset_supervo
if len(subset_supervoxels) == 0:
subset_supervoxels = all_svs

existing_svs = _determine_existing(resource_mgr, config, subset_supervoxels)
existing_svs = _determine_existing(resource_mgr, config, subset_supervoxels, brick.physical_box)
if len(existing_svs):
existing_svs = pd.Series(existing_svs, name='sv')
path = f'{d}/brick-existing-svs-x{x}-y{y}-z{z}.feather'
Expand Down

0 comments on commit b5e7a3b

Please sign in to comment.