Skip to content

Commit

Permalink
MitoDistances: Don't rely on Client.map() to batch the workload. It d…
Browse files Browse the repository at this point in the history
…oesn't work that way, apparently.
  • Loading branch information
stuarteberg committed Dec 30, 2020
1 parent 80624e6 commit cbc47aa
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions flyemflows/workflow/mitodistances.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from dvid_resource_manager.client import ResourceManagerClient

from neuclease.util import Timer, tqdm_proxy
from neuclease.util import Timer, tqdm_proxy, iter_batches
from neuclease.dvid import fetch_annotation_label, fetch_supervoxels
from neuclease.misc.measure_tbar_mito_distances import initialize_results, measure_tbar_mito_distances

Expand Down Expand Up @@ -205,9 +205,13 @@ def process_and_save(body):
return (body, len(tbars), 'success', timer.seconds)

logger.info(f"Processing {len(bodies)}, skipping {bodies_df['should_skip'].sum()}")

def process_batch(bodies):
return [*map(process_and_save, bodies)]

with dvid_mgr_context:
batch_size = max(1, len(bodies) // 10_000)
futures = self.client.map(process_and_save, bodies, batch_size=batch_size)
futures = self.client.map(process_batch, iter_batches(bodies, batch_size))

# Support synchronous testing with a fake 'as_completed' object
if hasattr(self.client, 'DEBUG'):
Expand All @@ -218,7 +222,7 @@ def process_and_save(body):
try:
results = []
for f, r in tqdm_proxy(ac, total=len(futures)):
results.append(r)
results.extend(r)
finally:
results = pd.DataFrame(results, columns=['body', 'synapses', 'status', 'processing_time'])
results.to_csv('results-summary.csv', header=True, index=False)
Expand Down

0 comments on commit cbc47aa

Please sign in to comment.