Skip to content

Commit

Permalink
add splitting of image over 10k height
Browse files Browse the repository at this point in the history
  • Loading branch information
ArdaxHz committed Sep 30, 2024
1 parent ea25576 commit d67946d
Showing 1 changed file with 69 additions and 22 deletions.
91 changes: 69 additions & 22 deletions mupl/image_validator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import enum
import io
import logging
import math
import string
import zipfile
from pathlib import Path
from typing import List, Dict, Union, Literal, Optional
from typing import Tuple

import natsort
from PIL import Image, ImageSequence
Expand All @@ -14,6 +16,8 @@

logger = logging.getLogger("mupl")

Image.MAX_IMAGE_PIXELS = None


class Format(enum.Enum):
PNG = 0
Expand All @@ -26,9 +30,9 @@ class Format(enum.Enum):

class ImageProcessorBase:
@staticmethod
def key(x: "str") -> "Union[Literal[0], str]":
def key(x: "tuple") -> "Union[Literal[0], str]":
"""Give a higher priority in sorting for images with their first character a punctuation."""
if Path(x).name[0].lower() in string.punctuation:
if Path(x[0]).name[0].lower() in string.punctuation:
return 0
else:
return x
Expand Down Expand Up @@ -72,6 +76,37 @@ def get_new_format_for_webp(image_bytes: "bytes") -> "str":
# Otherwise, convert to JPEG
return "JPEG"

@staticmethod
def split_tall_images(image_name: "str", image_bytes: "bytes") -> "List[bytes]":
with Image.open(io.BytesIO(image_bytes)) as image:
split_image = []
width, height = image.size

if height < 10_000:
return [image_bytes]

ext = ImageProcessorBase.get_image_format(image_bytes)
ext = ext.name.lower()

max_height = 3000
num_chunks = math.ceil(height / max_height)
logger.info(f"Split {image_name} into {num_chunks} chunks.")
print(f"Split {image_name} into {num_chunks} chunks.")
for i in range(num_chunks):
left = 0
upper = i * max_height
right = width
lower = min((i + 1) * max_height, height)
bbox = (left, upper, right, lower)
working_slice = image.crop(bbox)

img_byte_arr = io.BytesIO()
working_slice.save(img_byte_arr, format=ext)
img_byte_arr = img_byte_arr.getvalue()
split_image.append(img_byte_arr)

return split_image


class ImageProcessor:
def __init__(self, file_name_obj: "FileProcesser", folder_upload: "bool") -> None:
Expand All @@ -88,16 +123,14 @@ def __init__(self, file_name_obj: "FileProcesser", folder_upload: "bool") -> Non
# Renamed file to original file name
self.images_to_upload_names: "Dict[str, str]" = {}
self.converted_images: "Dict[str, str]" = {}
self.processed_images: "List[Tuple[str, bytes]]" = []

self.images_upload_session = NUMBER_OF_IMAGES_UPLOAD

self.info_list = self._get_valid_images()

def _is_image_valid(self, image: "str") -> "bool":
return (
ImageProcessorBase.get_image_format(self._read_image_data(image))
is not None
)
def _is_image_valid(self, image: "str") -> "List[bytes]":
return self._get_bytes_for_upload(image)

def _read_image_data(self, image: "str") -> "bytes":
"""Read the image data from the zip or from the folder."""
Expand All @@ -112,22 +145,27 @@ def _read_zip(self) -> "zipfile.ZipFile":
"""Open zip file in read only mode."""
return zipfile.ZipFile(self.to_upload)

def _get_bytes_for_upload(self, image: "str") -> "bytes":
def _get_bytes_for_upload(self, image: "str") -> "Optional[List[bytes]]":
image_bytes = self._read_image_data(image)
current_format = ImageProcessorBase.get_image_format(image_bytes)
new_format = None
current_format = ImageProcessorBase.get_image_format(image_bytes)

if not current_format:
return None

if current_format == Format.WEBP:
new_format = ImageProcessorBase.get_new_format_for_webp(image_bytes)

if not new_format:
return image_bytes
if new_format:
self.converted_images.update({image: new_format})
logger.info(f"Converted {image} into {new_format}")
with Image.open(io.BytesIO(image_bytes)) as image:
output = io.BytesIO()
image.save(output, new_format)
image_bytes = output.getvalue()

self.converted_images.update({image: new_format})
logger.info(f"Converted {image} into {new_format}")
with Image.open(io.BytesIO(image_bytes)) as image:
output = io.BytesIO()
image.save(output, new_format)
return output.getvalue()
split_image = ImageProcessorBase.split_tall_images(image, image_bytes)
return split_image

def _get_valid_images(self):
"""Validate the files in the archive.
Expand All @@ -138,8 +176,15 @@ def _get_valid_images(self):
else:
to_iter = [x.filename for x in self.myzip.infolist()]

info_list = [image for image in to_iter if self._is_image_valid(image)]
info_list_images_only = natsort.natsorted(info_list, key=ImageProcessorBase.key)
for image in to_iter:
image_valid = self._is_image_valid(image)
if image_valid:
for count, img in enumerate(image_valid, start=1):
self.processed_images.append((f"{image}_{count}", img))

info_list_images_only = natsort.natsorted(
self.processed_images, key=ImageProcessorBase.key
)

self.valid_images_to_upload = [
info_list_images_only[l : l + self.images_upload_session]
Expand All @@ -148,16 +193,18 @@ def _get_valid_images(self):
logger.debug(f"Images to upload: {self.valid_images_to_upload}")
return info_list_images_only

def get_images_to_upload(self, images_to_read: "List[str]") -> "Dict[str, bytes]":
def get_images_to_upload(
self, images_to_read: "List[Tuple[str, bytes]]"
) -> "Dict[str, bytes]":
"""Read the image data from the zip as list."""
logger.debug(f"Reading data for images: {images_to_read}")
# Dictionary to store the image index to the image bytes
files: "Dict[str, bytes]" = {}
for array_index, image in enumerate(images_to_read, start=1):
image_filename = str(Path(image).name)
image_filename = str(Path(image[0]).name)
# Get index of the image in the images array
renamed_file = str(self.info_list.index(image))
# Keeps track of which image index belongs to which image name
self.images_to_upload_names.update({renamed_file: image_filename})
files.update({renamed_file: self._get_bytes_for_upload(image)})
files.update({renamed_file: image[1]})
return files

0 comments on commit d67946d

Please sign in to comment.