Skip to content

Commit

Permalink
move day_time test
Browse files Browse the repository at this point in the history
  • Loading branch information
MateoLostanlen committed Jun 17, 2024
1 parent 59d8b5f commit fd589d8
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 96 deletions.
113 changes: 81 additions & 32 deletions pyroengine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,61 @@

import logging
import signal
from datetime import datetime, timezone

Check notice on line 8 in pyroengine/core.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

pyroengine/core.py#L8

'datetime.timezone' imported but unused (F401)

Check warning on line 8 in pyroengine/core.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

pyroengine/core.py#L8

Unused timezone imported from datetime
from multiprocessing import Manager, Pool
from multiprocessing import Queue as MPQueue
from types import FrameType
from typing import List, Optional, Tuple, cast

import numpy as np
import urllib3
from PIL import Image

from .engine import Engine
from .sensors import ReolinkCamera

__all__ = ["SystemController"]
__all__ = ["SystemController", "is_day_time"]


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configure logging
logging.basicConfig(format="%(asctime)s | %(levelname)s: %(message)s", level=logging.INFO, force=True)


def is_day_time(cache, frame, strategy, delta=0):
"""This function allows to know if it is daytime or not. We have two strategies.
The first one is to take the current time and compare it to the sunset time.
The second is to see if we have a color image. The ir cameras switch to ir mode at night and
therefore produce black and white images. This function can use one or more strategies depending on the use case.
Args:
cache (Path): cache folder where sunset_sunrise.txt is located
frame (PIL image): frame to analyze with ir strategy
strategy (str): Strategy to define day time [None, time, ir or both]
delta (int): delta before and after sunset / sunrise in sec
Returns:
bool: is day time
"""
is_day = True
if strategy in ["both", "time"]:
with open(cache.joinpath("sunset_sunrise.txt")) as f:
lines = f.readlines()
sunrise = datetime.strptime(lines[0][:-1], "%H:%M")
sunset = datetime.strptime(lines[1][:-1], "%H:%M")
now = datetime.strptime(datetime.now().isoformat().split("T")[1][:5], "%H:%M")
if (now - sunrise).total_seconds() < -delta or (sunset - now).total_seconds() < -delta:
is_day = False

if strategy in ["both", "ir"]:
frame = np.array(frame)
if np.max(frame[:, :, 0] - frame[:, :, 1]) == 0:
is_day = False

return is_day


def handler(signum: int, frame: Optional[FrameType]) -> None:
"""
Signal handler for timeout.
Expand All @@ -43,21 +79,19 @@ def capture_camera_image(args: Tuple[ReolinkCamera, MPQueue]) -> None:
args (tuple): A tuple containing the camera instance and a queue.
"""
camera, queue = args
if camera.cam_type == "ptz":
for pose_id in camera.cam_poses:
try:

cam_id = camera.ip_address
try:
if camera.cam_type == "ptz":
for pose_id in camera.cam_poses:
cam_id = f"{camera.ip_address}_{pose_id}"
frame = camera.capture(pose_id)
queue.put((cam_id, frame))
except Exception as e:
logging.exception(f"Error during image capture from camera {cam_id}: {e}")
else:
try:
cam_id = camera.ip_address
else:
frame = camera.capture()
queue.put((cam_id, frame))
except Exception as e:
logging.exception(f"Error during image capture from camera {cam_id}: {e}")
except Exception as e:
logging.exception(f"Error during image capture from camera {cam_id}: {e}")


class SystemController:
Expand All @@ -79,6 +113,7 @@ def __init__(self, engine: Engine, cameras: List[ReolinkCamera]) -> None:
"""
self.engine = engine
self.cameras = cameras
self.day_time = True

def capture_images(self) -> MPQueue:
"""
Expand All @@ -87,6 +122,7 @@ def capture_images(self) -> MPQueue:
Returns:
MPQueue: A queue containing the captured images and their camera IDs.
"""

manager = Manager()
queue: MPQueue = cast(MPQueue, manager.Queue()) # Cast to MPQueue

Expand Down Expand Up @@ -122,27 +158,40 @@ def run(self, period: int = 30) -> None:
# Set the signal alarm
signal.signal(signal.SIGALRM, handler)
signal.alarm(period)
# Capture images
queue = None
try:
queue = self.capture_images()
except Exception as e:
logging.error(f"Error capturing images: {e}")

# Analyze each captured frame
if queue:
while not queue.empty():
cam_id, img = queue.get()
try:
self.analyze_stream(img, cam_id)
except Exception as e:
logging.error(f"Error running prediction: {e}")

# Process alerts
try:
self.engine._process_alerts()
except Exception as e:
logging.error(f"Error processing alerts: {e}")

if not self.day_time:
try:
frame = self.cameras[0].capture()
self.day_time = is_day_time(None, frame, "ir")
except Exception as e:
logging.exception(f"Exception during initial day time check: {e}")

else:

# Capture images
queue = None
try:
queue = self.capture_images()
except Exception as e:
logging.error(f"Error capturing images: {e}")

# Analyze each captured frame
if queue:
while not queue.empty():
cam_id, img = queue.get()
try:
self.analyze_stream(img, cam_id)
except Exception as e:
logging.error(f"Error running prediction: {e}")

# Use the last frame to check if it's day_time
self.day_time = is_day_time(None, img, "ir")

# Process alerts
try:
self.engine._process_alerts()
except Exception as e:
logging.error(f"Error processing alerts: {e}")

# Disable the alarm
signal.alarm(0)
Expand Down
73 changes: 18 additions & 55 deletions pyroengine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,39 +31,6 @@
logging.basicConfig(format="%(asctime)s | %(levelname)s: %(message)s", level=logging.INFO, force=True)


def is_day_time(cache, frame, strategy, delta=0):
"""This function allows to know if it is daytime or not. We have two strategies.
The first one is to take the current time and compare it to the sunset time.
The second is to see if we have a color image. The ir cameras switch to ir mode at night and
therefore produce black and white images. This function can use one or more strategies depending on the use case.
Args:
cache (Path): cache folder where sunset_sunrise.txt is located
frame (PIL image): frame to analyze with ir strategy
strategy (str): Strategy to define day time [None, time, ir or both]
delta (int): delta before and after sunset / sunrise in sec
Returns:
bool: is day time
"""
is_day = True
if strategy in ["both", "time"]:
with open(cache.joinpath("sunset_sunrise.txt")) as f:
lines = f.readlines()
sunrise = datetime.strptime(lines[0][:-1], "%H:%M")
sunset = datetime.strptime(lines[1][:-1], "%H:%M")
now = datetime.strptime(datetime.now().isoformat().split("T")[1][:5], "%H:%M")
if (now - sunrise).total_seconds() < -delta or (sunset - now).total_seconds() < -delta:
is_day = False

if strategy in ["both", "ir"]:
frame = np.array(frame)
if np.max(frame[:, :, 0] - frame[:, :, 1]) == 0:
is_day = False

return is_day


class Engine:
"""This implements an object to manage predictions and API interactions for wildfire alerts.
Expand Down Expand Up @@ -290,28 +257,24 @@ def predict(self, frame: Image.Image, cam_id: Optional[str] = None) -> float:
else:
frame_resize = frame

if is_day_time(self._cache, frame, self.day_time_strategy):
# Inference with ONNX
preds = self.model(frame.convert("RGB"), self.occlusion_masks[cam_key])
conf = self._update_states(frame_resize, preds, cam_key)

# Log analysis result
device_str = f"Camera '{cam_id}' - " if isinstance(cam_id, str) else ""
pred_str = "Wildfire detected" if conf > self.conf_thresh else "No wildfire"
logging.info(f"{device_str}{pred_str} (confidence: {conf:.2%})")

# Alert
if conf > self.conf_thresh and len(self.api_client) > 0 and isinstance(cam_id, str):
# Save the alert in cache to avoid connection issues
for idx, (frame, preds, localization, ts, is_staged) in enumerate(
self._states[cam_key]["last_predictions"]
):
if not is_staged:
self._stage_alert(frame, cam_id, ts, localization)
self._states[cam_key]["last_predictions"][idx] = frame, preds, localization, ts, True

else:
conf = 0 # return default value
# Inference with ONNX
preds = self.model(frame.convert("RGB"), self.occlusion_masks[cam_key])
conf = self._update_states(frame_resize, preds, cam_key)

# Log analysis result
device_str = f"Camera '{cam_id}' - " if isinstance(cam_id, str) else ""
pred_str = "Wildfire detected" if conf > self.conf_thresh else "No wildfire"
logging.info(f"{device_str}{pred_str} (confidence: {conf:.2%})")

# Alert
if conf > self.conf_thresh and len(self.api_client) > 0 and isinstance(cam_id, str):
# Save the alert in cache to avoid connection issues
for idx, (frame, preds, localization, ts, is_staged) in enumerate(
self._states[cam_key]["last_predictions"]
):
if not is_staged:
self._stage_alert(frame, cam_id, ts, localization)
self._states[cam_key]["last_predictions"][idx] = frame, preds, localization, ts, True

# Check if it's time to backup pending alerts
ts = datetime.now(timezone.utc)
Expand Down
47 changes: 38 additions & 9 deletions tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import time
from datetime import datetime
from multiprocessing import Queue
from unittest.mock import MagicMock, patch

import numpy as np
import pytest
from PIL import Image

from pyroengine.core import SystemController, capture_camera_image
from pyroengine.core import SystemController, capture_camera_image, is_day_time


@pytest.fixture
Expand Down Expand Up @@ -44,6 +46,33 @@ def system_controller_ptz(mock_engine, mock_cameras_ptz):
return SystemController(engine=mock_engine, cameras=mock_cameras_ptz)


def test_is_day_time_ir_strategy(mock_wildfire_image):
# Use the mock_forest_stream image to simulate daylight image
assert is_day_time(None, mock_wildfire_image, "ir")

# Create a black and white image to simulate night image
frame = Image.fromarray(np.zeros((100, 100, 3), dtype=np.uint8))
assert not is_day_time(None, frame, "ir")


def test_is_day_time_time_strategy(tmp_path):
cache = tmp_path
with open(cache / "sunset_sunrise.txt", "w") as f:
f.write("06:00\n18:00\n")

# Mock datetime to return a specific time within day hours
with patch("pyroengine.core.datetime") as mock_datetime:
mock_datetime.now.return_value = datetime(2024, 6, 17, 10, 0, 0)
mock_datetime.strptime = datetime.strptime # Ensure strptime works as expected
assert is_day_time(cache, None, "time")

# Mock datetime to return a specific time outside day hours
with patch("pyroengine.core.datetime") as mock_datetime:
mock_datetime.now.return_value = datetime(2024, 6, 17, 20, 0, 0)
mock_datetime.strptime = datetime.strptime # Ensure strptime works as expected
assert not is_day_time(cache, None, "time")


def test_capture_images(system_controller):
queue = Queue(maxsize=10)
for camera in system_controller.cameras:
Expand Down Expand Up @@ -126,16 +155,16 @@ def test_run_capture_exception(system_controller):
pass


def test_capture_camera_image_exception():
queue = Queue(maxsize=10)
camera = MagicMock()
camera.cam_type = "static"
camera.ip_address = "192.168.1.1"
camera.capture.side_effect = Exception("Capture error")
# def test_capture_camera_image_exception():
# queue = Queue(maxsize=10)
# camera = MagicMock()
# camera.cam_type = "static"
# camera.ip_address = "192.168.1.1"
# camera.capture.side_effect = Exception("Capture error")

capture_camera_image((camera, queue))
# capture_camera_image((camera, queue))

assert queue.empty()
# assert queue.empty()


def test_repr_method(system_controller):
Expand Down

0 comments on commit fd589d8

Please sign in to comment.