diff --git a/bin/iconv.dll b/bin/iconv.dll
new file mode 100644
index 0000000..ca3447e
Binary files /dev/null and b/bin/iconv.dll differ
diff --git a/bin/libcairo-2.dll b/bin/libcairo-2.dll
new file mode 100644
index 0000000..2332d09
Binary files /dev/null and b/bin/libcairo-2.dll differ
diff --git a/bin/libffi-6.dll b/bin/libffi-6.dll
new file mode 100644
index 0000000..3937264
Binary files /dev/null and b/bin/libffi-6.dll differ
diff --git a/bin/libgdk_pixbuf-2.0-0.dll b/bin/libgdk_pixbuf-2.0-0.dll
new file mode 100644
index 0000000..d9a37e5
Binary files /dev/null and b/bin/libgdk_pixbuf-2.0-0.dll differ
diff --git a/bin/libgio-2.0-0.dll b/bin/libgio-2.0-0.dll
new file mode 100644
index 0000000..5b5879a
Binary files /dev/null and b/bin/libgio-2.0-0.dll differ
diff --git a/bin/libglib-2.0-0.dll b/bin/libglib-2.0-0.dll
new file mode 100644
index 0000000..b890755
Binary files /dev/null and b/bin/libglib-2.0-0.dll differ
diff --git a/bin/libgmodule-2.0-0.dll b/bin/libgmodule-2.0-0.dll
new file mode 100644
index 0000000..c50f3b7
Binary files /dev/null and b/bin/libgmodule-2.0-0.dll differ
diff --git a/bin/libgobject-2.0-0.dll b/bin/libgobject-2.0-0.dll
new file mode 100644
index 0000000..faf3f6c
Binary files /dev/null and b/bin/libgobject-2.0-0.dll differ
diff --git a/bin/libgthread-2.0-0.dll b/bin/libgthread-2.0-0.dll
new file mode 100644
index 0000000..61af73d
Binary files /dev/null and b/bin/libgthread-2.0-0.dll differ
diff --git a/bin/libintl-8.dll b/bin/libintl-8.dll
new file mode 100644
index 0000000..e49171b
Binary files /dev/null and b/bin/libintl-8.dll differ
diff --git a/bin/libjpeg-62.dll b/bin/libjpeg-62.dll
new file mode 100644
index 0000000..c11cb36
Binary files /dev/null and b/bin/libjpeg-62.dll differ
diff --git a/bin/libopenjp2.dll b/bin/libopenjp2.dll
new file mode 100644
index 0000000..260d118
Binary files /dev/null and b/bin/libopenjp2.dll differ
diff --git a/bin/libpixman-1-0.dll b/bin/libpixman-1-0.dll
new file mode 100644
index 0000000..427d292
Binary files /dev/null and b/bin/libpixman-1-0.dll differ
diff --git a/bin/libpng16-16.dll b/bin/libpng16-16.dll
new file mode 100644
index 0000000..8c2f16c
Binary files /dev/null and b/bin/libpng16-16.dll differ
diff --git a/bin/libsqlite3-0.dll b/bin/libsqlite3-0.dll
new file mode 100644
index 0000000..266a288
Binary files /dev/null and b/bin/libsqlite3-0.dll differ
diff --git a/bin/libtiff-5.dll b/bin/libtiff-5.dll
new file mode 100644
index 0000000..8f67475
Binary files /dev/null and b/bin/libtiff-5.dll differ
diff --git a/bin/libxml2-2.dll b/bin/libxml2-2.dll
new file mode 100644
index 0000000..4f099b1
Binary files /dev/null and b/bin/libxml2-2.dll differ
diff --git a/bin/openslide-jni.dll b/bin/openslide-jni.dll
new file mode 100644
index 0000000..1198b4d
Binary files /dev/null and b/bin/openslide-jni.dll differ
diff --git a/bin/openslide.jar b/bin/openslide.jar
new file mode 100644
index 0000000..8054024
Binary files /dev/null and b/bin/openslide.jar differ
diff --git a/bin/zlib1.dll b/bin/zlib1.dll
new file mode 100644
index 0000000..b56c2e6
Binary files /dev/null and b/bin/zlib1.dll differ
diff --git a/histoqc/AnnotationModule.py b/histoqc/AnnotationModule.py
index 5027bcf..323fb1f 100644
--- a/histoqc/AnnotationModule.py
+++ b/histoqc/AnnotationModule.py
@@ -1,117 +1,67 @@
import logging
+from typing import List, Tuple
from histoqc.BaseImage import printMaskHelper
from skimage import io, img_as_ubyte
-from skimage.draw import polygon
import os
from pathlib import PurePosixPath, Path
-import json
-import xml.etree.ElementTree as ET
+from shapely.geometry import Polygon
+from shapely import affinity
+from PIL import Image, ImageDraw
import numpy as np
+from histoqc.annotations.annot_collection import AnnotCollection, PARSER_BUILDER_MAP, TYPE_SUPPORTED_PARSER, Region
+
+
+def rescale_by_img_bbox(polygon: Polygon, offset_xy: Tuple[float, float], resize_factor: float) -> Polygon:
+ if isinstance(offset_xy, float):
+ offset_xy = (offset_xy, offset_xy)
+ x_off, y_off = offset_xy
+ polygon = affinity.translate(polygon, xoff=x_off, yoff=y_off)
+ polygon = affinity.scale(polygon, xfact=resize_factor, yfact=resize_factor, origin=(0, 0))
+ return polygon
+
+
+def polygon_filled(draw_pil: ImageDraw, polygon: Polygon, offset_xy: Tuple[float, float], resize_factor: float):
+ polygon = rescale_by_img_bbox(polygon, offset_xy, resize_factor)
+ # outer
+ exterior_coords = list(polygon.exterior.coords)
+ draw_pil.polygon(exterior_coords, fill=1, outline=1, width=0)
+ for component in polygon.interiors:
+ interior_coord = list(component.coords)
+ draw_pil.polygon(interior_coord, fill=0, outline=0, width=0)
+ return draw_pil
+
+
+def annotation_to_mask(width: int, height: int, annot_collection: AnnotCollection, offset_xy: Tuple[float, float],
+ resize_factor: float) -> np.ndarray:
+ # binary
+ mask = Image.new(mode="1", size=(width, height))
+ draw_pil = ImageDraw.Draw(mask)
+ all_regions: List[Region] = annot_collection.all_regions
+ for region in all_regions:
+ polygon: Polygon = region['polygon']
+ # skip if empty ring (e.g., misclick in qupath)
+ if polygon.is_empty or (not polygon.is_valid):
+ continue
+ draw_pil = polygon_filled(draw_pil, polygon, offset_xy, resize_factor)
+ # noinspection PyTypeChecker
+ return np.array(mask)
-def get_points_from_xml(xml_fname):
- """
- Parses the xml file to get those annotations as lists of verticies
- xmlMask will create a mask that is true inside the annotated region described in the specified xml file. The xml file must follow the ImageScope format, the minimal components of which are:
- ```
-    <Annotations>
-        <Annotation>
-            <Regions>
-                <Region>
-                    <Vertices>
-                        <Vertex X="..." Y="..."/>
-                        <Vertex X="..." Y="..."/>
-                        <Vertex X="..." Y="..."/>
-                    </Vertices>
-                </Region>
-            </Regions>
-        </Annotation>
-    </Annotations>
- ```
- With more <Region> or <Annotation> blocks as needed for additional annotations. There is no functional difference between multiple <Annotation> blocks and one <Annotation> block with multiple <Region> blocks
- """
- # create element tree object
- tree = ET.parse(xml_fname)
-
- # get root element
- root = tree.getroot()
-
- # list of list of vertex coordinates
- # i.e. a list of sets of points
- points = []
-
- for annotation in root.findall('Annotation'):
- for regions in annotation.findall('Regions'):
- for region in regions.findall('Region'):
- for vertices in region.findall('Vertices'):
- points.append([(int(float(vertex.get('X'))),int(float(vertex.get('Y')))) for vertex in vertices.findall('Vertex')])
-
- return points
-
-def get_points_from_geojson(s, fname):
- """
- Parses a typical GeoJSON file containing one or more Polygon or MultiPolygon features.
- These JSON files are the preferred way to serialize QuPath annotations, for example.
- See https://qupath.readthedocs.io/en/latest/docs/scripting/overview.html#serialization-json
- """
- with open(fname) as f:
- geojson = json.load(f)
- point_sets = []
- for annot in geojson:
- geometry = annot['geometry']
- geom_type = geometry['type']
- coordinates = geometry['coordinates']
- if geom_type == 'MultiPolygon':
- for roi in coordinates:
- for points in roi:
- point_sets.append([(coord[0], coord[1]) for coord in points])
- elif geom_type == 'Polygon':
- for points in coordinates:
- point_sets.append([(coord[0], coord[1]) for coord in points])
- elif geom_type == 'LineString':
- point_sets.append([(coord[0], coord[1]) for coord in coordinates])
- else:
- msg = f"Skipping {geom_type} geometry in {fname}. Only Polygon, MultiPolygon, and LineString annotation types can be used."
- logging.warning(s['filename'] + ' - ' + msg)
- s["warnings"].append(msg)
- return point_sets
-
-def resize_points(points, resize_factor, offset=(0,0)):
- for k, pointSet in enumerate(points):
- points[k] = [(int((p[0] - offset[0]) * resize_factor), int((p[1] - offset[1]) * resize_factor)) for p in pointSet]
- return points.copy()
-
-def mask_out_annotation(s, point_sets):
- """Returns the mask of annotations"""
- (x, y, ncol, nrow) = s["img_bbox"]
- resize_factor = np.shape(s["img_mask_use"])[1] / ncol
-
- point_sets = resize_points(point_sets, resize_factor, offset=(x,y))
-
- mask = np.zeros((np.shape(s["img_mask_use"])[0],np.shape(s["img_mask_use"])[1]),dtype=np.uint8)
-
- for pointSet in point_sets:
- poly = np.asarray(pointSet)
- rr, cc = polygon(poly[:,1],poly[:,0],mask.shape)
- mask[rr,cc] = 1
-
- return mask
def getParams(s, params):
# read params - format: xml, json; file_path; suffix;
- format = params.get("format", None)
+ ann_format = params.get("format", None)
file_path = params.get("file_path", None)
suffix = params.get("suffix", "")
# try use default value if the params are not provided
- if not format:
+ if not ann_format:
# set default format
- format = "xml"
+ ann_format = "xml"
# warning msg
msg = f"format is not provided, using xml as the default format."
logging.warning(f"{s['filename']} - {msg}")
s["warnings"].append(msg)
-
-
+
if not file_path:
# set default file path
file_path = s["dir"]
@@ -119,42 +69,48 @@ def getParams(s, params):
msg = f"file path is not provided, using \"{s['dir']}\" as the default file path"
logging.warning(f"{s['filename']} - {msg}")
s["warnings"].append(msg)
-
- return (format, file_path, suffix)
+ return ann_format, file_path, suffix
+
def saveAnnotationMask(s, params):
logging.info(f"{s['filename']} - \tgetAnnotationMask")
-
- (format, file_path, suffix) = getParams(s, params)
-
+
+ (ann_format, file_path, suffix) = getParams(s, params)
+
# annotation file path
- f_path = f"{file_path}{os.sep}{PurePosixPath(s['filename']).stem}{suffix}.{format}"
+ f_path = f"{file_path}{os.sep}{PurePosixPath(s['filename']).stem}{suffix}.{ann_format}"
if not Path(f_path).is_file():
msg = f"Annotation file {f_path} does not exist. Skipping..."
logging.warning(f"{s['filename']} - {msg}")
s["warnings"].append(msg)
return
-
+
logging.info(f"{s['filename']} - \tusing {f_path}")
+ # TODO: better to use the Py3.10 match statement - so it will be a Literal
+ # noinspection PyTypeChecker
+ annotation_type: TYPE_SUPPORTED_PARSER = ann_format.lower()
+ logging.info(f"{s['filename']} - \tusing {annotation_type}")
# read points set
- if(format.lower() == 'xml'): # xml
- point_sets = get_points_from_xml(f_path)
- elif(format.lower() == 'json'): # geojson
- point_sets = get_points_from_geojson(s, f_path)
- else: # unsupported format
- msg = f"unsupported file format '{format}'. Skipping..."
+ if annotation_type in PARSER_BUILDER_MAP: # supported format
+ annot_collection = AnnotCollection.build(parser_type=annotation_type, uri=f_path, label_map=None)
+ # get_points_from_geojson(s, f_path)
+ else: # unsupported format
+ msg = f"unsupported file format '{ann_format}'. Skipping..."
logging.warning(f"{s['filename']} - {msg}")
s["warnings"].append(msg)
return
- annotationMask = mask_out_annotation(s, point_sets) > 0
+ (off_x, off_y, ncol, nrow) = s["img_bbox"]
+ resize_factor = np.shape(s["img_mask_use"])[1] / ncol
+ height, width = s["img_mask_use"].shape
+ annotationMask = annotation_to_mask(width, height, annot_collection, (off_x, off_y), resize_factor) > 0
- mask_file_name = f"{s['outdir']}{os.sep}{s['filename']}_annot_{format.lower()}.png"
+ mask_file_name = f"{s['outdir']}{os.sep}{s['filename']}_annot_{ann_format.lower()}.png"
io.imsave(mask_file_name, img_as_ubyte(annotationMask))
-
+
prev_mask = s["img_mask_use"]
s["img_mask_use"] = prev_mask & annotationMask
s.addToPrintList("getAnnotationMask",
@@ -162,8 +118,9 @@ def saveAnnotationMask(s, params):
if len(s["img_mask_use"].nonzero()[0]) == 0: # add warning in case the final tissue is empty
logging.warning(
- f"{s['filename']} - After AnnotationModule.getAnnotationMask NO tissue remains detectable! Downstream modules likely to be incorrect/fail")
+ f"{s['filename']} - After AnnotationModule.getAnnotationMask "
+ f"NO tissue remains detectable! Downstream modules likely to be incorrect/fail")
s["warnings"].append(
- f"After AnnotationModule.getAnnotationMask NO tissue remains detectable! Downstream modules likely to be incorrect/fail")
-
- return
\ No newline at end of file
+ f"After AnnotationModule.getAnnotationMask NO tissue remains detectable!"
+ f" Downstream modules likely to be incorrect/fail")
+ return
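For illustration only (not part of the patch), a minimal sketch of the rasterization approach used by the new polygon_filled/annotation_to_mask pair above: draw the exterior ring with fill=1, then redraw each interior ring with fill=0 so holes stay empty. The coordinates and mask size are made up.

import numpy as np
from PIL import Image, ImageDraw
from shapely.geometry import Polygon

# a square annotation with a square hole in the middle (made-up coordinates)
annot = Polygon(shell=[(10, 10), (70, 10), (70, 70), (10, 70)],
                holes=[[(30, 30), (50, 30), (50, 50), (30, 50)]])

mask = Image.new(mode="1", size=(100, 100))   # binary mask, as in annotation_to_mask
draw = ImageDraw.Draw(mask)
draw.polygon(list(annot.exterior.coords), fill=1, outline=1)
for ring in annot.interiors:                  # punch the hole back out
    draw.polygon(list(ring.coords), fill=0, outline=0)

mask_np = np.array(mask)                      # boolean ndarray; hole pixels stay False
print(mask_np.sum())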
diff --git a/histoqc/BaseImage.py b/histoqc/BaseImage.py
index 85cb5b0..78108e8 100644
--- a/histoqc/BaseImage.py
+++ b/histoqc/BaseImage.py
@@ -1,12 +1,14 @@
import logging
import os
import numpy as np
-import zlib, dill
+import zlib
+import dill
from distutils.util import strtobool
from PIL import Image
import re
from typing import Union, Tuple
from histoqc.wsihandles.WSIImageHandle import WSIImageHandle
+
_REGEX_MAG = r"^(\d?\.?\d*X?)"
_PATTERN_MAG: re.Pattern = re.compile(_REGEX_MAG, flags=re.IGNORECASE)
MAG_NA = None
@@ -29,25 +31,25 @@ def __init__(self, fname, fname_outdir, params):
self["outdir"] = fname_outdir
self["dir"] = os.path.dirname(fname)
-
# get handles from config
handles = params.get("handles", "openslide,wsidicom")
# dynamically load wsi image handle
self["os_handle"]: WSIImageHandle = WSIImageHandle.create_wsi_handle(fname, handles)
self["image_base_size"] = self["os_handle"].dimensions
- self["enable_bounding_box"] = strtobool(params.get("enable_bounding_box","False"))
+ self["enable_bounding_box"] = strtobool(params.get("enable_bounding_box", "False"))
# check if the bbox if doesn't have bbox set enable_bounding_box to False
self.setBBox()
self.addToPrintList("image_bounding_box", self["img_bbox"])
self["image_work_size"] = params.get("image_work_size", "1.25x")
self["mask_statistics"] = params.get("mask_statistics", "relative2mask")
-
+
self["base_mag"] = getMag(self, params)
if not self["base_mag"]:
- logging.error(f"{self['filename']}: Has unknown or uncalculated base magnification, cannot specify magnification scale! Did you try getMag?")
- return -1
+ logging.error(
+ f"{self['filename']}: Has unknown or uncalculated base magnification, cannot specify magnification scale! Did you try getMag?")
+ return
self.addToPrintList("base_mag", self["base_mag"])
@@ -64,15 +66,15 @@ def __init__(self, fname, fname_outdir, params):
def __getitem__(self, key):
value = super(BaseImage, self).__getitem__(key)
- if hasattr(self,"in_memory_compression") and self.in_memory_compression and key.startswith("img"):
+ if hasattr(self, "in_memory_compression") and self.in_memory_compression and key.startswith("img"):
value = dill.loads(zlib.decompress(value))
return value
def __setitem__(self, key, value):
- if hasattr(self,"in_memory_compression") and self.in_memory_compression and key.startswith("img"):
+ if hasattr(self, "in_memory_compression") and self.in_memory_compression and key.startswith("img"):
value = zlib.compress(dill.dumps(value), level=5)
- return super(BaseImage, self).__setitem__(key,value)
+ return super(BaseImage, self).__setitem__(key, value)
# setbounding box start coordinate and size
def setBBox(self):
@@ -82,14 +84,14 @@ def setBBox(self):
(dim_width, dim_height) = osh.dimensions
self["img_bbox"] = (0, 0, dim_width, dim_height)
# try to get bbox if bounding_box is ture
-
+
# Does WSI has bounding box
if self["enable_bounding_box"] and osh.has_bounding_box:
self["img_bbox"] = osh.bounding_box
elif self["enable_bounding_box"] and not osh.has_bounding_box:
self["enable_bounding_box"] = False
logging.warning(f"{self['filename']}: Bounding Box requested but could not read")
- self["warnings"].append("Bounding Box requested but could not read")
+ self["warnings"].append("Bounding Box requested but could not read")
def addToPrintList(self, name, val):
self[name] = val
@@ -97,10 +99,10 @@ def addToPrintList(self, name, val):
# find the next higher level by giving a downsample factor
# return (level, isFindCloseLevel)
- def getBestLevelForDownsample(self, downsample_factor: float) -> Tuple[int, bool]:
+ def getBestLevelForDownsample(self, downsample_factor: float) -> Tuple[int, bool]:
osh = self["os_handle"]
- relative_down_factors_idx=[np.isclose(i/downsample_factor,1,atol=.01) for i in osh.level_downsamples]
- level=np.where(relative_down_factors_idx)[0]
+ relative_down_factors_idx = [np.isclose(i / downsample_factor, 1, atol=.01) for i in osh.level_downsamples]
+ level = np.where(relative_down_factors_idx)[0]
if level.size:
return (level[0], True)
else:
@@ -129,7 +131,7 @@ def getImgThumb(self, size: str):
# return the img if it exists
if key in self:
return self[key]
-
+
# get open slide handle
osh = self["os_handle"]
@@ -154,10 +156,10 @@ def getImgThumb(self, size: str):
base_mag = self["base_mag"]
target_sampling_factor = base_mag / target_mag
target_dims = tuple(np.rint(np.asarray(img_base_size) / target_sampling_factor).astype(int))
-
+
# generate the thumb img
self[key] = getBestThumb(self, bx, by, target_dims, target_sampling_factor)
-
+
# the size of the img is number
elif size.replace(".", "0", 1).isdigit():
size = float(size)
@@ -166,7 +168,7 @@ def getImgThumb(self, size: str):
target_downscaling_factor = size
target_sampling_factor = 1 / target_downscaling_factor
target_dims = tuple(np.rint(np.asarray(img_base_size) * target_downscaling_factor).astype(int))
-
+
# generate the thumb img
self[key] = getBestThumb(self, bx, by, target_dims, target_sampling_factor)
@@ -175,20 +177,20 @@ def getImgThumb(self, size: str):
target_level = int(size)
if target_level >= osh.level_count:
target_level = osh.level_count - 1
- msg = f"Desired Image Level {size+1} does not exist! Instead using level {osh.level_count-1}! Downstream output may not be correct"
- logging.error(f"{self['filename']}: {msg}" )
+ msg = f"Desired Image Level {size + 1} does not exist! Instead using level {osh.level_count - 1}! Downstream output may not be correct"
+ logging.error(f"{self['filename']}: {msg}")
self["warnings"].append(msg)
- size = (tuple((np.array(img_base_size)/osh.level_downsamples[target_level]).astype(int))
+ size = (tuple((np.array(img_base_size) / osh.level_downsamples[target_level]).astype(int))
if self["enable_bounding_box"]
else osh.level_dimensions[target_level])
logging.info(
f"{self['filename']} - \t\tloading image from level {target_level} of size {osh.level_dimensions[target_level]}")
- tile = osh.read_region((bx, by), target_level, size)
+ tile = osh.read_region((bx, by), target_level, size)
self[key] = (np.asarray(rgba2rgb(self, tile))
- if np.shape(tile)[-1]==4
- else np.asarray(tile))
-
- # specifies a desired size of thumbnail
+ if np.shape(tile)[-1] == 4
+ else np.asarray(tile))
+
+ # specifies a desired size of thumbnail
else:
# recommend having the dimension is less than 10k
if size > 10000:
@@ -201,36 +203,40 @@ def getImgThumb(self, size: str):
self[key] = getBestThumb(self, bx, by, target_dims, target_sampling_factor)
return self[key]
+
def getBestThumb(s: BaseImage, x: int, y: int, dims: Tuple[int, int], target_sampling_factor: float):
osh = s["os_handle"]
-
+
# get thumb from og
if not s["enable_bounding_box"]:
max_dim = dims[0] if dims[0] > dims[1] else dims[1]
return np.array(osh.get_thumbnail((max_dim, max_dim)))
-
+
(level, isExactLevel) = s.getBestLevelForDownsample(target_sampling_factor)
-
+
# check if get the existing level
if isExactLevel:
tile = osh.read_region((x, y), level, dims)
- return np.asarray(rgba2rgb(s, tile)) if np.shape(tile)[-1]==4 else np.asarray(tile)
+ return np.asarray(rgba2rgb(s, tile)) if np.shape(tile)[-1] == 4 else np.asarray(tile)
# scale down the thumb img from the next high level
else:
return resizeTileDownward(s, target_sampling_factor, level)
-
+
+
'''
the followings are helper functions
'''
+
+
def resizeTileDownward(self, target_downsampling_factor, level):
osh = self["os_handle"]
(bx, by, bwidth, bheight) = self["img_bbox"]
end_x = bx + bwidth
end_y = by + bheight
-
+
cloest_downsampling_factor = osh.level_downsamples[level]
win_size = 2048
-
+
# create a new img
output = []
for x in range(bx, end_x, win_size):
@@ -240,26 +246,25 @@ def resizeTileDownward(self, target_downsampling_factor, level):
# Adjust extraction size for endcut
if end_x < x + win_width:
win_width = end_x - x
- if end_y < y + win_height:
+ if end_y < y + win_height:
win_height = end_y - y
-
win_down_width = int(round(win_width / target_downsampling_factor))
win_down_height = int(round(win_height / target_downsampling_factor))
-
+
win_width = int(round(win_width / cloest_downsampling_factor))
win_height = int(round(win_height / cloest_downsampling_factor))
-
+
# TODO Note: this isn't very efficient, and if more efficiency isneeded
# We should likely refactor using "paste" from Image.
# Or even just set the pixels directly with indexing.
cloest_region = osh.read_region((x, y), level, (win_width, win_height))
- if np.shape(cloest_region)[-1]==4:
+ if np.shape(cloest_region)[-1] == 4:
cloest_region = rgba2rgb(self, cloest_region)
target_region = cloest_region.resize((win_down_width, win_down_height))
row_piece.append(target_region)
row_piece = np.concatenate(row_piece, axis=0)
-
+
output.append(row_piece)
output = np.concatenate(output, axis=1)
return output
@@ -308,7 +313,6 @@ def parsed_mag(mag: Union[str, int, float]) -> Union[None, float]:
# this function is seperated out because in the future we hope to have automatic detection of
# magnification if not present in open slide, and/or to confirm openslide base magnification
def getMag(s: BaseImage, params) -> Union[float, None]:
-
osh = s["os_handle"]
mag = osh.magnification or MAG_NA
# workaround for unspecified mag -- with or without automatic detection it might be preferred to have
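For illustration only (not part of the patch), a small sketch of the level-matching idea in getBestLevelForDownsample above: pick the pyramid level whose downsample factor is within 1% of the requested factor. The fallback return value is an assumption for the sketch, since the hunk does not show the method's else branch.

import numpy as np

def best_level_for_downsample(level_downsamples, downsample_factor, atol=0.01):
    # True where an existing level's downsample is within `atol` of the requested factor
    close = [np.isclose(d / downsample_factor, 1, atol=atol) for d in level_downsamples]
    level = np.where(close)[0]
    if level.size:
        return int(level[0]), True
    return 0, False  # assumed fallback for this sketch only

print(best_level_for_downsample([1.0, 4.0, 16.0, 64.0], 16))  # (2, True)  -> exact level exists
print(best_level_for_downsample([1.0, 4.0, 16.0, 64.0], 32))  # (0, False) -> no exact level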
diff --git a/histoqc/BasicModule.py b/histoqc/BasicModule.py
index 33053ec..418c715 100644
--- a/histoqc/BasicModule.py
+++ b/histoqc/BasicModule.py
@@ -10,14 +10,13 @@
def getBasicStats(s, params):
logging.info(f"{s['filename']} - \tgetBasicStats")
osh = s["os_handle"]
- s.addToPrintList("type", osh.vendor)
- s.addToPrintList("levels", osh.level_count)
- (width, height) = osh.dimensions
- s.addToPrintList("height", width)
- s.addToPrintList("width", height)
- s.addToPrintList("mpp_x", osh.mpp_x)
- s.addToPrintList("mpp_y", osh.mpp_y)
- s.addToPrintList("comment", osh.comment.replace("\n", " ").replace("\r", " "))
+ s.addToPrintList("type", osh.properties.get("openslide.vendor", "NA"))
+ s.addToPrintList("levels", osh.properties.get("openslide.level-count", "NA"))
+ s.addToPrintList("height", osh.properties.get("openslide.level[0].height", "NA"))
+ s.addToPrintList("width", osh.properties.get("openslide.level[0].width", "NA"))
+ s.addToPrintList("mpp_x", osh.properties.get("openslide.mpp-x", "NA"))
+ s.addToPrintList("mpp_y", osh.properties.get("openslide.mpp-y", "NA"))
+ s.addToPrintList("comment", osh.properties.get("openslide.comment", "NA").replace("\n", " ").replace("\r", " "))
return
diff --git a/histoqc/ClassificationModule.py b/histoqc/ClassificationModule.py
index 8651e15..9567a49 100644
--- a/histoqc/ClassificationModule.py
+++ b/histoqc/ClassificationModule.py
@@ -8,7 +8,7 @@
from distutils.util import strtobool
from histoqc.BaseImage import printMaskHelper
-from skimage import io, img_as_ubyte
+from skimage import io, img_as_ubyte, img_as_bool
from skimage.filters import gabor_kernel, frangi, gaussian, median, laplace
from skimage.color import rgb2gray
from skimage.morphology import remove_small_objects, disk, dilation
@@ -179,7 +179,18 @@ def byExampleWithFeatures(s, params):
eximg = compute_features(img, params)
eximg = eximg.reshape(-1, eximg.shape[2])
- mask = io.imread(ex[1], as_gray=True).reshape(-1, 1)
+ # read mask as grayscale images
+ mask = io.imread(ex[1], as_gray=True)
+ # convert the grayscale image to binary if it is not already in binary format
+ if mask.dtype.kind != 'b':
+ # warning log
+ msg = f"Mask file '{ex[1]}' is not a binary image. Automatically converting to binary..."
+ logging.warning(s['filename'] + ' - ' + msg)
+ s["warnings"].append(msg)
+ # convert to binary
+ mask = img_as_bool(mask)
+
+ mask = mask.reshape(-1, 1)
if nsamples_per_example != -1: #sub sambling required
nitems = nsamples_per_example if nsamples_per_example > 1 else int(mask.shape[0]*nsamples_per_example)
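For illustration only (not part of the patch), a short sketch of the mask-binarization step added above: skimage's img_as_bool turns a non-binary grayscale mask into a boolean one, roughly thresholding at the midpoint of the dtype range. The array below stands in for the output of io.imread(ex[1], as_gray=True).

import numpy as np
from skimage import img_as_bool

gray_mask = np.array([[0, 64, 200],
                      [0, 128, 255]], dtype=np.uint8)  # e.g. an 8-bit mask exported by an editor

if gray_mask.dtype.kind != 'b':          # same check as in byExampleWithFeatures
    binary_mask = img_as_bool(gray_mask)
else:
    binary_mask = gray_mask

print(binary_mask)                       # values well above mid-range become True
print(binary_mask.reshape(-1, 1).shape)  # flattened to one column, as the module does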
diff --git a/histoqc/DeconvolutionModule.py b/histoqc/DeconvolutionModule.py
index 8c85732..dbb41f1 100644
--- a/histoqc/DeconvolutionModule.py
+++ b/histoqc/DeconvolutionModule.py
@@ -23,9 +23,9 @@ def separateStains(s, params):
sys.exit(1)
return
- stain_matrix = getattr(sys.modules[__name__], stain, "")
+ stain_matrix = getattr(sys.modules[__name__], stain, None)
- if stain_matrix == "":
+ if stain_matrix is None:
logging.error(f"{s['filename']} - Unknown stain matrix specified in DeconolutionModule.separateStains")
sys.exit(1)
return
diff --git a/histoqc/LocalTextureEstimationModule.py b/histoqc/LocalTextureEstimationModule.py
index 87186e2..6d3733f 100644
--- a/histoqc/LocalTextureEstimationModule.py
+++ b/histoqc/LocalTextureEstimationModule.py
@@ -24,6 +24,12 @@ def estimateGreyComatrixFeatures(s, params):
img = color.rgb2gray(img)
mask = s[mask_name] if not invert else ~s[mask_name]
+ if len(mask.nonzero()[0]) == 0: # add warning in case no tissue is detected in the mask
+ msg = f"LocalTextureEstimationModule.estimateGreyComatrixFeatures:{prefix} Can not estimate the empty mask since NO tissue remains detectable in mask"
+ logging.warning(f"{s['filename']} - {msg}")
+ s["warnings"].append(msg)
+ return
+
maskidx = mask.nonzero()
maskidx = np.asarray(maskidx).transpose()
idx = np.random.choice(maskidx.shape[0], npatches)
diff --git a/histoqc/SaveModule.py b/histoqc/SaveModule.py
index 656a1e1..627f8ca 100644
--- a/histoqc/SaveModule.py
+++ b/histoqc/SaveModule.py
@@ -40,24 +40,14 @@ def saveAssociatedImage(s, key:str, dim:int):
logging.info(f"{s['filename']} - \tsave{key.capitalize()}")
osh = s["os_handle"]
- # get asscociated image by key
- associated_img = None
- try:
- # get label image
- if key == "label":
- associated_img = osh.read_label()
- # get macro image
- elif key == "macro":
- associated_img = osh.read_macro()
- else:
- raise NameError(f"Unknown {key} image in associated_images from {s['filename']}")
- except Exception:
- message = f"{s['filename']}- \tsave{key.capitalize()} Can't Read '{key}' Image from Slide's Associated Images"
+ if key not in osh.associated_images:
+ message = f"{s['filename']} - \tsave{key.capitalize()} Can't Read '{key}' Image from Slide's Associated Images"
logging.warning(message)
s["warnings"].append(message)
return
-
- # get associated image size
+
+ # get associated image by key
+ associated_img = osh.associated_images[key]
(width, height) = associated_img.size
# calulate the width or height depends on dim
@@ -72,11 +62,6 @@ def saveAssociatedImage(s, key:str, dim:int):
associated_img = np.asarray(associated_img)[:, :, 0:3]
io.imsave(f"{s['outdir']}{os.sep}{s['filename']}_{key}.png", associated_img)
-def saveLabel(s, params):
- dim = params.get("small_dim", 500)
- saveAssociatedImage(s, "label", dim)
- return
-
def saveMacro(s, params):
dim = params.get("small_dim", 500)
saveAssociatedImage(s, "macro", dim)
diff --git a/histoqc/TileExtractionModule.py b/histoqc/TileExtractionModule.py
index 6058d22..0101ca4 100644
--- a/histoqc/TileExtractionModule.py
+++ b/histoqc/TileExtractionModule.py
@@ -1,32 +1,24 @@
+"""
+A standalone tile extraction module to locate tile bounding boxes in usable tissue region obtained by previous steps.
+Coordinates are saved in the half-open 4-tuple convention of (left, top, right, bottom), where `right` and `bottom`
+are open.
+"""
import os
-
+import openslide
import json
from histoqc.BaseImage import BaseImage
-from histoqc.wsihandles.WSIImageHandle import WSIImageHandle
-from typing import Callable, Dict, Any, List, Tuple, Union, TypeVar, Type
+from typing import Callable, Dict, Any, List, Tuple, Union
import numpy as np
from PIL import Image, ImageDraw
from skimage.measure import regionprops
from contextlib import contextmanager
from distutils.util import strtobool
import logging
-import importlib
-
-
-def __dynamic_import(module_name: str, attribute_name: str, surrogate: Union[str, None]):
- module = importlib.import_module(module_name)
- attribute = getattr(module, attribute_name, None)
- if attribute is not None:
- return attribute
- if surrogate is not None:
- return __dynamic_import(surrogate, attribute_name, None)
- raise ImportError(f"Cannot Import {attribute_name} from either {module_name} or {surrogate}")
-
-
-__TYPE_GET_ARGS = Callable[[Type, ], Tuple[Any, ...]]
-
-Literal: TypeVar = __dynamic_import("typing", "Literal", "typing_extensions")
-get_args: __TYPE_GET_ARGS = __dynamic_import("typing", "get_args", "typing_extensions")
+from histoqc.import_wrapper.typing import Literal, get_args
+# from histoqc.import_wrapper.helper import dynamic_import
+# __TYPE_GET_ARGS = Callable[[Type, ], Tuple[Any, ...]]
+# Literal: TypeVar = dynamic_import("typing", "Literal", "typing_extensions")
+# get_args: __TYPE_GET_ARGS = dynamic_import("typing", "get_args", "typing_extensions")
TYPE_TILE_SIZE = Literal['tile_size']
TYPE_TILE_STRIDE = Literal['tile_stride']
@@ -42,20 +34,27 @@ def __dynamic_import(module_name: str, attribute_name: str, surrogate: Union[str
TYPE_LOCK, TYPE_OUTLINE, TYPE_WIDTH, TYPE_SAVE_FLAG]
+TYPE_BBOX_FLOAT = Tuple[float, float, float, float]
+TYPE_BBOX_INT = Tuple[int, int, int, int]
+
+
def default_screen_identity(img: np.ndarray):
return True
class MaskTileWindows:
"""
- Locate the window of tiles in the given downsampled mask. Output Convention: (left, top, right, bottom)
+ Locate the window of tiles in the given downsampled mask. Output Convention: (left, top, right, bottom).
+ Coordinates are half-open as [left, right) and [top, bottom).
"""
__rp_list: List
__mask: np.ndarray
__mask_pil: Image.Image
__tissue_thresh: float
- __windows_on_mask: List[List[Tuple[int, int, int, int]]]
+ # note that the tile size on the corresponding downsampled mask may no longer be an integer, which can cause a
+ # loss of precision when converting back to the original tile size after working on the mask
+ __windows_on_mask: List[List[TYPE_BBOX_FLOAT]]
__windows_on_original_image: List[List[Tuple[int, int, int, int]]]
@property
@@ -78,49 +77,6 @@ def _rp_list(self) -> List:
setattr(self, attr_name, regionprops(self.__mask))
return getattr(self, attr_name)
- def _tile_windows_on_mask(self) -> List[List[Tuple[int, int, int, int]]]:
- """Helper function to locate the windows of each region in format of (left, top, right, bottom)
- Returns:
- List of List of (left, top, right, bottom), nested by connected regions in the mask
- """
- result_list: List[List[Tuple[int, int, int, int]]] = []
- # loop the regionprop list
- for region in self._rp_list:
- # get bounding box of the individual region
- rp_bbox = region.bbox
- # get list of possible tile bounding boxes within the region bounding box, computed from
- # tile size, stride, and tissue thresh
- windows: List[Tuple[int, int, int, int]] = MaskTileWindows.rp_tile_windows_on_mask(self.mask_pil,
- rp_bbox,
- self.work_tile_size,
- self.work_stride,
- self.__tissue_thresh)
- # result_list += windows
- result_list.append(windows)
- return result_list
-
- @property
- def windows_on_mask(self) -> List[List[Tuple[int, int, int, int]]]:
- """
- Returns:
- Obtain the cached tile windows on the given mask. Results are cached.
- """
- if not hasattr(self, '__windows_on_mask') or self.__windows_on_mask is None:
- self.__windows_on_mask = self._tile_windows_on_mask()
- return self.__windows_on_mask
-
- @property
- def windows_on_original_image(self) -> List[List[Tuple[int, int, int, int]]]:
- """Zoom the windows from the mask (which is often downsampled) to the original image, using the defined
- size factor
- Returns:
- Zoomed windows on the original image (left, top, right, bottom)
- """
- if not hasattr(self, '__windows_on_original_image') or self.__windows_on_original_image is None:
- self.__windows_on_original_image = MaskTileWindows.__window_list_resize(self.windows_on_mask,
- self.__size_factor)
- return self.__windows_on_original_image
-
def __init_mask(self, mask: np.ndarray):
self.__mask = mask
@@ -140,89 +96,34 @@ def __init__(self, mask: np.ndarray, *, work_tile_size: int, work_stride: int,
self.__size_factor = size_factor
self.__tissue_thresh = tissue_thresh
- @staticmethod
- def max_tile_bbox_top_left_coord(rp_bbox: Tuple[int, int, int, int], work_tile_size: int, work_stride: int):
- """ find the coords of the top/left corner of the most right / bottom tile ever possible given the current
- size and stride
- Args:
- rp_bbox: [top, left, bottom, right]. Half-open -- [Left, Right) and [Top, Bottom). Note that this is the
- convention of sklearn's region properties, which is different to the (left, top, right, bottom) used by
- PIL or OpenSlide
- work_tile_size: Tile size on the working mask, which might be downsampled.
- work_stride: Stride size on the working mask, which might be downsampled.
- Returns:
- Tuple[int, int]
- """
- assert work_stride > 0
- assert work_tile_size > 0
-
- # not for skimage regionprops, the bbox is half-open at the bottom / right coordinates.
- # [left, right) and [top, bottom). Hence, the "+1" operation below for coord computation
- # is already priced-in
- top_rp, left_rp, bottom_rp, right_rp = rp_bbox
- # start + n_step * stride + tile_size = bottom/rightmost --> (rp_limit - tile_size) // stride = max step
- max_step_horiz = (right_rp - left_rp - work_tile_size) // work_stride
- max_step_vert = (bottom_rp - top_rp - work_tile_size) // work_stride
- tile_max_left = left_rp + max_step_horiz * work_stride
- tile_max_top = top_rp + max_step_vert * work_stride
-
- assert tile_max_left + work_tile_size - 1 <= right_rp
- assert tile_max_top + work_tile_size - 1 <= bottom_rp
- return tile_max_top, tile_max_left
-
- @staticmethod
- def region_tile_cand_pil_window_on_mask(rp_bbox: Tuple[int, int, int, int],
- work_tile_size: int,
- work_stride: int) -> List[Tuple[int, int, int, int]]:
- """ Split the region given by the region property bounding box into a grid of tile windows. Support overlapping.
- This computes the all possible window given by the rp regardless of the tissue condition. Refinement can be
- performed in further steps.
- Args:
- rp_bbox: sklearn region property style: [top, left, bottom, right]. Half-open
- -- [Left, Right) and [Top, Bottom)
- work_tile_size:
- work_stride:
-
- Returns:
- List of (left, top, right, bottom) tuples.
- """
- top_rp, left_rp, bottom_rp, right_rp = rp_bbox
- # top/left of the right/bottom most tile
- tile_max_top, tile_max_left = MaskTileWindows.max_tile_bbox_top_left_coord(rp_bbox,
- work_tile_size,
- work_stride)
- # obtain the top/left coord of all tile bboxes
- all_tops = np.arange(top_rp, tile_max_top + 1, work_stride, dtype=int)
- all_lefts = np.arange(left_rp, tile_max_left + 1, work_stride, dtype=int)
- def window(left, top, size): return int(left), int(top), int(left + size), int(top + size)
- # get full tile bbox representation
- all_tile_pil_window = [window(left, top, work_tile_size) for left in all_lefts for top in all_tops]
- return all_tile_pil_window
-
@staticmethod
def validate_tile_mask_area_thresh(mask_pil: Image.Image,
- tile_window_on_mask: Tuple[int, int, int, int],
+ tile_window_on_mask: TYPE_BBOX_FLOAT,
tissue_thresh: float) -> bool:
""" Validate whether the given tile window (left, top, right, bottom) contains sufficient tissue. This is
computed by calculating the tissue % in the corresponding mask region.
+ Note that if the coordinates are not integers, the actual cropped area of the region may differ slightly after rounding
Args:
mask_pil:
- tile_window_on_mask: List of (left, top, right, bottom)
+ tile_window_on_mask: a single (left, top, right, bottom) window. Half-open on the right and bottom
tissue_thresh: minimum requirement of tissue percentage
Returns:
True if the window has sufficient tissue.
"""
- # left, top, right, bottom = tile_window
- window_pil = mask_pil.crop(tile_window_on_mask)
+ left, top, right, bottom = tile_window_on_mask
+ # window_on_mask_work = tuple(round(x) for x in tile_window_on_mask)
+ window_on_mask_work = round(left), round(top), round(right), round(bottom)
+ window_pil = mask_pil.crop(window_on_mask_work)
+ # noinspection PyTypeChecker
window_np = np.array(window_pil, copy=False)
window_bool = window_np > 0
return window_bool.mean() >= tissue_thresh
@staticmethod
def _valid_tile_windows_on_mask_helper(mask_pil: Image.Image,
- tile_cand_pil_window_on_mask: List[Tuple[int, int, int, int]],
- tissue_thresh: float) -> List[Tuple[int, int, int, int]]:
+ tile_cand_pil_window_on_mask: List[TYPE_BBOX_FLOAT],
+ tissue_thresh: float) -> List[TYPE_BBOX_FLOAT]:
""" All tile windows with sufficient usable tissue from the grid of window candidates
Args:
mask_pil:
@@ -241,17 +142,49 @@ def _valid_tile_windows_on_mask_helper(mask_pil: Image.Image,
if MaskTileWindows.validate_tile_mask_area_thresh(mask_pil, window, tissue_thresh)]
@staticmethod
- def rp_tile_windows_on_mask(mask_pil, rp_bbox: Tuple[int, int, int, int],
- work_tile_size: int,
- work_stride: int,
- tissue_thresh: float) -> List[Tuple[int, int, int, int]]:
+ def region_tile_cand_pil_window_on_mask(rp_bbox: TYPE_BBOX_INT,
+ work_tile_size: float,
+ work_stride: float) -> List[TYPE_BBOX_FLOAT]:
+ """ Split the region given by the region property bounding box into a grid of tile windows. Support overlapping.
+ This computes the all possible window given by the rp regardless of the tissue condition. Refinement can be
+ performed in further steps.
+ Args:
+ rp_bbox: sklearn region property style: [top, left, bottom, right]. Half-open
+ -- [Left, Right) and [Top, Bottom)
+ work_tile_size:
+ work_stride:
+
+ Returns:
+ List of (left, top, right, bottom) tuples. Half-open
+ """
+ top_rp, left_rp, bottom_rp, right_rp = rp_bbox
+ # top/left of the right/bottom most tile
+ tile_max_top, tile_max_left = MaskTileWindows.max_tile_bbox_top_left_coord(rp_bbox,
+ work_tile_size,
+ work_stride)
+ # obtain the top/left coord of all tile bboxes
+ all_tops = np.arange(top_rp, tile_max_top + 1, work_stride, dtype=int)
+ all_lefts = np.arange(left_rp, tile_max_left + 1, work_stride, dtype=int)
+ # since it's open on the right and bottom, right = left + size and bottom = top + size, where right-1
+ # is the actual rightmost pixel and likewise bottom-1 is the actual bottom-most pixel.
+ def window(left, top, size): return left, top, (left + size), (top + size)
+ # get full tile bbox representation
+ all_tile_pil_window = [window(left, top, work_tile_size) for left in all_lefts for top in all_tops]
+ return all_tile_pil_window
+
+ @staticmethod
+ def rp_tile_windows_on_mask(mask_pil,
+ rp_bbox: TYPE_BBOX_INT,
+ work_tile_size: float,
+ work_stride: float,
+ tissue_thresh: float) -> List[TYPE_BBOX_FLOAT]:
"""Wrapper. Obtain the valid window with sufficient tissue from a list of region property objects based on
a given mask. For each individual region, a list of window in format (left, top, right, bottom) is obtained.
Resulting lists of windows of all regions are nested into a list as the object.
Args:
mask_pil: PIL handle of the downsampled mask
rp_bbox: bounding box of sklearn region properties. Note that its convention is [top, left, bottom, right],
- which is different to the (left, top, right, bottom) in PIL and OpenSlide
+ which is different to the (left, top, right, bottom) in PIL and OpenSlide. Int coords.
work_tile_size: Working tile size on the downsampled mask
work_stride: Working stride size on the downsampled mask
tissue_thresh: Minimum requirement of tissue % in each window
@@ -262,9 +195,92 @@ def rp_tile_windows_on_mask(mask_pil, rp_bbox: Tuple[int, int, int, int],
candidates = MaskTileWindows.region_tile_cand_pil_window_on_mask(rp_bbox, work_tile_size, work_stride)
return MaskTileWindows._valid_tile_windows_on_mask_helper(mask_pil, candidates, tissue_thresh)
+ def _tile_windows_on_mask(self) -> List[List[TYPE_BBOX_FLOAT]]:
+ """Helper function to locate the windows of each region in format of (left, top, right, bottom)
+ Note that to retain precision the coordinates are kept in float form rather than cast to int
+ (noticeably the right/bottom coords, since the tile size may not be an integer).
+ The actual validation of the corresponding bbox regions on the mask, on the other hand, rounds the coords
+ accordingly.
+ Returns:
+ List of List of (left, top, right, bottom), nested by connected regions in the mask
+ """
+ result_list: List[List[TYPE_BBOX_FLOAT]] = []
+ # loop the regionprop list
+ for region in self._rp_list:
+ # get bounding box of the individual region
+ rp_bbox = region.bbox
+ # get list of possible tile bounding boxes within the region bounding box, computed from
+ # tile size, stride, and tissue thresh
+ windows: List[TYPE_BBOX_FLOAT] = MaskTileWindows.rp_tile_windows_on_mask(self.mask_pil,
+ rp_bbox,
+ self.work_tile_size,
+ self.work_stride,
+ self.__tissue_thresh)
+ # result_list += windows
+ result_list.append(windows)
+ return result_list
+
+ @property
+ def windows_on_mask(self) -> List[List[TYPE_BBOX_FLOAT]]:
+ """
+ Returns:
+ Obtain the cached tile windows on the given mask. Results are cached.
+ """
+ if not hasattr(self, '__windows_on_mask') or self.__windows_on_mask is None:
+ self.__windows_on_mask = self._tile_windows_on_mask()
+ return self.__windows_on_mask
+
+ @property
+ def windows_on_original_image(self) -> List[List[TYPE_BBOX_INT]]:
+ """Zoom the windows from the mask (which is often downsampled) to the original image, using the defined
+ size factor
+ Returns:
+ Zoomed windows on the original image (left, top, right, bottom)
+ """
+ if not hasattr(self, '__windows_on_original_image') or self.__windows_on_original_image is None:
+ self.__windows_on_original_image = MaskTileWindows.__window_list_resize(self.windows_on_mask,
+ self.__size_factor)
+ return self.__windows_on_original_image
+
@staticmethod
- def __window_resize_helper(window_on_mask: Tuple[int, int, int, int], size_factor) -> Tuple[int, int, int, int]:
+ def max_tile_bbox_top_left_coord(rp_bbox: TYPE_BBOX_INT, work_tile_size: float, work_stride: float) \
+ -> Tuple[int, int]:
+ """ find the coords of the top/left corner of the most right / bottom tile ever possible given the current
+ size and stride.
+ Args:
+ rp_bbox: [top, left, bottom, right]. Half-open -- [Left, Right) and [Top, Bottom). Note that this is the
+ convention of sklearn's region properties, which is different to the (left, top, right, bottom) used by
+ PIL or OpenSlide. The bbox of connected tissue regions on mask. Int coords.
+ work_tile_size: Tile size on the working mask, which might be downsampled.
+ work_stride: Stride size on the working mask, which might be downsampled.
+ Returns:
+ Tuple[int, int]
+ """
+ assert work_stride > 0, f"work stride must be greater than 0 - got {work_stride}"
+ assert work_tile_size > 0, f"work tile size must be greater than 0 - got {work_tile_size}"
+
+ # note: for skimage regionprops, the bbox is half-open at the bottom / right coordinates,
+ # i.e. [left, right) and [top, bottom). Hence, the "+1" operation below for coord computation
+ # is already priced in
+ top_rp, left_rp, bottom_rp, right_rp = rp_bbox
+ # start + n_step * stride + tile_size = bottom/rightmost --> (rp_limit - tile_size) // stride = max step
+ max_step_horiz = (right_rp - left_rp - work_tile_size) / work_stride
+ max_step_vert = (bottom_rp - top_rp - work_tile_size) / work_stride
+ tile_max_left = left_rp + max_step_horiz * work_stride
+ tile_max_top = top_rp + max_step_vert * work_stride
+
+ assert round(tile_max_left + work_tile_size) <= right_rp,\
+ f"left + size check" \
+ f" {tile_max_left + work_tile_size} = {tile_max_left} + {work_tile_size} <= {right_rp} fail"
+ assert round(tile_max_top + work_tile_size) <= bottom_rp,\
+ f"top + size check" \
+ f" {tile_max_top + work_tile_size} = {tile_max_top} + {work_tile_size} <= {bottom_rp} fail"
+ return int(tile_max_top), int(tile_max_left)
+
+ @staticmethod
+ def __window_resize_helper(window_on_mask: Union[TYPE_BBOX_FLOAT, TYPE_BBOX_INT], size_factor) -> TYPE_BBOX_INT:
"""Helper function to zoom the window coordinates on downsampled mask to the original sized image.
+ Convert back to int.
Args:
window_on_mask: (left, top, right, bottom)
size_factor: size_factor = img_size / mask_size
@@ -277,7 +293,7 @@ def __window_resize_helper(window_on_mask: Tuple[int, int, int, int], size_facto
return left, top, right, bottom
@staticmethod
- def __window_list_resize(window_on_mask: List[List[Tuple[int, int, int, int]]],
+ def __window_list_resize(window_on_mask: Union[List[List[TYPE_BBOX_FLOAT]], List[List[TYPE_BBOX_INT]]],
size_factor: float) -> List[List[Tuple[int, int, int, int]]]:
"""
Args:
@@ -338,7 +354,7 @@ def _tile_windows_helper(mask_use_for_tiles,
mask_w, mask_h = mask.shape[1], mask.shape[0]
size_factor = img_w / mask_w
size_factor_ref = img_h / mask_h
- assert size_factor > 0
+ assert size_factor > 0, f"size factor must be positive - got {size_factor}"
if round(size_factor) != round(size_factor_ref):
logging.warning(f"{filename}: Aspect Ratio Mismatch: {img_w, img_h} vs. "
f"{mask_w, mask_h}")
@@ -390,7 +406,8 @@ def clear_tile_window(self, tile_size: int = 256, tile_stride: int = 256,
root_dict.pop(key, None)
@staticmethod
- def __bbox_overlay_helper(img: np.ndarray, windows_grouped_by_region: List[List[Tuple[int, int, int, int]]],
+ def __bbox_overlay_helper(img: np.ndarray,
+ windows_grouped_by_region: Union[List[List[TYPE_BBOX_INT]], List[List[TYPE_BBOX_FLOAT]]],
outline: str = 'green', width: int = 2) -> Image.Image:
"""
Helper function to draw bbox and overlay to the img thumbnail
@@ -410,6 +427,7 @@ def __bbox_overlay_helper(img: np.ndarray, windows_grouped_by_region: List[List[
for window_list in windows_grouped_by_region:
for window in window_list:
# ImageDraw accepts x0 y0 x1 y1
+ window = tuple(round(x) for x in window)
left, top, right, bottom = window
draw_context.rectangle((left, top, right, bottom), outline=outline, width=width)
@@ -428,8 +446,7 @@ def bbox_overlay(self,
tile_windows: MaskTileWindows = self.tile_windows(mask_use_for_tiles, img_w, img_h,
tile_size_on_img, tile_stride_on_img, tissue_thresh,
force_rewrite=force_rewrite)
- windows_on_mask: List[List[Tuple[int, int, int, int]]] = tile_windows.windows_on_mask
-
+ windows_on_mask: List[List[TYPE_BBOX_FLOAT]] = tile_windows.windows_on_mask
# all properties below are cached
mapping: Dict[Literal[DRAW_TARGET_IMG_THUMB, DRAW_TARGET_MASK], np.ndarray] = {
get_args(DRAW_TARGET_IMG_THUMB)[0]: img_use_for_tiles,
@@ -515,7 +532,7 @@ def valid_tile_extraction(self,
tw: MaskTileWindows = self.tile_windows(mask_use_for_tiles, img_w, img_h,
tile_size, tile_stride, tissue_thresh, force_rewrite=force_rewrite)
window_list_of_regions = tw.windows_on_original_image
- image_handle: WSIImageHandle = s["os_handle"]
+ image_handle: openslide.OpenSlide = s["os_handle"]
valid_window_list_all_regions: List[List[Tuple[int, int, int, int]]] = []
for region_windows in window_list_of_regions:
region_windows: List[Tuple[int, int, int, int]]
@@ -539,6 +556,7 @@ def valid_tile_extraction(self,
def extract(s: BaseImage, params: Dict[PARAMS, Any]):
+ logging.info(f"{s['filename']} - \textract")
with params['lock']:
slide_out = s['outdir']
tile_output_dir = params.get('tile_output', os.path.join(slide_out, 'tiles'))
@@ -549,7 +567,6 @@ def extract(s: BaseImage, params: Dict[PARAMS, Any]):
outline: str = params.get('outline', "green")
width: int = int(params.get('width', 2))
save_image: bool = bool(strtobool(params.get("save_image", "False")))
-
tile_size = int(params.get('tile_size', 256))
tile_stride = int(params.get('tile_stride', 256))
tissue_thresh = float(params.get('tissue_ratio', 0.5))
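For illustration only (not part of the patch), a compact sketch of the tiling idea MaskTileWindows implements: lay a stride-spaced grid of half-open (left, top, right, bottom) windows over a region's bounding box, then keep only the windows whose tissue fraction on the mask meets the threshold. The toy mask and all sizes are made up.

import numpy as np

def candidate_windows(rp_bbox, tile_size, stride):
    top, left, bottom, right = rp_bbox                     # skimage regionprops order, half-open
    max_step_h = (right - left - tile_size) // stride
    max_step_v = (bottom - top - tile_size) // stride
    lefts = [left + i * stride for i in range(int(max_step_h) + 1)]
    tops = [top + j * stride for j in range(int(max_step_v) + 1)]
    return [(l, t, l + tile_size, t + tile_size) for l in lefts for t in tops]

def enough_tissue(mask, window, thresh):
    left, top, right, bottom = (round(v) for v in window)  # round float coords before cropping
    return (mask[top:bottom, left:right] > 0).mean() >= thresh

mask = np.zeros((64, 64), dtype=np.uint8)
mask[8:40, 8:56] = 1                                       # synthetic tissue blob
windows = candidate_windows((8, 8, 40, 56), tile_size=16, stride=16)
kept = [w for w in windows if enough_tissue(mask, w, thresh=0.5)]
print(len(windows), len(kept))                             # 6 candidates, all kept here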
diff --git a/histoqc/__main__.py b/histoqc/__main__.py
index b2b8fd1..c0bd0c0 100644
--- a/histoqc/__main__.py
+++ b/histoqc/__main__.py
@@ -116,7 +116,7 @@ def main(argv=None):
# --- document configuration in results -----------------------------------
results.add_header(f"start_time:\t{datetime.datetime.now()}")
- results.add_header(f"pipeline: {' '.join(_steps)}")
+ results.add_header(f"pipeline:\t{' '.join(_steps)}")
results.add_header(f"outdir:\t{os.path.realpath(args.outdir)}")
results.add_header(f"config_file:\t{os.path.realpath(args.config) if args.config is not None else 'default'}")
results.add_header(f"command_line_args:\t{' '.join(argv)}")
diff --git a/histoqc/annotations/__init__.py b/histoqc/annotations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/histoqc/annotations/annot_collection.py b/histoqc/annotations/annot_collection.py
new file mode 100644
index 0000000..30a187c
--- /dev/null
+++ b/histoqc/annotations/annot_collection.py
@@ -0,0 +1,66 @@
+from typing import List, Dict, Union, Type, Tuple, Mapping # Literal, get_args,
+from types import MappingProxyType
+# from shapely.strtree import STRtree
+# from shapely.geometry import box as shapely_box
+from histoqc.import_wrapper.typing import Literal, get_args
+from lazy_property import LazyProperty
+from .annotation.base import Annotation, Region, TYPE_RAW_LABEL
+from .annotation.imagescope import ImageScopeAnnotation
+from .annotation.geojson import GEOJsonAnnotation
+
+
+TYPE_BBOX = Tuple[int, int, int, int]
+
+TYPE_GEO = Literal["geojson"]
+TYPE_IMAGESCOPE = Literal["imagescope"]
+TYPE_JSON = Literal["json"]
+TYPE_XML = Literal["xml"]
+
+TYPE_SUPPORTED_PARSER = Literal[TYPE_GEO, TYPE_IMAGESCOPE, TYPE_JSON, TYPE_XML]
+
+PARSER_BUILDER_MAP: Dict[str, Type[Annotation]] = {
+ get_args(TYPE_GEO)[0]: GEOJsonAnnotation,
+ get_args(TYPE_IMAGESCOPE)[0]: ImageScopeAnnotation,
+ # for HistoQC
+ get_args(TYPE_JSON)[0]: GEOJsonAnnotation, # duplicate
+ get_args(TYPE_XML)[0]: ImageScopeAnnotation,
+}
+
+
+class AnnotCollection:
+ _annotation_list: List[Annotation]
+ _label_to_regions_map: Mapping[TYPE_RAW_LABEL, List[Region]]
+
+ @LazyProperty
+ def all_regions(self) -> List[Region]:
+ region_list = []
+ for annotation in self._annotation_list:
+ region_list += annotation.regions
+ return region_list
+
+ # @LazyProperty
+ # def multipolygons(self) -> MultiPolygon:
+ # for annotation in self._annotation_list:
+
+ def __init__(self, annotation_list: List[Annotation]):
+ self._annotation_list = annotation_list
+ self._label_to_regions_map = self._new_label_to_regions_map()
+
+ @classmethod
+ def build(cls, parser_type: TYPE_SUPPORTED_PARSER, uri: str, label_map: Union[Dict[Union[str, int], int], None]):
+ construct = PARSER_BUILDER_MAP[parser_type]
+ annotation_list = construct.build_from_uri(uri=uri, label_map=label_map)
+ return cls(annotation_list)
+
+ def _new_label_to_regions_map(self) -> Mapping[TYPE_RAW_LABEL, List[Region]]:
+ out_dict: Dict[TYPE_RAW_LABEL, List[Region]] = dict()
+ for region in self.all_regions:
+ region: Region
+ label: TYPE_RAW_LABEL = region['label']
+ out_dict[label] = out_dict.get(label, [])
+ out_dict[label].append(region)
+ return MappingProxyType(out_dict)
+
+ @property
+ def label_to_regions_map(self) -> Mapping[TYPE_RAW_LABEL, List[Region]]:
+ return self._label_to_regions_map
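For illustration only (not part of the patch), a hypothetical usage sketch of the new AnnotCollection: build() looks up the parser class in PARSER_BUILDER_MAP by format key and returns a collection whose all_regions each carry a shapely Polygon plus its label. The annotation file path is made up.

from histoqc.annotations.annot_collection import AnnotCollection, PARSER_BUILDER_MAP

print(sorted(PARSER_BUILDER_MAP))  # ['geojson', 'imagescope', 'json', 'xml']

annots = AnnotCollection.build(parser_type="geojson",
                               uri="/path/to/slide_annotations.geojson",  # hypothetical file
                               label_map=None)                            # keep raw labels as-is
for region in annots.all_regions:
    print(region["label"], region["polygon"].area)

# regions grouped by raw label, as exposed by label_to_regions_map
for label, regions in annots.label_to_regions_map.items():
    print(label, len(regions))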
diff --git a/histoqc/annotations/annotation/__init__.py b/histoqc/annotations/annotation/__init__.py
new file mode 100644
index 0000000..b6e690f
--- /dev/null
+++ b/histoqc/annotations/annotation/__init__.py
@@ -0,0 +1 @@
+from . import *
diff --git a/histoqc/annotations/annotation/base.py b/histoqc/annotations/annotation/base.py
new file mode 100644
index 0000000..36da725
--- /dev/null
+++ b/histoqc/annotations/annotation/base.py
@@ -0,0 +1,176 @@
+from abc import ABC, abstractmethod
+from typing import Generic, TypeVar, Dict, Union, List, Tuple, TypedDict
+from lazy_property import LazyProperty
+from shapely.geometry import Polygon, MultiPolygon
+import logging
+
+T = TypeVar("T")
+TYPE_POINT = Tuple[int, int]
+TYPE_POINT_SET = List[TYPE_POINT]
+TYPE_HOLED_SET = Tuple[TYPE_POINT_SET, Union[List[TYPE_POINT_SET], None]]
+# TYPE_HOLED_SET_COLLECTION = List[TYPE_HOLED_SET]
+
+TYPE_LABEL = Union[int, None]
+TYPE_RAW_LABEL = Union[str, None, TYPE_LABEL]
+
+WARNING_NOT_SIMPLE_POLYGON = "Not a Simple Polygon: buffering the polygon " \
+ "with 0-distance resulted in multiple polygons. " \
+ "The shape of these polygons may not be identical to " \
+ "the input annotations"
+
+
+class Region(TypedDict):
+ polygon: Polygon
+ point_set: TYPE_HOLED_SET # TYPE_POINT_SET
+ label: TYPE_RAW_LABEL
+ raw_label: TYPE_RAW_LABEL
+ uri: str
+
+
+class Annotation(ABC, Generic[T]):
+ """
+ Annotation --> an atomic annotation that may contain one or multiple regions.
+ One label is assigned to one annotation.
+ """
+
+ _label_map: Union[Dict[TYPE_RAW_LABEL, TYPE_LABEL], None]
+ _ann_data: T
+ _uri: str
+
+ @staticmethod
+ def point_to_int(point_xy: Tuple[Union[float, str], Union[float, str]]) -> TYPE_POINT:
+ raw_x, raw_y = point_xy
+ return int(float(raw_x)), int(float(raw_y))
+
+ @abstractmethod
+ def point_set_list(self) -> List[TYPE_HOLED_SET]:
+ return NotImplemented
+
+ @abstractmethod
+ def label_from_annotation(self) -> TYPE_RAW_LABEL:
+ return NotImplemented
+
+ @staticmethod
+ @abstractmethod
+ def annotation_list_from_uri(uri) -> List[T]:
+ return NotImplemented
+
+ @staticmethod
+ def _enough_points(point_set: TYPE_POINT_SET):
+ return len(point_set) >= 3
+
+ @staticmethod
+ def _sanitized_points_helper(point_set: TYPE_HOLED_SET) -> Union[TYPE_HOLED_SET, None]:
+ outer, inner = point_set
+ # if shell has less than 3 --> discard directly
+ if not Annotation._enough_points(outer):
+ return None
+ inner = [hole for hole in inner if Annotation._enough_points(hole)] if inner is not None else None
+ return outer, inner
+
+ @staticmethod
+ def _sanitized_points(point_set_list: List[TYPE_HOLED_SET]) -> List[TYPE_HOLED_SET]:
+ out_list = []
+ for point_set in point_set_list:
+ sanitized = Annotation._sanitized_points_helper(point_set)
+ if sanitized is None:
+ continue
+ out_list.append(sanitized)
+ return out_list
+
+ @staticmethod
+ def valid_polygon_helper(polygon: Polygon, point_set: TYPE_HOLED_SET) -> Tuple[List[Polygon], List[TYPE_HOLED_SET]]:
+ """
+ In case the polygon is not valid, repair it with a zero-distance buffer, which may split the shape
+ into multiple polygons.
+ Returns:
+ The list of repaired polygons and the corresponding list of point sets.
+ """
+ if polygon.is_valid:
+ return [polygon, ], [point_set, ]
+
+ valid_poly = polygon.buffer(0)
+ if isinstance(valid_poly, Polygon):
+ return [valid_poly, ], [point_set, ]
+ # not a simple polygon -- buffer(0) produced multiple polygons
+ assert isinstance(valid_poly, MultiPolygon)
+ logging.warning(WARNING_NOT_SIMPLE_POLYGON)
+ # warning
+ polygon_list: List[Polygon] = list(valid_poly.geoms)
+ exterior_list: List[List[TYPE_POINT_SET]] = [list(x.exterior.coords) for x in polygon_list]
+ interior_list: List[List[List[TYPE_POINT_SET]]] = [[list(interior.coords) for interior in x.interiors]
+ for x in polygon_list]
+ point_set_list: List[TYPE_HOLED_SET] = [(outer, inner)
+ for outer, inner in zip(exterior_list, interior_list)]
+ return polygon_list, point_set_list
+
+ @staticmethod
+ def valid_polygon(point_set: TYPE_HOLED_SET) -> Tuple[List[Polygon], List[TYPE_HOLED_SET]]:
+ outer, inner = point_set
+ polygon = Polygon(outer, holes=inner)
+ # if not polygon.is_valid:
+ # return polygon.buffer(0)
+ # assert not isinstance(polygon, MultiPolygon)
+ # return [polygon, ], [point_set, ]
+ return Annotation.valid_polygon_helper(polygon, point_set)
+
+ @staticmethod
+ def regions_accumulate_helper(polygon_list: List[Polygon],
+ point_set_list: List[TYPE_HOLED_SET], label, raw_label, uri) -> List[Region]:
+ return [Region(polygon=polygon, point_set=point_set, label=label, raw_label=raw_label, uri=uri)
+ for polygon, point_set in zip(polygon_list, point_set_list)]
+
+ @LazyProperty
+ def regions(self) -> List[Region]:
+ point_set_list: List[TYPE_HOLED_SET] = self.point_set_list()
+ clean_list = Annotation._sanitized_points(point_set_list)
+ region_list: List[Region] = []
+ for point_set in clean_list:
+ point_set: TYPE_HOLED_SET
+ # polygon: Polygon = Annotation.valid_polygon(point_set)
+ polygon_list, point_set_list = Annotation.valid_polygon(point_set)
+ label = self.label
+ raw_label = self.raw_label
+ uri = self._uri
+ # curr_region = Region(polygon=polygon, point_set=point_set,
+ # label=label, raw_label=raw_label, uri=self._uri)
+ # region_list.append(curr_region)
+ curr_region_list = Annotation.regions_accumulate_helper(polygon_list,
+ point_set_list, label, raw_label, uri)
+ region_list += curr_region_list
+ return region_list
+
+ @staticmethod
+ def _mapped_label(label_map: Dict[TYPE_RAW_LABEL, TYPE_LABEL],
+ label_var: TYPE_RAW_LABEL) -> Union[TYPE_RAW_LABEL, TYPE_LABEL]:
+ if label_map is None or len(label_map) == 0:
+ return label_var
+ assert label_var in label_map
+ return label_map[label_var]
+
+ @LazyProperty
+ def raw_label(self) -> TYPE_RAW_LABEL:
+ return self.label_from_annotation()
+
+ @LazyProperty
+ def label(self) -> Union[TYPE_RAW_LABEL, TYPE_LABEL]:
+ raw_label = self.raw_label
+ label = Annotation._mapped_label(self._label_map, raw_label)
+ return label
+
+ @property
+ def ann_data(self) -> T:
+ return self._ann_data
+
+ def __init__(self, uri: str, ann_data: T, label_map: Dict[Union[str, int], int]):
+ self._uri = uri
+ self._ann_data = ann_data
+ self._label_map = label_map
+
+ @classmethod
+ def build(cls, uri: str, ann_data: T, label_map: Dict[Union[str, int], int]) -> "Annotation":
+ return cls(uri=uri, ann_data=ann_data, label_map=label_map)
+
+ @classmethod
+ def build_from_uri(cls, uri: str, label_map: Union[Dict[TYPE_RAW_LABEL, TYPE_LABEL], None]) -> List["Annotation"]:
+ ann_data_list: List[T] = cls.annotation_list_from_uri(uri)
+ return [cls.build(uri=uri, ann_data=ann_data, label_map=label_map) for ann_data in ann_data_list]
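+
+    # Illustrative end-to-end usage with a concrete subclass (the path and label mapping
+    # are hypothetical):
+    #   anns = GEOJsonAnnotation.build_from_uri("annotations.geojson", {"Tumor": 1})
+    #   regions = [region for ann in anns for region in ann.regions]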
diff --git a/histoqc/annotations/annotation/geojson.py b/histoqc/annotations/annotation/geojson.py
new file mode 100644
index 0000000..cf5bf2e
--- /dev/null
+++ b/histoqc/annotations/annotation/geojson.py
@@ -0,0 +1,83 @@
+from typing import List, Dict, Callable, Any # Literal, get_args
+from histoqc.import_wrapper.typing import Literal, get_args
+from ..io_utils.json import load_json
+from .base import Annotation, TYPE_POINT_SET, TYPE_RAW_LABEL, TYPE_POINT, TYPE_HOLED_SET
+
+TYPE_GEO_MULTIPOLYGON = Literal['MultiPolygon']
+TYPE_GEO_POLYGON = Literal['Polygon']
+TYPE_GEO_LINE_STRING = Literal['LineString']
+
+TYPE_GEOTYPE = Literal[TYPE_GEO_MULTIPOLYGON, TYPE_GEO_POLYGON, TYPE_GEO_LINE_STRING]
+
+
+class GEOJsonAnnotation(Annotation[Dict]):
+ PROP: str = "properties"
+ CLASS: str = "classification"
+ NAME: str = "name"
+
+ """
+ Parses a typical GeoJSON file containing one or more Polygon or MultiPolygon features.
+ These JSON files are the preferred way to serialize QuPath annotations, for example.
+ See https://qupath.readthedocs.io/en/latest/docs/scripting/overview.html#serialization-json
+ """
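+
+    # A minimal feature of the kind parsed here (illustrative; the coordinate and
+    # label values are hypothetical):
+    #   {"type": "Feature",
+    #    "geometry": {"type": "Polygon",
+    #                 "coordinates": [[[0, 0], [100, 0], [100, 100], [0, 0]]]},
+    #    "properties": {"classification": {"name": "Tumor"}}}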
+
+ @staticmethod
+ def point_set_helper_multipolygon(coordinates: List[List[TYPE_POINT_SET]]) -> List[TYPE_HOLED_SET]:
+ out_list = []
+ for roi in coordinates:
+ out_list += GEOJsonAnnotation.point_set_helper_polygon(roi)
+ return out_list
+
+ @staticmethod
+ def point_set_helper_polygon(coordinates: List[TYPE_POINT_SET]) -> List[TYPE_HOLED_SET]:
+ inner_list = []
+ outer = GEOJsonAnnotation._point_set_single(coordinates[0])
+ inner_source = coordinates[1:]
+ for points in inner_source:
+ points: List[TYPE_POINT]
+ inner_list.append(GEOJsonAnnotation._point_set_single(points))
+ # if len(inner_list) == 0:
+ # inner_list = None
+ holed: TYPE_HOLED_SET = outer, inner_list
+ return [holed]
+
+ @staticmethod
+ def point_set_helper_lines(coordinates: List[TYPE_POINT]) -> List[TYPE_HOLED_SET]:
+ holed_set: TYPE_HOLED_SET = GEOJsonAnnotation._point_set_single(coordinates), None
+ return [holed_set]
+
+ @staticmethod
+ def _point_set_single(coordinates: List[TYPE_POINT]) -> List[TYPE_POINT]:
+ return [Annotation.point_to_int((coord[0], coord[1])) for coord in coordinates]
+
+ @staticmethod
+ def _func_from_geom_type(geom_type: TYPE_GEOTYPE) -> Callable[[Any], List[TYPE_HOLED_SET]]:
+ GEOMETRY_MAP: Dict[str, Callable[[Any], List[TYPE_HOLED_SET]]] = {
+ get_args(TYPE_GEO_MULTIPOLYGON)[0]: GEOJsonAnnotation.point_set_helper_multipolygon,
+ get_args(TYPE_GEO_POLYGON)[0]: GEOJsonAnnotation.point_set_helper_polygon,
+ get_args(TYPE_GEO_LINE_STRING)[0]: GEOJsonAnnotation.point_set_helper_lines,
+ }
+ assert geom_type in GEOMETRY_MAP, f"Unsupported Geometry Type: {geom_type}"
+ return GEOMETRY_MAP[geom_type]
+
+ def point_set_list(self) -> List[TYPE_HOLED_SET]:
+ geometry = self.ann_data['geometry']
+ geom_type = geometry['type']
+ coordinates = geometry['coordinates']
+ func = GEOJsonAnnotation._func_from_geom_type(geom_type)
+ return func(coordinates)
+
+ def label_from_annotation(self) -> TYPE_RAW_LABEL:
+        # guard against features without a "properties" block
+        prop = self.ann_data.get(GEOJsonAnnotation.PROP) or {}
+ classification = prop.get(GEOJsonAnnotation.CLASS)
+ if classification is not None:
+ return classification.get(GEOJsonAnnotation.NAME)
+ return None
+
+ @staticmethod
+ def annotation_list_from_uri(uri) -> List[Dict]:
+ data = load_json(uri)
+ if isinstance(data, Dict):
+ return [data]
+ assert isinstance(data, List)
+ return data
diff --git a/histoqc/annotations/annotation/imagescope.py b/histoqc/annotations/annotation/imagescope.py
new file mode 100644
index 0000000..f5e0b5b
--- /dev/null
+++ b/histoqc/annotations/annotation/imagescope.py
@@ -0,0 +1,93 @@
+from typing import List, Dict, Union
+from xml.etree import ElementTree as ET
+from xml.etree.ElementTree import Element
+from .base import Annotation, TYPE_POINT_SET, TYPE_RAW_LABEL, TYPE_POINT, TYPE_HOLED_SET
+
+
+class ImageScopeAnnotation(Annotation[Element]):
+ ANNOTATION_TAG_NAME = "Annotation"
+
+ TAG_REGION_ALL = "Regions"
+ TAG_REGION = "Region"
+ VERTICES: str = 'Vertices'
+ VERTEX: str = "Vertex"
+ X: str = 'X'
+ Y: str = 'Y'
+
+ CLASS_ATTR = "Name"
+ _ann_data: Element
+
+ """
+    Parses the XML file to extract annotations as lists of vertices.
+    The parsed regions can be used to create a mask that is true inside the annotated
+    regions described in the specified xml file.
+    The xml file must follow the ImageScope format, the minimal components of which are:
+    ```
+    <Annotations>
+        <Annotation Name="...">
+            <Regions>
+                <Region>
+                    <Vertices>
+                        <Vertex X="..." Y="..."/>
+                        <Vertex X="..." Y="..."/>
+                        <Vertex X="..." Y="..."/>
+                    </Vertices>
+                </Region>
+            </Regions>
+        </Annotation>
+    </Annotations>
+    ```
+    With more <Region> or <Annotation> blocks as needed for additional annotations. There is no functional
+    difference between multiple <Annotation> blocks and one <Annotation> block with multiple <Region> blocks.
+ """
+
+ def label_from_annotation(self) -> TYPE_RAW_LABEL:
+ """
+ Read the label of the whole annotated region.
+ Assume the annotation class label is under element's "Name" attribute.
+
+ Returns:
+            str or None: the value of the "Name" attribute.
+ """
+ return self._ann_data.get(ImageScopeAnnotation.CLASS_ATTR)
+
+ @staticmethod
+ def vertex_from_node(vertex: Element) -> TYPE_POINT:
+ raw_x: str = vertex.get(ImageScopeAnnotation.X)
+ raw_y: str = vertex.get(ImageScopeAnnotation.Y)
+ raw_point = (raw_x, raw_y)
+ return Annotation.point_to_int(raw_point)
+
+ def point_set_list(self) -> List[TYPE_HOLED_SET]:
+ """
+        Hole definitions are not standardized in the ImageScope format, so all holes are ignored here.
+
+        Returns:
+            A list of (outer point set, None) tuples, one per region.
+ """
+ out_list = []
+ for regions_all in self.ann_data.findall(ImageScopeAnnotation.TAG_REGION_ALL):
+ for region in regions_all.findall(ImageScopeAnnotation.TAG_REGION):
+ for vertices in region.findall(ImageScopeAnnotation.VERTICES):
+ # the X and Y attributes are float strings --> cast to float first before flooring down to int
+ xy_list: TYPE_POINT_SET = [ImageScopeAnnotation.vertex_from_node(vertex)
+ for vertex in vertices.findall(ImageScopeAnnotation.VERTEX)]
+ holes = None
+ holed_point_set: TYPE_HOLED_SET = xy_list, holes
+ out_list.append(holed_point_set)
+ return out_list
+
+ @staticmethod
+ def validated_ann(ann_data: Element):
+        # TODO: perhaps validate the structure with xmlschema against a predefined ImageScope XML schema
+ assert ann_data.tag == ImageScopeAnnotation.ANNOTATION_TAG_NAME
+ return ann_data
+
+ @staticmethod
+ def annotation_list_from_uri(uri) -> List[Element]:
+ tree = ET.parse(uri)
+ root = tree.getroot()
+ return root.findall(ImageScopeAnnotation.ANNOTATION_TAG_NAME)
+
+ @classmethod
+ def build(cls, uri: str, ann_data: Element, label_map: Dict[Union[str, int], int]) -> Annotation:
+ ann_data = ImageScopeAnnotation.validated_ann(ann_data)
+ return super().build(uri=uri, ann_data=ann_data, label_map=label_map)
diff --git a/histoqc/annotations/io_utils/__init__.py b/histoqc/annotations/io_utils/__init__.py
new file mode 100644
index 0000000..b6e690f
--- /dev/null
+++ b/histoqc/annotations/io_utils/__init__.py
@@ -0,0 +1 @@
+from . import *
diff --git a/histoqc/annotations/io_utils/json.py b/histoqc/annotations/io_utils/json.py
new file mode 100644
index 0000000..7fe1b3d
--- /dev/null
+++ b/histoqc/annotations/io_utils/json.py
@@ -0,0 +1,12 @@
+import json
+
+
+def write_json(fname: str, obj, **kwargs):
+ with open(fname, 'w') as root:
+ json.dump(obj, root, **kwargs)
+
+
+def load_json(fname: str, **kwargs):
+ with open(fname, 'r') as root:
+ return json.load(root, **kwargs)
+
diff --git a/histoqc/import_wrapper/__init__.py b/histoqc/import_wrapper/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/histoqc/import_wrapper/helper.py b/histoqc/import_wrapper/helper.py
new file mode 100644
index 0000000..6c7d0ef
--- /dev/null
+++ b/histoqc/import_wrapper/helper.py
@@ -0,0 +1,23 @@
+import importlib
+from typing import Union
+
+
+def dynamic_import(module_name: str, attribute_name: str, surrogate: Union[str, None]):
+ """
+    Dynamically import an attribute, falling back to a surrogate module when it is not available
+    (e.g., `Literal` only ships with `typing` from Python 3.8 onward, but `typing_extensions`
+    provides the same functionality for Python <= 3.7).
+    Args:
+        module_name: name of the preferred module.
+        attribute_name: name of the attribute to import.
+        surrogate: name of the fallback module, or None.
+
+    Returns:
+        The imported attribute.
+ """
+ module = importlib.import_module(module_name)
+ attribute = getattr(module, attribute_name, None)
+ if attribute is not None:
+ return attribute
+ if surrogate is not None:
+ return dynamic_import(surrogate, attribute_name, None)
+ raise ImportError(f"Cannot Import {attribute_name} from either {module_name} or {surrogate}")
\ No newline at end of file
diff --git a/histoqc/_import_openslide.py b/histoqc/import_wrapper/openslide.py
similarity index 100%
rename from histoqc/_import_openslide.py
rename to histoqc/import_wrapper/openslide.py
diff --git a/histoqc/import_wrapper/typing.py b/histoqc/import_wrapper/typing.py
new file mode 100644
index 0000000..07f08b1
--- /dev/null
+++ b/histoqc/import_wrapper/typing.py
@@ -0,0 +1,7 @@
+from .helper import dynamic_import
+from typing import Type, Callable, Tuple, Any
+import typing
+
+__TYPE_GET_ARGS = Callable[[Type, ], Tuple[Any, ...]]
+Literal: Type["typing.Generic"] = dynamic_import("typing", "Literal", "typing_extensions")
+get_args: __TYPE_GET_ARGS = dynamic_import("typing", "get_args", "typing_extensions")
diff --git a/histoqc/ui/UserInterface/index.html b/histoqc/ui/UserInterface/index.html
index f16b534..680fd1e 100644
--- a/histoqc/ui/UserInterface/index.html
+++ b/histoqc/ui/UserInterface/index.html
@@ -46,7 +46,7 @@ HistoQc
diff --git a/histoqc/wsihandles/CuImageHandle.py b/histoqc/wsihandles/CuImageHandle.py
new file mode 100644
index 0000000..c70a415
--- /dev/null
+++ b/histoqc/wsihandles/CuImageHandle.py
@@ -0,0 +1,153 @@
+from PIL.Image import Image as PILImage
+from cucim.clara import CuImage
+from .WSIImageHandle import WSIImageHandle
+from PIL import Image
+from ..import_wrapper.openslide import openslide
+import cupy as cp
+from typing import List, Union, Tuple
+from lazy_property import LazyProperty
+import numpy as np
+from cucim import skimage as c_skimage
+
+
+class CuImageHandle(WSIImageHandle[CuImage, CuImage, cp.ndarray]):
+
+ osh: CuImage
+ fname: str
+
+ # TODO: standalone parser of vendor information
+ dummy_handle: openslide.OpenSlide
+
+ def backend_rgba2rgb(self, img: CuImage) -> CuImage:
+ # todo: verify
+ # todo: it appears that CuImage does not take care of the alpha channel at all.
+ return img
+
+ @classmethod
+ def region_resize_arr(cls, data: CuImage, new_size_wh: Tuple[int, int]) -> cp.ndarray:
+ w, h, *_ = new_size_wh
+ arr = cp.array(data)
+ return c_skimage.transform.resize(arr, output_shape=(h, w))
+
+ def __init__(self, fname: str):
+ self.fname = fname
+ self.osh = CuImage(fname)
+ self.dummy_handle = openslide.OpenSlide(fname)
+
+ @LazyProperty
+ def background_color(self):
+ return f"#{self.dummy_handle.properties.get(openslide.PROPERTY_NAME_BACKGROUND_COLOR, 'ffffff')}"
+
+ @LazyProperty
+ def bounding_box(self):
+ dim_width, dim_height = self.dimensions
+ x = int(self.dummy_handle.properties.get(openslide.PROPERTY_NAME_BOUNDS_X, 0))
+ y = int(self.dummy_handle.properties.get(openslide.PROPERTY_NAME_BOUNDS_Y, 0))
+ width = int(self.dummy_handle.properties.get(openslide.PROPERTY_NAME_BOUNDS_WIDTH, dim_width))
+ height = int(self.dummy_handle.properties.get(openslide.PROPERTY_NAME_BOUNDS_HEIGHT, dim_height))
+ return x, y, width, height
+
+ @LazyProperty
+ def has_bounding_box(self):
+ return (openslide.PROPERTY_NAME_BOUNDS_X in self.dummy_handle.properties
+                and openslide.PROPERTY_NAME_BOUNDS_Y in self.dummy_handle.properties
+ and openslide.PROPERTY_NAME_BOUNDS_WIDTH in self.dummy_handle.properties
+ and openslide.PROPERTY_NAME_BOUNDS_HEIGHT in self.dummy_handle.properties
+ )
+
+ @LazyProperty
+ def dimensions(self):
+ return tuple(self.osh.metadata['cucim']['shape'][:2][::-1])
+
+ @LazyProperty
+ def magnification(self):
+ return self.dummy_handle.properties.get("openslide.objective-power") or \
+ self.dummy_handle.properties.get("aperio.AppMag")
+
+ @property
+ def level_count(self):
+ return self.osh.metadata['cucim']['resolutions']['level_count']
+
+ @property
+ def level_dimensions(self):
+ return self.osh.metadata['cucim']['resolutions']['level_dimensions']
+
+ @property
+ def level_downsamples(self):
+ return self.osh.metadata['cucim']['resolutions']['level_downsamples']
+
+ @property
+ def vendor(self):
+ return self.dummy_handle.properties.get("openslide.vendor", "NA")
+
+ @property
+ def mpp_x(self):
+ return self.dummy_handle.properties.get("openslide.mpp-x", "NA")
+
+ @property
+ def mpp_y(self):
+ return self.dummy_handle.properties.get("openslide.mpp-y", "NA")
+
+ @property
+ def comment(self):
+ return self.dummy_handle.properties.get("openslide.comment", "NA")
+
+ @staticmethod
+ def _curate_max_wh(width, height, max_size, aspect_ratio):
+ if height > width:
+ height = max(height, max_size)
+ width = round(height * aspect_ratio)
+ else:
+ width = max(width, max_size)
+ height = round(width / aspect_ratio)
+ return width, height
+
+ def get_thumbnail(self, new_dim):
+        # mirror openslide's get_thumbnail: pick the best level for the requested downsample
+ downsample = max(*(dim / thumb for dim, thumb in zip(self.dimensions, new_dim)))
+ level = self.get_best_level_for_downsample(downsample)
+ thumb = self.backend_rgba2rgb(self.region_backend((0, 0), level, self.level_dimensions[level]))
+ # resize
+ thumb_cp = cp.array(thumb, copy=False)
+ target_w, target_h = (x // int(downsample) for x in self.dimensions)
+ aspect_ratio = self.dimensions[0] / self.dimensions[1]
+
+ target_w, target_h = self.__class__._curate_max_wh(target_w, target_h, max(new_dim), aspect_ratio)
+ return c_skimage.transform.resize(thumb_cp, output_shape=(target_h, target_w))
+
+ def get_best_level_for_downsample(self, down_factor: float) -> int:
+        """Return the level whose downsample factor is the largest value not exceeding the target,
+        consistent with OpenSlide's get_best_level_for_downsample.
+
+        Args:
+            down_factor: target downsample factor.
+
+        Returns:
+            Index of the best matching pyramid level.
+ """
+ level_downsamples_arr = np.asarray(self.level_downsamples)
+ # not exceeding the current downsample level
+ down_indices = np.where(level_downsamples_arr <= down_factor)[0]
+ down_values = level_downsamples_arr[down_indices]
+        # among the candidates, pick the index whose downsample factor is the largest
+ return down_indices[down_values.argmax()]
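+
+    # Worked example (hypothetical pyramid): with level_downsamples == (1, 4, 16, 64) and
+    # down_factor == 32, the candidates not exceeding 32 are (1, 4, 16), so the method
+    # returns level index 2 (downsample 16).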
+
+ def region_backend(self, location, level, size, **kwargs):
+ return self.osh.read_region(location=location, level=level, size=size, **kwargs)
+
+ @staticmethod
+ def backend_to_array(region: Union[CuImage, cp.ndarray]) -> cp.ndarray:
+ return cp.array(region, copy=False)
+
+ @classmethod
+ def backend_to_pil(cls, region: CuImage) -> PILImage:
+ return Image.fromarray(cls.backend_to_array(region).get())
+
+ def read_label(self) -> CuImage:
+ return self.osh.associated_image("label")
+
+ def read_macro(self) -> CuImage:
+ return self.osh.associated_image("macro")
+
+ @staticmethod
+ def grid_stack(grid: List[List[cp.ndarray]]):
+ return cp.concatenate([cp.concatenate(row, axis=0) for row in grid], axis=1)
diff --git a/histoqc/wsihandles/DicomHandle.py b/histoqc/wsihandles/DicomHandle.py
index be153af..e0461bf 100644
--- a/histoqc/wsihandles/DicomHandle.py
+++ b/histoqc/wsihandles/DicomHandle.py
@@ -3,12 +3,13 @@
import numpy as np
from typing import Union
-class DicomHandle(WSIImageHandle):
+
+class DicomHandle(WSIImageHandle[WsiDicom]):
+
def __init__(self, fname):
self.fname = fname
self.osh = WsiDicom.open(fname)
-
# get mmp
self._mpp_x = self.osh.mpp.width
self._mpp_y = self.osh.mpp.height
@@ -33,7 +34,6 @@ def __init__(self, fname):
self._has_bounding_box = False
self._bounding_box = (0, 0, self.osh.size.width, self.osh.size.height)
-
@property
def has_bounding_box(self):
return self._has_bounding_box
@@ -49,7 +49,7 @@ def bounding_box(self):
@property
def dimensions(self):
- return (self.osh.size.width, self.osh.size.height)
+ return self.osh.size.width, self.osh.size.height
@property
def magnification(self) -> Union[float, None]:
diff --git a/histoqc/wsihandles/OpenSlideHandle.py b/histoqc/wsihandles/OpenSlideHandle.py
index fe24185..b007de6 100644
--- a/histoqc/wsihandles/OpenSlideHandle.py
+++ b/histoqc/wsihandles/OpenSlideHandle.py
@@ -1,7 +1,24 @@
+import PIL.Image
+import numpy as np
+
from .WSIImageHandle import WSIImageHandle
-from histoqc._import_openslide import openslide
-from typing import Union
-class OpenSlideHandle(WSIImageHandle):
+from histoqc.import_wrapper.openslide import openslide
+from typing import Union, Tuple, Sequence, List
+from PIL.Image import Image as PILImage
+from .utils import rgba2rgb_pil
+from PIL import Image
+
+
+class OpenSlideHandle(WSIImageHandle[openslide.OpenSlide, PILImage, np.ndarray]):
+ _background_color: str
+ _magnification_factor: str
+ _has_bounding_box: bool
+ fname: str
+ osh: openslide.OpenSlide
+
+ def backend_rgba2rgb(self, img) -> PILImage:
+ return rgba2rgb_pil(img, self.background_color)
+
def __init__(self, fname):
self.fname = fname
self.osh = openslide.OpenSlide(fname)
@@ -13,10 +30,9 @@ def __init__(self, fname):
self.osh.properties.get("aperio.AppMag")
# get background color
- self._backfround_color = f"#{self.osh.properties.get(openslide.PROPERTY_NAME_BACKGROUND_COLOR, 'ffffff')}"
+ self._background_color = f"#{self.osh.properties.get(openslide.PROPERTY_NAME_BACKGROUND_COLOR, 'ffffff')}"
-
- def __get_bounding_box(self):
+ def __get_bounding_box(self) -> Tuple[int, int, int, int]:
(dim_width, dim_height) = self.osh.dimensions
try:
@@ -24,36 +40,38 @@ def __get_bounding_box(self):
y = int(self.osh.properties.get(openslide.PROPERTY_NAME_BOUNDS_Y, 'NA'))
width = int(self.osh.properties.get(openslide.PROPERTY_NAME_BOUNDS_WIDTH, 'NA'))
height = int(self.osh.properties.get(openslide.PROPERTY_NAME_BOUNDS_HEIGHT, 'NA'))
- return (x, y, width, height)
- except:
+ return x, y, width, height
+ # if any attribute is 'NA' and fails the int() cast
+ except ValueError:
self._has_bounding_box = False
- return (0, 0, dim_width, dim_height)
+ return 0, 0, dim_width, dim_height
@property
def background_color(self):
return self._background_color
+
@property
- def has_bounding_box(self):
+ def has_bounding_box(self) -> bool:
return self._has_bounding_box
@property
- def bounding_box(self):
+ def bounding_box(self) -> Tuple[int, int, int, int]:
return self._bounding_box
@property
- def dimensions(self):
+ def dimensions(self) -> Tuple[int, int]:
return self.osh.dimensions
@property
- def magnification(self) -> Union[float, None]:
+ def magnification(self) -> Union[str, None]:
return self._magnification_factor
@property
- def level_count(self):
+ def level_count(self) -> int:
return self.osh.level_count
@property
- def level_dimensions(self):
+ def level_dimensions(self) -> Sequence[Tuple[int, int]]:
return self.osh.level_dimensions
@property
@@ -65,28 +83,47 @@ def vendor(self):
return self.osh.properties.get("openslide.vendor", "NA")
@property
- def mpp_x(self):
+ def mpp_x(self) -> str:
return self.osh.properties.get("openslide.mpp-x", "NA")
@property
- def mpp_y(self):
+ def mpp_y(self) -> str:
return self.osh.properties.get("openslide.mpp-y", "NA")
@property
- def comment(self):
+ def comment(self) -> str:
return self.osh.properties.get("openslide.comment", "NA")
+ @classmethod
+ def region_resize_arr(cls, data: np.ndarray, new_size_wh: Tuple[int, int]):
+ return np.array(Image.fromarray(data).resize(new_size_wh), copy=False)
+
def get_thumbnail(self, new_dim):
return self.osh.get_thumbnail(new_dim)
def get_best_level_for_downsample(self, down_factor):
return self.osh.get_best_level_for_downsample(down_factor)
- def read_region(self, location, level, size):
+ def region_backend(self, location, level, size, **kwargs):
return self.osh.read_region(location, level, size)
+ @staticmethod
+ def backend_to_pil(region: Union[PILImage, np.ndarray]) -> PILImage:
+ if isinstance(region, np.ndarray):
+ return PIL.Image.fromarray(region)
+ return region
+
+ @staticmethod
+ def backend_to_array(region: PILImage) -> np.ndarray:
+ return np.array(region)
+
def read_label(self):
return self.osh.associated_images["label"]
def read_macro(self):
return self.osh.associated_images["macro"]
+
+ @staticmethod
+ def grid_stack(grid: List[List[np.ndarray]]):
+ return np.concatenate([np.concatenate(row, axis=0) for row in grid], axis=1)
+
\ No newline at end of file
diff --git a/histoqc/wsihandles/WSIImageHandle.py b/histoqc/wsihandles/WSIImageHandle.py
index f28fd3a..231c9e2 100644
--- a/histoqc/wsihandles/WSIImageHandle.py
+++ b/histoqc/wsihandles/WSIImageHandle.py
@@ -1,95 +1,166 @@
from abc import ABC, abstractmethod
from importlib import import_module
import logging
+from typing import Sequence, TypeVar, Tuple, List, Union
+from typing_extensions import Generic
+import numpy as np
+from PIL.Image import Image as PILImage
+
+T = TypeVar('T')
+Backend = TypeVar('Backend')
+ARRAY = TypeVar('ARRAY')
WSI_HANDLES = {
- "openslide" : "histoqc.wsihandles.OpenSlideHandle",
- "wsidicom" : "histoqc.wsihandles.DicomHandle"
+ "openslide": "histoqc.wsihandles.OpenSlideHandle",
+ "wsidicom": "histoqc.wsihandles.DicomHandle",
+ "cucim": "histoqc.wsihandles.CuImageHandle",
}
-class WSIImageHandle(ABC):
+
+class WSIImageHandle(ABC, Generic[T, Backend, ARRAY]):
+
+ osh: T
+ fname: str
+
+ @staticmethod
+ def create_wsi_handle(fname, handles) -> "WSIImageHandle":
+ osh = None
+ # get handles list
+ handle_list = handles.split(",")
+ for handle_type in handle_list:
+ handle_type = handle_type.strip()
+ try:
+ handle_name = WSI_HANDLES[handle_type]
+ except KeyError:
+ msg = f"WSIImageHandle: \"{handle_type}\" is not a registered handle"
+ logging.warning(msg)
+ continue
+ class_name = handle_name.split(".")[-1]
+ # dynamically import module by using module name
+ try:
+ module = import_module(handle_name)
+ except ImportError:
+ msg = f"WSIImageHandle: can't import wsi handle module - \"{handle_name}\" "
+ logging.warning(msg)
+ continue
+
+ # dynamically create the instance of wsi handle class
+ try:
+ cls = getattr(module, class_name)
+ except AttributeError:
+ msg = f"WSIImageHandle: can't get wsi handle class - \"{class_name}\" "
+ logging.warning(msg)
+ continue
+
+            # try to open the file with the selected handle
+ # noinspection PyBroadException
+ try:
+ osh = cls(fname)
+ except Exception:
+ # current wsi handle class doesn't support this file
+ msg = f"WSIImageHandle: \"{class_name}\" doesn't support {fname}"
+ logging.warning(msg)
+ continue
+ if osh is None:
+ # error: no handles support this file
+            msg = f"WSIImageHandle: no supported wsi handle found for {fname}"
+ logging.error(msg)
+ raise NotImplementedError(msg)
+ return osh
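+
+    # Illustrative usage (the filename is hypothetical): handles are tried in the listed
+    # order until one can open the slide, e.g.
+    #   osh = WSIImageHandle.create_wsi_handle("slide.svs", "openslide,cucim")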
@property
@abstractmethod
- def background_color(self):
+ def background_color(self) -> str:
...
@property
@abstractmethod
- def bounding_box(self):
+ def bounding_box(self) -> Tuple[int, int, int, int]:
...
@property
@abstractmethod
- def has_bounding_box(self):
+ def has_bounding_box(self) -> bool:
...
@property
@abstractmethod
- def dimensions(self):
+ def dimensions(self) -> Tuple[int, int]:
...
@property
@abstractmethod
- def magnification(self):
+ def magnification(self) -> str:
...
@property
@abstractmethod
- def level_count(self):
+ def level_count(self) -> int:
...
@property
@abstractmethod
- def level_dimensions(self):
+ def level_dimensions(self) -> Sequence[Tuple[int, int]]:
...
@property
@abstractmethod
- def level_downsamples(self):
+ def level_downsamples(self) -> Sequence[float]:
...
@property
@abstractmethod
- def vendor(self):
+ def vendor(self) -> str:
...
@property
@abstractmethod
- def level_count(self):
+ def mpp_x(self) -> str:
...
@property
@abstractmethod
- def mpp_x(self):
+ def mpp_y(self) -> str:
...
@property
@abstractmethod
- def mpp_y(self):
+ def comment(self) -> str:
...
- @property
@abstractmethod
- def comment(self):
+ def get_thumbnail(self, new_dim) -> Union[ARRAY, Backend]:
...
- @property
@abstractmethod
- def bounding_box(self):
+ def backend_rgba2rgb(self, img) -> Backend:
+        """Remove the alpha channel by blending a predefined background color into the image.
+
+        Args:
+            img: backend image object, possibly with an alpha channel.
+
+        Returns:
+            The backend image object converted to RGB.
+ """
...
@abstractmethod
- def get_thumbnail(self, new_dim):
+ def region_backend(self, location, level, size, **kwargs):
...
+ @staticmethod
@abstractmethod
- def get_best_level_for_downsample(self, down_factor):
+ def backend_to_pil(region: Union[Backend, ARRAY]) -> PILImage:
...
+ @staticmethod
@abstractmethod
- def read_region(self, location, level, size):
+ def backend_to_array(region: Union[Backend, ARRAY]) -> ARRAY:
...
+
+ def read_region(self, location, level, size, **kwargs) -> PILImage:
+ region = self.region_backend(location=location, level=level, size=size, **kwargs)
+ return self.__class__.backend_to_pil(region)
@abstractmethod
def read_label(self):
@@ -99,48 +170,88 @@ def read_label(self):
def read_macro(self):
...
-
+ @classmethod
+ @abstractmethod
+ def region_resize_arr(cls, data: ARRAY, new_size_wh: Tuple[int, int]) -> ARRAY:
+ ...
+
+ @abstractmethod
+ def get_best_level_for_downsample(self, downsample_factor: float):
+ ...
+
+ def curated_best_level_for_downsample(self, downsample_factor: float) -> Tuple[int, bool]:
+ relative_down_factors_idx = [np.isclose(i / downsample_factor, 1, atol=.01) for i in self.level_downsamples]
+ level = np.where(relative_down_factors_idx)[0]
+ if level.size:
+ return level[0], True
+ return self.get_best_level_for_downsample(downsample_factor), False
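+
+    # Illustrative example (hypothetical pyramid): with level_downsamples == (1, 4, 16), a
+    # requested factor of 4.02 falls within the 1% tolerance of level 1 and yields (1, True),
+    # while a requested factor of 8 has no close match and falls back to
+    # (self.get_best_level_for_downsample(8), False).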
+
@staticmethod
- def create_wsi_handle(fname, handles):
- osh = None
- # get handles list
- handle_list = handles.split(",")
- for handle_type in handle_list:
- handle_type = handle_type.strip()
- try:
- handle_name = WSI_HANDLES[handle_type]
- except:
- msg = f"WSIImageHandle: \"{handle_type}\" is not a registered handle"
- logging.warn(msg)
- continue
- class_name = handle_name.split(".")[-1]
- # dynamically import module by using module name
- try:
- module = import_module(handle_name)
- except ImportError:
- msg = f"WSIImageHandle: can't import wsi handle module - \"{handle_name}\" "
- logging.warning(msg)
- continue
-
- # dynamically create the instance of wsi handle class
- try:
- cls = getattr(module, class_name)
- except AttributeError:
- msg = f"WSIImageHandle: can't get wsi handle class - \"{class_name}\" "
- logging.warning(msg)
- continue
-
- # try to read the files by using seleted handle
- try:
- osh = cls(fname)
- except:
- # current wsi handle class doesn't support this file
- msg = f"WSIImageHandle: \"{class_name}\" doesn't support {fname}"
- logging.warning(msg)
- continue
- if osh == None:
- #error: no handles support this file
- msg = f"WSIImageHandle: can't find the support wsi handles - {fname}"
- logging.error(msg)
- raise NotImplementedError(msg)
- return osh
+ @abstractmethod
+ def grid_stack(grid: List[List[ARRAY]]):
+ ...
+
+ def resize_tile_downward(self, target_downsampling_factor, level,
+ win_size: int = 2048, **read_region_kwargs) -> List[List[ARRAY]]:
+
+ (bx, by, bwidth, bheight) = self.bounding_box
+ end_x = bx + bwidth
+ end_y = by + bheight
+
+ closest_downsampling_factor = self.level_downsamples[level]
+
+ # create a new img
+ grid = []
+ for x in range(bx, end_x, win_size):
+ row_piece = []
+ for y in range(by, end_y, win_size):
+ win_width, win_height = [win_size] * 2
+ # Adjust extraction size for endcut
+ if end_x < x + win_width:
+ win_width = end_x - x
+ if end_y < y + win_height:
+ win_height = end_y - y
+
+ win_down_width = int(round(win_width / target_downsampling_factor))
+ win_down_height = int(round(win_height / target_downsampling_factor))
+
+ win_width = int(round(win_width / closest_downsampling_factor))
+ win_height = int(round(win_height / closest_downsampling_factor))
+
+                # TODO: this isn't very efficient; if more efficiency is needed, separate the
+                # TODO  public read_region -> PIL.Image interface from the internal data backend
+                # TODO  (e.g., a data_from_region method), since cupy is far more efficient for
+                # TODO  resizing with interpolation and antialiasing.
+ closest_region = self.region_backend(location=(x, y), level=level, size=(win_width, win_height),
+ **read_region_kwargs)
+ if np.shape(closest_region)[-1] == 4:
+ closest_region = self.backend_rgba2rgb(closest_region)
+ closest_region_arr = self.__class__.backend_to_array(closest_region)
+ target_region = self.__class__.region_resize_arr(closest_region_arr,
+ (win_down_width, win_down_height))
+ row_piece.append(target_region)
+ # row_piece = np.concatenate(row_piece, axis=0)
+ grid.append(row_piece)
+ return self.__class__.grid_stack(grid)
+
+ def best_thumb(self, x: int, y: int, dims: Tuple[int, int],
+ target_sampling_factor: float, **read_region_kwargs) -> ARRAY:
+
+        # no bounding box: read the thumbnail of the whole original slide
+ if not self.has_bounding_box:
+ max_dim = dims[0] if dims[0] > dims[1] else dims[1]
+ return self.__class__.backend_to_array(self.get_thumbnail((max_dim, max_dim)))
+
+ (level, is_exact_level) = self.curated_best_level_for_downsample(target_sampling_factor)
+
+        # an exactly matching pyramid level exists: read the region directly from it
+ if is_exact_level:
+ backend: Backend = self.read_region((x, y), level, dims)
+ return self.__class__.backend_to_array(self.backend_rgba2rgb(backend)) \
+ if np.shape(backend)[-1] == 4 else self.__class__.backend_to_array(backend)
+        # otherwise, scale the thumbnail down from the closest higher-resolution level
+ else:
+ return self.resize_tile_downward(target_sampling_factor, level, win_size=2048, **read_region_kwargs)
diff --git a/histoqc/wsihandles/utils.py b/histoqc/wsihandles/utils.py
new file mode 100644
index 0000000..fcceafc
--- /dev/null
+++ b/histoqc/wsihandles/utils.py
@@ -0,0 +1,38 @@
+from PIL import Image
+from PIL.Image import Image as PILImage
+from typing import Union, Iterable
+
+
+def hex_to_rgb(hex_color: str):
+ if hex_color.startswith('#'):
+ hex_color = hex_color[1:]
+
+ if len(hex_color) != 6:
+ raise ValueError(f"Invalid hex triplets. Length: {len(hex_color)}")
+
+ rgb_color = tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))
+ return rgb_color
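+
+# Illustrative check: hex_to_rgb("#ffffff") == (255, 255, 255) and hex_to_rgb("1a2b3c") == (26, 43, 60).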
+
+
+def _validate_numerics(data: Iterable[float]):
+ if not isinstance(data, Iterable):
+ return False
+ return all([isinstance(x, float) for x in data])
+
+
+def validate_color(background_color: Union[str, Iterable[float], float]):
+ # if str -> assume a hex triplet
+ if isinstance(background_color, str):
+ return hex_to_rgb(background_color)
+ # must be numeric, or sequence of numeric
+ if isinstance(background_color, float):
+ return background_color
+ assert _validate_numerics(background_color), (f"background color must be a hex triplet string, a number,"
+ f" or a sequence of numbers")
+ return tuple(x for x in background_color)
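+
+# Illustrative behaviour: validate_color("#ffffff") -> (255, 255, 255); a bare float is returned
+# unchanged; an iterable of floats such as (1.0, 1.0, 1.0) is passed through as a tuple.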
+
+
+def rgba2rgb_pil(img: PILImage, background_color) -> PILImage:
+ thumb = Image.new("RGB", img.size, validate_color(background_color))
+ thumb.paste(img, None, img)
+ return thumb
diff --git a/pyproject.toml b/pyproject.toml
index 7fb7892..e068ec1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,3 +24,7 @@ exclude_lines = [
"if MYPY:",
"except ImportError:",
]
+
+[options.extras_require]
+cucim = ["cupy", "cucim"]
+dicom = ["wsidicom"]
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 9d6ee28..05d6a4a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,4 +12,4 @@ typing-extensions
requests~=2.22.0
Pillow~=9.1.0
setuptools~=49.2.1
-wsidicom==0.10.0
+
diff --git a/setup.py b/setup.py
index ac20fff..e6a092b 100644
--- a/setup.py
+++ b/setup.py
@@ -24,6 +24,10 @@
"version_scheme": "post-release",
},
setup_requires=['setuptools_scm'],
+ extras_require={
+ "dicom": ["wsidicom"],
+ "cucim": ["cucim", "cupy"],
+ },
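+    # optional backends; installed on demand, e.g. "pip install .[dicom]" or "pip install .[cucim]"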
package_data={
'histoqc.config': ['*.ini'],
'histoqc.data': data_files,