-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
e6bed00
commit ba448f5
Showing
13 changed files
with
439 additions
and
279 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
class StorageKeyError(Exception): | ||
""" | ||
Exception raised when the storage key does not exist. | ||
""" | ||
|
||
def __init__(self, key: str) -> None: | ||
self.key = key | ||
super().__init__(f"Storage key '{key}' does not exist.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from django.forms import CharField, ValidationError | ||
|
||
|
||
class MeanValuesTupleField(CharField): | ||
def to_python(self, value): | ||
try: | ||
values = tuple(map(float, value.split(", "))) | ||
if len(values) != 3: | ||
raise ValueError("The tuple must have exactly three elements.") | ||
if not all(-255 <= v <= 255 for v in values): | ||
raise ValueError("Each value in the tuple must be between -255 and 255.") | ||
return values | ||
except Exception as e: | ||
raise ValidationError( | ||
""" | ||
Enter a valid tuple of three float values separated by commas and spaces, e.g. '0.0, 0.0, 0.0'. | ||
Each value must be between -255 and 255. | ||
""" | ||
) from e | ||
|
||
def prepare_value(self, value): | ||
if isinstance(value, tuple): | ||
return ", ".join(map(str, value)) | ||
return super().prepare_value(value) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from django.conf import settings | ||
|
||
import cv2 | ||
from constance import config | ||
|
||
from hope_dedup_engine.apps.core.storage import CV2DNNStorage | ||
|
||
|
||
class DNNInferenceManager: | ||
""" | ||
A class to manage the loading and configuration of a neural network model using OpenCV's DNN module. | ||
The DNNInferenceManager class provides functionality to load a neural network model from Caffe files stored in a | ||
specified storage and configure the model with preferred backend and target settings. | ||
""" | ||
|
||
def __init__(self, storage: CV2DNNStorage) -> None: | ||
""" | ||
Loads and configures the neural network model using the specified storage. | ||
Args: | ||
storage (CV2DNNStorage): The storage object from which to load the neural network model. | ||
""" | ||
self.net = cv2.dnn.readNetFromCaffe( | ||
storage.path(settings.PROTOTXT_FILE), | ||
storage.path(settings.CAFFEMODEL_FILE), | ||
) | ||
self.net.setPreferableBackend(int(config.DNN_BACKEND)) | ||
self.net.setPreferableTarget(int(config.DNN_TARGET)) | ||
|
||
def get_model(self) -> cv2.dnn_Net: | ||
""" | ||
Get the loaded and configured neural network model. | ||
Returns: | ||
cv2.dnn_Net: The neural network model loaded and configured by this manager. | ||
""" | ||
return self.net |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from django.conf import settings | ||
|
||
from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage | ||
from hope_dedup_engine.apps.faces.exceptions import StorageKeyError | ||
|
||
|
||
class StorageManager: | ||
""" | ||
A class to manage different types of storage systems used in the application. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
""" | ||
Initialize the StorageManager. | ||
Raises: | ||
FileNotFoundError: If any of the required DNN model files do not exist in the storage. | ||
""" | ||
self.storages = { | ||
"images": HOPEAzureStorage(), | ||
"cv2dnn": CV2DNNStorage(settings.CV2DNN_PATH), | ||
"encoded": HDEAzureStorage(), | ||
} | ||
for file in (settings.PROTOTXT_FILE, settings.CAFFEMODEL_FILE): | ||
if not self.storages.get("cv2dnn").exists(file): | ||
raise FileNotFoundError(f"File {file} does not exist in storage.") | ||
|
||
def get_storage(self, key: str) -> HOPEAzureStorage | CV2DNNStorage | HDEAzureStorage: | ||
""" | ||
Get the storage object for the given key. | ||
Args: | ||
key (str): The key associated with the desired storage backend. | ||
Returns: | ||
HOPEAzureStorage | CV2DNNStorage | HDEAzureStorage: The storage object associated with the given key. | ||
Raises: | ||
StorageKeyError: If the given key does not exist in the storages dictionary. | ||
""" | ||
if key not in self.storages: | ||
raise StorageKeyError(key) | ||
return self.storages[key] |
Empty file.
105 changes: 105 additions & 0 deletions
105
src/hope_dedup_engine/apps/faces/services/duplication_detector.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import logging | ||
import os | ||
|
||
import face_recognition | ||
import numpy as np | ||
|
||
from hope_dedup_engine.apps.faces.services.image_processor import ImageProcessor | ||
from hope_dedup_engine.apps.faces.utils.duplicate_groups_builder import DuplicateGroupsBuilder | ||
from hope_dedup_engine.apps.faces.validators import IgnorePairsValidator | ||
from src.hope_dedup_engine.apps.faces.managers.storage import StorageManager | ||
|
||
|
||
class DuplicationDetector: | ||
""" | ||
A class to detect and process duplicate faces in images. | ||
""" | ||
|
||
logger: logging.Logger = logging.getLogger(__name__) | ||
|
||
def __init__(self, filenames: tuple[str], ignore_pairs: tuple[str, str] = tuple()) -> None: | ||
""" | ||
Initialize the DuplicationDetector with the given filenames and ignore pairs. | ||
Args: | ||
filenames (tuple[str]): The filenames of the images to process. | ||
ignore_pairs (tuple[tuple[str, str]], optional): | ||
The pairs of filenames to ignore. Defaults to an empty tuple. | ||
""" | ||
self.filenames = filenames | ||
self.ignore_set = IgnorePairsValidator.validate(ignore_pairs) | ||
self.storages = StorageManager() | ||
self.image_processor = ImageProcessor() | ||
|
||
def _encodings_filename(self, filename: str) -> str: | ||
""" | ||
Generate the filename for the face encodings of a given image. | ||
Args: | ||
filename (str): The filename of the image. | ||
Returns: | ||
str: The filename for the face encodings. | ||
""" | ||
return f"{filename}.npy" | ||
|
||
def _has_encodings(self, filename: str) -> bool: | ||
""" | ||
Check if the face encodings for a given image exist in storage. | ||
Args: | ||
filename (str): The filename of the image. | ||
Returns: | ||
bool: True if the encodings exist, False otherwise. | ||
""" | ||
return self.storages.get_storage("encoded").exists(self._encodings_filename(filename)) | ||
|
||
def _load_encodings_all(self) -> dict[str, list[np.ndarray]]: | ||
""" | ||
Load all face encodings from storage. | ||
Returns: | ||
dict[str, list[np.ndarray]]: A dictionary with filenames as keys and lists of face encodings as values. | ||
""" | ||
data: dict[str, list[np.ndarray]] = {} | ||
try: | ||
_, files = self.storages.get_storage("encoded").listdir("") | ||
for file in files: | ||
if self._has_encodings(filename := os.path.splitext(file)[0]): | ||
with self.storages.get_storage("encoded").open(file, "rb") as f: | ||
data[filename] = np.load(f, allow_pickle=False) | ||
except Exception as e: | ||
self.logger.exception("Error loading encodings.") | ||
raise e | ||
return data | ||
|
||
def find_duplicates(self) -> tuple[tuple[str]]: | ||
""" | ||
Find and return a list of duplicate images based on face encodings. | ||
Returns: | ||
tuple[tuple[str]]: A tuple of tuples, where each inner tuple contains the filenames of duplicate images. | ||
""" | ||
try: | ||
for filename in self.filenames: | ||
if not self._has_encodings(filename): | ||
self.image_processor.encode_face(filename, self._encodings_filename(filename)) | ||
encodings_all = self._load_encodings_all() | ||
|
||
checked = set() | ||
for path1, encodings1 in encodings_all.items(): | ||
for path2, encodings2 in encodings_all.items(): | ||
if path1 < path2 and (path1, path2) not in self.ignore_set: | ||
min_distance = float("inf") | ||
for encoding1 in encodings1: | ||
if ( | ||
current_min := min(face_recognition.face_distance(encodings2, encoding1)) | ||
) < min_distance: | ||
min_distance = current_min | ||
checked.add((path1, path2, min_distance)) | ||
|
||
return DuplicateGroupsBuilder.build(checked) | ||
except Exception as e: | ||
self.logger.exception("Error finding duplicates for images %s", self.filenames) | ||
raise e |
159 changes: 159 additions & 0 deletions
159
src/hope_dedup_engine/apps/faces/services/image_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
import logging | ||
import re | ||
from dataclasses import dataclass, field | ||
|
||
from django.conf import settings | ||
from django.core.exceptions import ValidationError | ||
|
||
import cv2 | ||
import face_recognition | ||
import numpy as np | ||
from constance import config | ||
|
||
from hope_dedup_engine.apps.faces.managers.net import DNNInferenceManager | ||
from hope_dedup_engine.apps.faces.managers.storage import StorageManager | ||
|
||
|
||
@dataclass(frozen=True, slots=True) | ||
class FaceEncodingsConfig: | ||
num_jitters: int | ||
model: str | ||
|
||
|
||
@dataclass(frozen=True, slots=True) | ||
class BlobFromImageConfig: | ||
shape: dict[str, int] = field(init=False) | ||
scale_factor: float | ||
mean_values: tuple[float, float, float] | ||
|
||
def __post_init__(self) -> None: | ||
object.__setattr__(self, "shape", self._get_shape()) | ||
mean_values = self.mean_values | ||
if isinstance(mean_values, str): | ||
mean_values = tuple(map(float, mean_values.split(", "))) | ||
object.__setattr__(self, "mean_values", mean_values) | ||
|
||
def _get_shape(self) -> dict[str, int]: | ||
pattern = r"input_shape\s*\{\s*dim:\s*(\d+)\s*dim:\s*(\d+)\s*dim:\s*(\d+)\s*dim:\s*(\d+)\s*\}" | ||
with open(settings.PROTOTXT_FILE, "r") as file: | ||
if match := re.search(pattern, file.read()): | ||
return { | ||
"batch_size": int(match.group(1)), | ||
"channels": int(match.group(2)), | ||
"height": int(match.group(3)), | ||
"width": int(match.group(4)), | ||
} | ||
else: | ||
raise ValidationError("Could not find input_shape in prototxt file.") | ||
|
||
|
||
class ImageProcessor: | ||
""" | ||
A class to handle image processing tasks, including face detection and encoding. | ||
""" | ||
|
||
logger: logging.Logger = logging.getLogger(__name__) | ||
|
||
def __init__(self) -> None: | ||
""" | ||
Initialize the ImageProcessor with the required configurations. | ||
""" | ||
self.storages = StorageManager() | ||
self.net = DNNInferenceManager(self.storages.get_storage("cv2dnn")).get_model() | ||
|
||
self.blob_from_image_cfg = BlobFromImageConfig( | ||
scale_factor=config.BLOB_FROM_IMAGE_SCALE_FACTOR, mean_values=config.BLOB_FROM_IMAGE_MEAN_VALUES | ||
) | ||
self.face_encodings_cfg = FaceEncodingsConfig( | ||
num_jitters=config.FACE_ENCODINGS_NUM_JITTERS, | ||
model=config.FACE_ENCODINGS_MODEL, | ||
) | ||
self.face_detection_confidence: float = config.FACE_DETECTION_CONFIDENCE | ||
self.distance_threshold: float = config.FACE_DISTANCE_THRESHOLD | ||
self.nms_threshold: float = config.NMS_THRESHOLD | ||
|
||
def _get_face_detections_dnn(self, filename: str) -> list[tuple[int, int, int, int]]: | ||
""" | ||
Detect faces in an image using the DNN model. | ||
Args: | ||
filename (str): The filename of the image to process. | ||
Returns: | ||
list[tuple[int, int, int, int]]: A list of tuples representing face regions in the image. | ||
""" | ||
face_regions: list[tuple[int, int, int, int]] = [] | ||
try: | ||
with self.storages.get_storage("images").open(filename, "rb") as img_file: | ||
img_array = np.frombuffer(img_file.read(), dtype=np.uint8) | ||
# Decode image from binary buffer to 3D numpy array (height, width, channels of BlueGreeRed color space) | ||
image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) | ||
(h, w) = image.shape[:2] | ||
# Create a blob (4D tensor) from the image | ||
blob = cv2.dnn.blobFromImage( | ||
image=cv2.resize( | ||
image, dsize=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"]) | ||
), | ||
size=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"]), | ||
scalefactor=self.blob_from_image_cfg.scale_factor, | ||
mean=self.blob_from_image_cfg.mean_values, | ||
) | ||
self.net.setInput(blob) | ||
# Forward pass to get output with shape (1, 1, N, 7), | ||
# where N is the number of faces and 7 are the detection values: | ||
# 1st: image index (0), 2nd: class label (0), 3rd: confidence (0-1), | ||
# 4th-5th: x, y coordinates, 6th-7th: width, height | ||
detections = self.net.forward() | ||
boxes, confidences = [], [] | ||
for i in range(detections.shape[2]): | ||
confidence = detections[0, 0, i, 2] | ||
# Filter out weak detections by ensuring the confidence is greater than the minimum confidence | ||
if confidence > self.face_detection_confidence: | ||
box = (detections[0, 0, i, 3:7] * np.array([w, h, w, h])).astype("int") | ||
boxes.append(box) | ||
confidences.append(confidence) | ||
if boxes: | ||
# Apply non-maxima suppression to suppress weak, overlapping bounding boxes | ||
indices = cv2.dnn.NMSBoxes(boxes, confidences, self.face_detection_confidence, self.nms_threshold) | ||
if indices is not None: | ||
for i in indices: | ||
face_regions.append(tuple(boxes[i])) | ||
except Exception as e: | ||
self.logger.exception("Error processing face detection for image %s", filename) | ||
raise e | ||
return face_regions | ||
|
||
def encode_face(self, filename: str, encodings_filename: str) -> None: | ||
""" | ||
Encode faces detected in an image and save the encodings to storage. | ||
Args: | ||
filename (str): The filename of the image to process. | ||
encodings_filename (str): The filename to save the face encodings. | ||
""" | ||
try: | ||
with self.storages.get_storage("images").open(filename, "rb") as img_file: | ||
image = face_recognition.load_image_file(img_file) | ||
encodings: list = [] | ||
face_regions = self._get_face_detections_dnn(filename) | ||
if not face_regions: | ||
self.logger.error("No face regions detected in image %s", filename) | ||
else: | ||
for region in face_regions: | ||
if isinstance(region, (list, tuple)) and len(region) == 4: | ||
top, right, bottom, left = region | ||
face_encodings = face_recognition.face_encodings( | ||
image, | ||
[(top, right, bottom, left)], | ||
num_jitters=self.face_encodings_cfg.num_jitters, | ||
model=self.face_encodings_cfg.model, | ||
) | ||
encodings.extend(face_encodings) | ||
else: | ||
self.logger.error("Invalid face region %s", region) | ||
with self.storages.get_storage("encoded").open(encodings_filename, "wb") as f: | ||
np.save(f, encodings) | ||
except Exception as e: | ||
self.logger.exception("Error processing face encodings for image %s", filename) | ||
raise e |
Oops, something went wrong.