Skip to content

Commit

Permalink
more saving methods to compare (best zarr is save_region_bioio_2)
Browse files Browse the repository at this point in the history
  • Loading branch information
soham1202 committed Jan 4, 2025
1 parent c21b7d1 commit 290eede
Showing 1 changed file with 223 additions and 18 deletions.
241 changes: 223 additions & 18 deletions software/control/stitcher/stitcher_process.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# stitcher_process.py
import os
import sys
from multiprocessing import Process, Queue, Event
import psutil
import shutil
import random
import json
import time
import math
from lxml import etree
import numpy as np
import pandas as pd
import cv2
Expand All @@ -19,10 +15,16 @@
import ome_zarr
import zarr
import imageio
from aicsimageio.writers import OmeTiffWriter
from aicsimageio.writers import OmeZarrWriter
from aicsimageio import types
from aicsimageio.writers import OmeTiffWriter as aicsTiffWriter
from aicsimageio.writers import OmeZarrWriter as aicsZarrWriter
from aicsimageio import types as aics_types
from bioio.writers import OmeTiffWriter as bioioTiffWriter
from bioio.writers import OmeZarrWriter as bioioZarrWriter
from bioio.writers.ome_zarr_writer_2 import OmeZarrWriter as bioioZarrWriter2
from bioio.writers.ome_zarr_writer_2 import compute_level_shapes, compute_level_chunk_sizes_zslice
from bioio_base import types as bioio_types
from basicpy import BaSiC
from multiprocessing import Process, Queue, Event, Pool, cpu_count
from control.stitcher.stitcher_parameters import StitchingParameters

# Cephla-Lab: Squid Microscopy Image Stitcher (soham mukherjee)
Expand All @@ -49,7 +51,7 @@ def __init__(self, params: StitchingParameters, progress_queue: Queue,

# Validate and store parameters
self.params = params
params.validate()
params.validate() # comment out for save_region_test.py script to bypass actual parameters

# Core attributes from parameters
self.input_folder = params.input_folder
Expand Down Expand Up @@ -144,7 +146,10 @@ def emit_progress(self, current: int, total: int):
current (int): Current progress value
total (int): Total progress value
"""
self.progress_queue.put(('progress', (current, total)))
if self.progress_queue is None:
print(f"PROGRESS: {(current, total)}")
else:
self.progress_queue.put(('progress', (current, total)))

def emit_status(self, status: str, is_saving: bool = False):
"""Send status update through queue.
Expand All @@ -153,7 +158,10 @@ def emit_status(self, status: str, is_saving: bool = False):
status (str): Status message
is_saving (bool): Whether the status is about saving data
"""
self.status_queue.put(('status', (status, is_saving)))
if self.status_queue is None:
print(f"STATUS: {status}")
else:
self.status_queue.put(('status', (status, is_saving)))

def emit_complete(self, output_path: str, dtype):
"""Send completion status through queue.
Expand All @@ -162,7 +170,10 @@ def emit_complete(self, output_path: str, dtype):
output_path (str): Path to the output file
dtype: Data type of the output
"""
self.complete_queue.put(('complete', (output_path, dtype)))
if self.complete_queue is None:
print(f"COMPLETE:")
else:
self.complete_queue.put(('complete', (output_path, dtype)))

def check_stop(self):
"""Check if processing should stop and terminate if needed."""
Expand Down Expand Up @@ -953,7 +964,7 @@ def save_region_aics(self, timepoint, region, stitched_region):
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Create physical pixel sizes object
physical_pixel_sizes = types.PhysicalPixelSizes(
physical_pixel_sizes = aics_types.PhysicalPixelSizes(
Z=self.acquisition_params.get("dz(um)", 1.0),
Y=self.pixel_size_um,
X=self.pixel_size_um
Expand All @@ -969,7 +980,7 @@ def save_region_aics(self, timepoint, region, stitched_region):

if self.output_format.endswith('.zarr'):
print(f"Writing OME-ZARR to: {output_path}\n")
writer = OmeZarrWriter(output_path)
writer = aicsZarrWriter(output_path)

# Write the image with metadata
writer.write_image(
Expand All @@ -989,7 +1000,7 @@ def save_region_aics(self, timepoint, region, stitched_region):
print(f"Writing OME-TIFF to: {output_path}\n")

# Build OME metadata for TIFF
ome_meta = OmeTiffWriter.build_ome(
ome_meta = aicsTiffWriter.build_ome(
data_shapes=[stitched_region.shape],
data_types=[stitched_region.dtype],
dimension_order=["TCZYX"],
Expand All @@ -1000,7 +1011,7 @@ def save_region_aics(self, timepoint, region, stitched_region):
)

# Write the image with metadata
OmeTiffWriter.save(
aicsTiffWriter.save(
data=stitched_region,
uri=output_path,
dim_order="TCZYX",
Expand Down Expand Up @@ -1103,6 +1114,195 @@ def save_region_ome_zarr(self, timepoint, region, stitched_region):
print(f"Time to save region {region} timepoint {timepoint}: {time.time() - start_time}")
return output_path

def save_region_parallel(self, timepoint, region, stitched_region):
"""Save stitched region data using Zarr and multiprocessing for parallel writing."""
start_time = time.time()

# Configure output path and ensure directory exists
output_path = os.path.join(
self.output_folder, f"{timepoint}_stitched", f"{region}_stitched{self.output_format}"
)
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Configure Zarr store and dataset
store = zarr.DirectoryStore(output_path)
root = zarr.group(store, overwrite=True)
chunks = (1, 1, 1, 4096, 4096)
compressor = zarr.Blosc(cname="zstd", clevel=1, shuffle=zarr.Blosc.SHUFFLE)

dataset = root.create_dataset(
"0",
shape=stitched_region.shape,
chunks=chunks,
dtype=stitched_region.dtype,
compressor=compressor,
)

# Helper function to process a single chunk
def process_chunk(chunk_coords):
"""Write a single chunk to the dataset."""
try:
z, y, x = chunk_coords
z_slice = slice(z, z + 1)
y_slice = slice(y, min(y + chunks[3], stitched_region.shape[3]))
x_slice = slice(x, min(x + chunks[4], stitched_region.shape[4]))
for c in range(stitched_region.shape[1]):
dataset[0, c, z_slice, y_slice, x_slice] = stitched_region[
0, c, z_slice, y_slice, x_slice
]
return True
except Exception as e:
return False, str(e)

# Generate chunk coordinates
chunk_coords = [
(z, y, x)
for z in range(stitched_region.shape[2]) # Z levels
for y in range(0, stitched_region.shape[3], chunks[3]) # Y chunks
for x in range(0, stitched_region.shape[4], chunks[4]) # X chunks
]

# Process chunks in parallel
num_workers = max(1, min(os.cpu_count() // 2, 8))
with Pool(num_workers) as pool:
results = list(pool.imap_unordered(process_chunk, chunk_coords))

# Check for any errors in chunk processing
if not all(r is True for r in results):
print("Errors occurred in chunk processing:", results)

# Add metadata to the Zarr store
root.attrs["multiscales"] = [
{
"datasets": [
{
"path": "0",
"coordinateTransformations": [
{
"type": "scale",
"scale": [
1, # Time
1, # Channels
float(self.acquisition_params.get("dz(um)", 1.0)), # Z
float(self.pixel_size_um), # Y
float(self.pixel_size_um), # X,
],
}
],
}
],
"version": "0.4",
}
]
root.attrs["omero"] = {
"id": 1,
"name": f"{region}_t{timepoint}",
"version": "0.4",
"channels": [
{
"label": name,
"color": f"{color:06X}",
"window": {
"start": 0,
"end": np.iinfo(self.dtype).max,
"min": 0,
"max": np.iinfo(self.dtype).max,
},
}
for name, color in zip(self.monochrome_channels, self.monochrome_colors)
],
}

elapsed = time.time() - start_time
print(f"Saved {output_path} in {elapsed:.1f}s")
return output_path

def save_region_bioio(self, timepoint, region, stitched_region):
"""Save stitched region using BioIO's OmeZarrWriter or OmeTiffWriter."""
start_time = time.time()
output_path = os.path.join(
self.output_folder, f"{timepoint}_stitched", f"{region}_stitched{self.output_format}"
)
os.makedirs(os.path.dirname(output_path), exist_ok=True)

try:
if self.output_format.endswith('.zarr'):
writer = bioioZarrWriter(output_path)
physical_pixel_sizes = bioio_types.PhysicalPixelSizes(
Z=self.acquisition_params.get("dz(um)", 1.0),
Y=self.pixel_size_um,
X=self.pixel_size_um,
)
channel_colors = [0x0000FF] * self.num_c # Default to blue for all channels

writer.write_image(
image_data=stitched_region,
image_name=f"{region}_t{timepoint}",
physical_pixel_sizes=physical_pixel_sizes,
channel_names=self.monochrome_channels,
channel_colors=channel_colors,
chunk_dims=self.chunks,
scale_num_levels=self.num_pyramid_levels,
scale_factor=2.0,
dimension_order="TCZYX",
)
else: # Save as TIFF
physical_pixel_sizes = bioio_types.PhysicalPixelSizes(
Z=self.acquisition_params.get("dz(um)", 1.0),
Y=self.pixel_size_um,
X=self.pixel_size_um,
)

bioioTiffWriter.save(
data=stitched_region,
uri=output_path,
dim_order="TCZYX",
physical_pixel_sizes=physical_pixel_sizes,
channel_names=self.monochrome_channels,
)

elapsed = time.time() - start_time
print(f"Saved {output_path} in {elapsed:.1f}s")
return output_path
except Exception as e:
self.status_queue.put(('error', f"Error saving region {region}: {str(e)}"))
raise

def save_region_bioio_2(self, timepoint, region, stitched_region):
"""Save stitched region using bioio.ome_zarr_writer_2 for OME-Zarr."""
start_time = time.time()
output_path = os.path.join(
self.output_folder, f"{timepoint}_stitched", f"{region}_stitched.ome.zarr"
)
os.makedirs(os.path.dirname(output_path), exist_ok=True)

try:
shapes = compute_level_shapes(stitched_region.shape, (1, 1, 1, 2, 2), self.num_pyramid_levels)
chunk_sizes = compute_level_chunk_sizes_zslice(shapes)

writer = bioioZarrWriter2()
writer.init_store(output_path, shapes, chunk_sizes, stitched_region.dtype)

writer.write_t_batches_array(stitched_region, tbatch=4)

metadata = writer.generate_metadata(
image_name=f"{region}_t{timepoint}",
channel_names=self.monochrome_channels,
physical_dims={"x": self.pixel_size_um, "y": self.pixel_size_um, "z": 1.0},
physical_units={"x": "micrometer", "y": "micrometer", "z": "micrometer"},
channel_colors=[0x0000FF] * self.num_c,
)
writer.write_metadata(metadata)

elapsed = time.time() - start_time
print(f"Saved {output_path} in {elapsed:.1f}s")
return output_path
except Exception as e:
self.status_queue.put(('error', f"Error saving region {region}: {str(e)}"))
raise



def _save_debug_slice(self, stitched_region, zarr_path):
"""Save a debug RGB image slice for verification.
Expand Down Expand Up @@ -1550,10 +1750,15 @@ def run(self):

# Save region
if self.output_format.endswith('.zarr'):
output_path = self.save_region_ome_zarr(timepoint, region, stitched_region) # zarr output
# output_path = self.save_region_ome_zarr(timepoint, region, stitched_region) # zarr
# output_path = self.save_region_aics(timepoint, region, stitched_region) # zarr or tiff
# output_path = self.save_region_bioio(timepoint, region, stitched_region) # zarr or tiff
output_path = self.save_region_bioio_2(timepoint, region, stitched_region) # zarr
# output_path = self.save_region_parallel(timepoint, region, stitched_region) # zarr # TO FIX

else:
output_path = self.save_region_aics(timepoint, region, stitched_region) # tiff output
# output_path = self.save_region_aics(timepoint, region, stitched_region) # zarr or tiff output
output_path = self.save_region_aics(timepoint, region, stitched_region) # zarr or tiff
# output_path = self.save_region_bioio(timepoint, region, stitched_region) # zarr or tiff

print(f"Completed region {region} in {time.time() - rtime:.1f}s")

Expand Down

0 comments on commit 290eede

Please sign in to comment.