diff --git a/.env_sample b/.env_sample index 3661206..9924861 100644 --- a/.env_sample +++ b/.env_sample @@ -2,4 +2,8 @@ SLEEP_DATA_PATH=/path/to/sleep/data DEBUG=False OWL=False VIDEO_PATH=/path/to/video -HATCH_IP=192.168.HATCH.IP \ No newline at end of file +HATCH_IP=192.168.HATCH.IP +AREA_X= +AREA_Y= +AREA_WIDTH=1920 +AREA_HEIGHT=1080 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 72fd018..00083d1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,12 @@ -* -!.gitignore -!LICENSE -!README.md -!main.py -!helpers.py -!webapp -!webapp/** +__pycache__ node_modules build -!sleep_logs.csv -!*.ipynb -!requirements.txt -!.env_sample +*.log +.env +.ipynb_checkpoints +baby +*.jpg +*.jpeg +*.png +*.avi +*.mp4 \ No newline at end of file diff --git a/helpers.py b/helpers.py deleted file mode 100644 index 9b19fdf..0000000 --- a/helpers.py +++ /dev/null @@ -1,164 +0,0 @@ -import math -import numpy as np -import cv2 -import os -from pyhatchbabyrest import PyHatchBabyRest -from dotenv import load_dotenv - -load_dotenv() - -def euclidean(point, point1): - x = point.x - y = point.y - x1 = point1.x - y1 = point1.y - - return math.sqrt((x1 - x)**2 + (y1 - y)**2) - - -# Given x/y coords of eyes, returns a ratio representing "openness" of eyes -def closed_ratio(img, debug_img, landmarks, left_eye_indices, right_eye_indices): - rh_right = landmarks[right_eye_indices[0]] - rh_left = landmarks[right_eye_indices[8]] - rv_top = landmarks[right_eye_indices[12]] - rv_bottom = landmarks[right_eye_indices[4]] - - lh_right = landmarks[left_eye_indices[0]] - lh_left = landmarks[left_eye_indices[8]] - lv_top = landmarks[left_eye_indices[12]] - lv_bottom = landmarks[left_eye_indices[4]] - - rhDistance = euclidean(rh_right, rh_left) - rvDistance = euclidean(rv_top, rv_bottom) - lvDistance = euclidean(lv_top, lv_bottom) - lhDistance = euclidean(lh_right, lh_left) - reRatio = rhDistance/rvDistance - leRatio = lhDistance/lvDistance - ratio = (reRatio + leRatio)/2 - - # print('reRatio: ', reRatio) - # print('leRatio: ', leRatio) - return ratio - - -def set_hatch(is_awake): - print("attempting to boost hatch brightness") - rest = PyHatchBabyRest(os.getenv('HATCH_IP')) - rest.set_brightness(5) - print("brightness: ", rest.brightness) - - return - - -def check_eyes_open(landmarks, img, debug_img, left_eye_indices, right_eye_indices): - eyes_closed_ratio = closed_ratio(img, debug_img, landmarks, left_eye_indices, right_eye_indices) - ratio_threshold = 5 - if eyes_closed_ratio > ratio_threshold: - return 0 # closed - else: - return 1 # open - - -def get_top_lip_height(landmarks): - # 39 -> 81 - # 0 -> 13 - # 269 -> 311 - - p39 = np.array([landmarks[39].x, landmarks[39].y, landmarks[39].z]) - p81 = np.array([landmarks[81].x, landmarks[81].y, landmarks[81].z]) - p0 = np.array([landmarks[0].x, landmarks[0].y, landmarks[0].z]) - p13 = np.array([landmarks[13].x, landmarks[13].y, landmarks[13].z]) - p269 = np.array([landmarks[269].x, landmarks[269].y, landmarks[269].z]) - p311 = np.array([landmarks[311].x, landmarks[311].y, landmarks[311].z]) - - d1 = np.linalg.norm(p39-p81) - d2 = np.linalg.norm(p0-p13) - d3 = np.linalg.norm(p269-p311) - - # print("average: ", (d1 + d2 + d3) / 3) - return (d1 + d2 + d3) / 3 - - -def get_bottom_lip_height(landmarks): - # 181 -> 178 - # 17 -> 14 - # 405 -> 402 - - p181 = np.array([landmarks[181].x, landmarks[181].y, landmarks[181].z]) - p178 = np.array([landmarks[178].x, landmarks[178].y, landmarks[178].z]) - p17 = np.array([landmarks[17].x, landmarks[17].y, landmarks[17].z]) - 
p14 = np.array([landmarks[14].x, landmarks[14].y, landmarks[14].z]) - p405 = np.array([landmarks[405].x, landmarks[405].y, landmarks[405].z]) - p402 = np.array([landmarks[402].x, landmarks[402].y, landmarks[402].z]) - - d1 = np.linalg.norm(p181-p178) - d2 = np.linalg.norm(p17-p14) - d3 = np.linalg.norm(p405-p402) - - # print("average: ", (d1 + d2 + d3) / 3) - return (d1 + d2 + d3) / 3 - - -def get_mouth_height(landmarks): - # 178 -> 81 - # 14 -> 13 - # 402 -> 311 - - p178 = np.array([landmarks[178].x, landmarks[178].y, landmarks[178].z]) - p81 = np.array([landmarks[81].x, landmarks[81].y, landmarks[81].z]) - p14 = np.array([landmarks[14].x, landmarks[14].y, landmarks[14].z]) - p13 = np.array([landmarks[13].x, landmarks[13].y, landmarks[13].z]) - p402 = np.array([landmarks[402].x, landmarks[402].y, landmarks[402].z]) - p311 = np.array([landmarks[311].x, landmarks[311].y, landmarks[311].z]) - - d1 = np.linalg.norm(p178-p81) - d2 = np.linalg.norm(p14-p13) - d3 = np.linalg.norm(p402-p311) - - # print("average: ", (d1 + d2 + d3) / 3) - return (d1 + d2 + d3) / 3 - - -def check_mouth_open(landmarks): - top_lip_height = get_top_lip_height(landmarks) - bottom_lip_height = get_bottom_lip_height(landmarks) - mouth_height = get_mouth_height(landmarks) - - # if mouth is open more than lip height * ratio, return true. - ratio = 0.8 - if mouth_height > min(top_lip_height, bottom_lip_height) * ratio: - return 1 - else: - return 0 - - -# Resizes a image and maintains aspect ratio -def maintain_aspect_ratio_resize(self, image, width=None, height=None, inter=cv2.INTER_AREA): - # Grab the image size and initialize dimensions - dim = None - (h, w) = image.shape[:2] - - # Return original image if no need to resize - if width is None and height is None: - return image - - # We are resizing height if width is none - if width is None: - # Calculate the ratio of the height and construct the dimensions - r = height / float(h) - dim = (int(w * r), height) - # We are resizing width if height is none - else: - # Calculate the ratio of the 0idth and construct the dimensions - r = width / float(w) - dim = (width, int(h * r)) - - # Return the resized image - return cv2.resize(image, dim, interpolation=inter) - - -def gamma_correction(self, og, gamma): - invGamma = 1 / gamma - table = [((i / 255) ** invGamma) * 255 for i in range(256)] - table = np.array(table, np.uint8) - return cv2.LUT(og, table) \ No newline at end of file diff --git a/main.py b/main.py index 2c39615..b562c4b 100644 --- a/main.py +++ b/main.py @@ -1,574 +1,239 @@ +import argparse +import fire import cv2 -import numpy as np -import time -from threading import Timer, Lock, Event, Thread +from threading import Thread, Event import os -import mediapipe as mp from collections import deque import _thread import logging -import serial -import queue -import statistics -from dotenv import load_dotenv +import time +from dotenv import dotenv_values # from cast_service import CastSoundService from http.server import HTTPServer, SimpleHTTPRequestHandler -from helpers import check_eyes_open, set_hatch, check_mouth_open, maintain_aspect_ratio_resize, gamma_correction -load_dotenv() +from sleepy_baby import SleepyBaby +from sleepy_baby import helpers # Uncomment if want phone notifications during daytime wakings. # Configuration of telegram API key in this dir also needed. 
# import telegram_send -logfile = os.getenv("SLEEP_DATA_PATH") + '/sleepy_logs.log' -logging.basicConfig(filename=logfile, - filemode='a+', - format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', - datefmt='%H:%M:%S', - level=logging.INFO) - -# Queue shared between the frame publishing thread and the consuming thread -# This is to get around an underlying bug, described at end of this file. -frame_q = deque(maxlen=20) - -class SleepyBaby(): - - # TODO: break up this class, so big ew - - # General high level heuristics: - # 1) no eyes -> no body found -> baby is awake - # 2) no eyes -> body found -> moving -> baby is awake - # 3) no eyes -> body found -> not moving -> baby is sleeping - # 4) eyes -> eyes open -> baby is awake (disregard body movement) - # 5) eyes -> eyes closed -> movement -> baby is awake - # 6) eyes -> eyes closed -> no movement -> baby is asleep - # 7) eyes -> eyes closed -> mouth open -> baby is awake - - def __init__(self): - self.frame_dim = (1920,1080) - self.next_frame = 0 - self.fps = 30 - self.mpPose = mp.solutions.pose - self.mpFace = mp.solutions.face_mesh - self.pose = self.mpPose.Pose(min_detection_confidence=0.7, min_tracking_confidence=0.7) - # TODO: try turning off refine_landmarks for performance, might not be needed - self.face = self.mpFace.FaceMesh(max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.8, min_tracking_confidence=0.8) - self.mpDraw = mp.solutions.drawing_utils - self.mpDrawStyles = mp.solutions.drawing_styles - - self.eyes_open_q = deque(maxlen=30) - self.awake_q = deque(maxlen=40) - self.movement_q = deque(maxlen=40) - self.eyes_open_state = False - - self.multi_face_landmarks = [] - self.is_awake = False - self.ser = None # serial connection to arduino for controlling demon owl - - # If demon owl mode, setup connection to arduino and cast service for playing audio - if os.getenv("OWL", 'False').lower() in ('true', '1'): - print("\nCAWWWWWW\n") - self.cast_service = CastSoundService() - self.ser = serial.Serial('/dev/ttyACM0', 9600, timeout=0) - - self.top_lip = frozenset([ - (324, 308), (78, 191), (191, 80), (80, 81), (81, 82), - (82, 13), (13, 312), (312, 311), (311, 310), - (310, 415), (415, 308), - (375, 291), (61, 185), (185, 40), (40, 39), (39, 37), - (37, 0), (0, 267), - (267, 269), (269, 270), (270, 409), (409, 291), - ]) - self.bottom_lip = frozenset([ - (61, 146), (146, 91), (91, 181), (181, 84), (84, 17), - (17, 314), (314, 405), (405, 321), (321, 375), - (78, 95), (95, 88), (88, 178), (178, 87), (87, 14), - (14, 317), (317, 402), (402, 318), (318, 324), - ]) - - - # Decorator ensures function that can only be called once every `s` seconds. 
- def debounce(s): - def decorate(f): - t = None - - def wrapped(*args, **kwargs): - nonlocal t - t_ = time.time() - if t is None or t_ - t >= s: - result = f(*args, **kwargs) - t = time.time() - return result - return wrapped - return decorate - - - @debounce(1) - def throttled_handle_no_eyes_found(self): - logging.info('No face found, depreciate queue') - print('No face found, depreciate queue') - if(len(self.eyes_open_q) > 0): - self.eyes_open_q.popleft() - - - @debounce(1) - def throttled_handle_no_body_found(self): - logging.info('No body found, vote awake') - print('No body found, vote awake') - self.awake_q.append(1) - - - def process_baby_image_models(self, img, debug_img): - results = self.face.process(img) - results_pose = self.pose.process(img) - - body_found = True - if results_pose.pose_landmarks: - # 15 left-wrist, 16 right-wrist - shape = img.shape - left_wrist_coords = (shape[1] * results_pose.pose_landmarks.landmark[15].x, shape[0] * results_pose.pose_landmarks.landmark[15].y) - right_wrist_coords = (shape[1] * results_pose.pose_landmarks.landmark[16].x, shape[0] * results_pose.pose_landmarks.landmark[16].y) - - # print('left wrist: ', left_wrist_coords) - # print('right wrist: ', right_wrist_coords) - - self.movement_q.append((left_wrist_coords, right_wrist_coords)) - - debug_img = cv2.putText(debug_img, "Left wrist", (int(left_wrist_coords[0]), int(left_wrist_coords[1])), 2, 1, (255,0,0), 2, 2) - debug_img = cv2.putText(debug_img, "Right wrist", (int(right_wrist_coords[0]), int(right_wrist_coords[1])), 2, 1, (255,0,0), 2, 2) - - if os.getenv("DEBUG", 'False').lower() in ('true', '1'): - CUTOFF_THRESHOLD = 10 # head and face - MY_CONNECTIONS = frozenset([t for t in self.mpPose.POSE_CONNECTIONS if t[0] > CUTOFF_THRESHOLD and t[1] > CUTOFF_THRESHOLD]) - - # if results_pose.pose_landmarks: # if it finds the points - # for landmark_id, landmark in enumerate(results_pose.pose_landmarks): - # if landmark_id <= CUTOFF_THRESHOLD: - # landmark.visibility = 0 - # self.mpDraw.draw_landmarks(debug_img, results_pose.pose_landmarks, MY_CONNECTIONS) - - for id, lm in enumerate(results_pose.pose_landmarks.landmark): - if id <= CUTOFF_THRESHOLD: - lm.visibility = 0 - continue - h, w,c = debug_img.shape - # print(id, lm) - cx, cy = int(lm.x*w), int(lm.y*h) - cv2.circle(debug_img, (cx, cy), 5, (255,0,0), cv2.FILLED) - - self.mpDraw.draw_landmarks(debug_img, results_pose.pose_landmarks, MY_CONNECTIONS, landmark_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2)) - - # self.mpDraw.draw_landmarks(debug_img, results_pose.pose_landmarks, MY_CONNECTIONS) - # for id, lm in enumerate(results_pose.pose_landmarks.landmark): - # if id < CUTOFF_THRESHOLD: - # continue - # self.mpDraw.draw_landmarks(debug_img, results_pose.pose_landmarks, MY_CONNECTIONS) - # h, w,c = debug_img.shape - # # print(id, lm) - # cx, cy = int(lm.x*w), int(lm.y*h) - # cv2.circle(debug_img, (cx, cy), 5, (255,0,0), cv2.FILLED) +#Load configuration from .env file +config = dotenv_values() +config['DEBUG'] = (config['DEBUG'].lower()=="true") #Transform in bool +config['WORKING_AREA'] = None if config['AREA_X'] == "" else (int(config['AREA_X']), int(config['AREA_Y']), int(config['AREA_WIDTH']), int(config['AREA_HEIGHT'])) + +# # Queue shared between the frame publishing thread and the consuming thread +# # Had to split frame receive and processing into different threads due to underlying FFMPEG issue. 
Read more here: +# # https://stackoverflow.com/questions/49233433/opencv-read-errorh264-0x8f915e0-error-while-decoding-mb-53-20-bytestream +# # Current solution is to insert into deque on the thread receiving images, and process on the other +frame_q = deque(maxlen=2) +terminate_event = Event() + +#Create a thread to show the results of the processing +def show_video(sb_obj): + logging.info("show_video thread is started") + while terminate_event.is_set() is False: + if sb_obj.processed_frame is not None: + cv2.imshow("VIDEO", cv2.resize(sb_obj.processed_frame, (960,540))) + cv2.waitKey(1) + sb_obj.processed_frame = None else: - body_found = False - self.throttled_handle_no_body_found() + logging.debug("No image to process") + time.sleep(0.3) + logging.info("show_video thread is terminated by event") + + +class app: + """ Sleepy Baby App """ + def __init__(self, + verbose: bool = config['DEBUG'], + log_on_screen: bool = False, + log_path: str = config['SLEEP_DATA_PATH'], + body_min_detection_confidence: float = 0.8, + body_min_tracking_confidence: float = 0.8, + face_min_detection_confidence: float = 0.7, + face_min_tracking_confidence: float = 0.7, + working_area: tuple = config['WORKING_AREA'], + show_frame: bool=True, + show_wrist_position: bool=True, + show_wrist_text: bool=True, + show_body_details: bool=True, + show_face_details: bool=True, + show_progress_bar: bool=True): + logger_kwargs = { + 'format': '%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', + 'datefmt': '%H:%M:%S', + 'level': logging.DEBUG if verbose else logging.INFO + } + if log_on_screen: + logging.basicConfig(**logger_kwargs) + else: + logfile = log_path + '/sleepy_logs.log' + logging.basicConfig(filename=logfile, filemode='a+', **logger_kwargs) + #Load SleepyBaby + logging.info('Initializing...') + self.sleepy_baby = SleepyBaby(body_min_detection_confidence=body_min_detection_confidence, + body_min_tracking_confidence=body_min_tracking_confidence, + face_min_detection_confidence=face_min_detection_confidence, + face_min_tracking_confidence=face_min_tracking_confidence, + debug=verbose) + if working_area: + self.sleepy_baby.set_working_area(working_area[0], working_area[1], working_area[2], working_area[3]) + self.sleepy_baby.set_output(show_frame=show_frame, + show_wrist_position=show_wrist_position, + show_wrist_text=show_wrist_text, + show_body_details=show_body_details, + show_face_details=show_face_details, + show_progress_bar=show_progress_bar) + + logging.info('Initialization complete.') + + def live(self, source:str = config['VIDEO_PATH'], return_image: bool = True): + """ + Run App based on streaming video + + Parameters + ---------- + source : str + url of streaming video + return_image: bool + define if post processed image should be displayed, default: True + """ + self._process_streaming(source, apply_delay_between_frames=False, return_image=return_image) + + def recorded(self, source:str, return_image: bool = True): + """ + Run App based on streaming video + + Parameters + ---------- + source : str + url of streaming video + return_image: bool + define if post processed image should be displayed, default: True + """ + self._process_streaming(source, apply_delay_between_frames=True, return_image=return_image) + + def photo(self, source:str, output_size: tuple=None, max_width:int = 1920, max_height:int = 1080): + self.sleepy_baby.show_progress_bar = False + img = cv2.imread(source) + if (img.shape[1]>max_width) or (img.shape[0]>max_height): + img = helpers.maintain_aspect_ratio_resize(img, 
max_width, max_height) + output = self.sleepy_baby.processFrame(img) + if output_size: + output = cv2.resize(output, output_size) + cv2.imshow("Output", output) + cv2.waitKey() + + + + def _process_streaming(self, source, apply_delay_between_frames=False, return_image=True, max_width=1920, max_height=1080): + try: + vcap = cv2.VideoCapture(source) + if vcap.isOpened(): + success = True + logging.info("Start receiving frames.") + fps = vcap.get(cv2.CAP_PROP_FPS) + rescale = (vcap.get(cv2.CAP_PROP_FRAME_WIDTH)>max_width) or (vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)>max_height) + self.sleepy_baby.start_thread(frame_q, terminate_event) + if return_image: + self.show_video_thread = Thread(target=show_video, args=(self.sleepy_baby,)) + self.show_video_thread.start() + while success: + success, img = vcap.read() + if success is False: + terminate_event.set() #Error in streaming reading + if rescale: + img = helpers.maintain_aspect_ratio_resize(img, max_width, max_height) + frame_q.append(img) #cv2.cvtColor(img, cv2.COLOR_BGR2RGB)) + if apply_delay_between_frames: + time.sleep(1.0/fps) + logging.error("Error in frame retrieve. Program will be ended") + else: + logging.error("Unable to open the streaming") + except KeyboardInterrupt: + logging.error("User Abort") + finally: + terminate_event.set() + vcap.release() - LEFT_EYE = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398] - RIGHT_EYE = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246] +fire.Fire(app) - if results.multi_face_landmarks: - self.multi_face_landmarks = results.multi_face_landmarks - eyes_are_open = check_eyes_open(results.multi_face_landmarks[0].landmark, img, debug_img, LEFT_EYE, RIGHT_EYE) - # Additionally check if mouth is closed. If not, consider baby crying. Can rely on queue length to ensure - # yawns don't trigger wake - # If mouth is open, override and just consider it, "eyes open", pushing in direction of "wake vote" - if eyes_are_open == 0: # if eyes are closed, then check if mouth is open - mouth_is_open = check_mouth_open(results.multi_face_landmarks[0].landmark) - if mouth_is_open: - logging.info('Eyes closed, mouth open, crying or yawning, consider awake.') - self.eyes_open_q.append(1) - else: - logging.info('Eyes closed, mouth closed, consider sleeping.') - self.eyes_open_q.append(0) - else: - logging.info('Eyes open, consider awake.') - self.eyes_open_q.append(1) - - else: # no face results, interpret this as baby is not in crib, i.e. awake - self.throttled_handle_no_eyes_found() - - return debug_img, body_found - - - # This is placeholder until improve sensitivity of transitioning between waking and sleeping. - # Explanation: Sometimes when baby is waking up, he'll open and close his eyes for a couple of minutes... - # TODO: Fine-tune sensitivity of voting, for now, don't allow toggling between wake & sleep within N seconds - @debounce(180) - def need_to_clean_this_up(self, wake_status, img): - str_timestamp = str(int(time.time())) - sleep_data_base_path = os.getenv("SLEEP_DATA_PATH") - p = sleep_data_base_path + '/' + str_timestamp + '.png' - if wake_status: # woke up - log_string = "1," + str_timestamp + "\n" - print(log_string) - logging.info(log_string) - with open(sleep_data_base_path + '/sleep_logs.csv', 'a+', encoding="utf-8") as f: - f.write(log_string) - cv2.imwrite(p, img) # store off image of when wake/sleep event occurred. 
Can help with debugging issues - - # if daytime, send phone notification if baby woke up - # now = datetime.datetime.now() - # now_time = now.time() - # if now_time >= ti(7,00) or now_time <= ti(22,00): # day time - # Thread(target=telegram_send.send(messages=["Baby woke up."]), daemon=True).start() - - self.is_awake = True - - if os.getenv("OWL", 'False').lower() in ('true', '1'): - print("MOVE & MAKE NOISE") - logging.info("MOVE & MAKE NOISE") - time.sleep(5) - self.ser.write(bytes(str(999999) + "\n", "utf-8")) - self.cast_service.play_sound() - else: # fell asleep - log_string = "0," + str_timestamp + "\n" - print(log_string) - logging.info(log_string) - with open(sleep_data_base_path + '/sleep_logs.csv', 'a+', encoding="utf-8") as f: - f.write(log_string) - cv2.imwrite(p, img) - self.is_awake = False - - # now = datetime.datetime.now() - # now_time = now.time() - # if now_time >= ti(22,00) or now_time <= ti(8,00): # night time - # set_hatch(self.is_awake) - - - @debounce(10) - def set_wakeness_status(self, img): - if len(self.awake_q): - avg_awake = sum(self.awake_q) / len(self.awake_q) - if avg_awake >= 0.6 and self.is_awake == False: - self.need_to_clean_this_up(True, img) - elif avg_awake < 0.6 and self.is_awake == True: - self.need_to_clean_this_up(False, img) - - - @debounce(1) - def awake_voting_logic(self, debug_img): - if len(self.eyes_open_q) > len(self.eyes_open_q)/2: # dont vote on eyes unless queue is half full - avg = sum(self.eyes_open_q) / len(self.eyes_open_q) - if avg > 0.75: # eyes open - self.eyes_open_state = True - print("Eyes open: vote awake") - logging.info("\nvote awake") - self.awake_q.append(1) - else: # closed - self.eyes_open_state = False - self.awake_q.append(0) - print("Eyes closed: vote sleeping") - logging.info("\nvote sleeping") - else: - print("Not voting on eyes, eye queue too short.") - - - @debounce(1) - def movement_voting_logic(self, debug_img, body_found): - if not body_found: - print('No body found, depreciate movement queue.') - if len(self.movement_q): - self.movement_q.popleft() - - elif len(self.movement_q) > 5: - left_wrist_list = [c[0] for c in self.movement_q] - left_wrist_x_list = [c[0] for c in left_wrist_list] - left_wrist_y_list = [c[1] for c in left_wrist_list] - - right_wrist_list = [c[1] for c in self.movement_q] - right_wrist_x_list = [c[0] for c in right_wrist_list] - right_wrist_y_list = [c[1] for c in right_wrist_list] - - std_left_wrist_x = statistics.pstdev(left_wrist_x_list) - 1 - std_left_wrist_y = statistics.pstdev(left_wrist_y_list) - 1 - - std_right_wrist_x = statistics.pstdev(right_wrist_x_list) - 1 - std_right_wrist_y = statistics.pstdev(right_wrist_y_list) - 1 - - # average it all together and compare to movement threshold to determine if moving - avg_std = (((std_left_wrist_x + std_left_wrist_y)/2) + ((std_right_wrist_x + std_right_wrist_y)/2))/2 - # print('movement left: ', (std_left_wrist_x + std_left_wrist_y)/2) - # print('movement right: ', (std_right_wrist_x + std_right_wrist_y)/2) - # print('movement value: ', avg_std) - if int(avg_std) < 25: - print("No movement, vote sleeping") - logging.info('No movement, vote sleeping') - self.awake_q.append(0) - else: - print("Movement, vote awake") - logging.info("Movement, vote awake") - self.awake_q.append(1) - - - # every N seconds, check if baby is awake & do stuff - @debounce(5) - def periodic_wakeness_check(self): - print('\n', 'Is baby awake:', self.is_awake, '\n') - logging.info('Is baby awake: {}'.format(str(self.is_awake))) - - - def frame_logic(self, 
raw_img): - img = raw_img - - debug_img = img.copy() - img.flags.writeable = False - converted_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) - - # beef - res = self.process_baby_image_models(converted_img, debug_img) - debug_img = res[0] - body_found = res[1] - - self.awake_voting_logic(debug_img) - self.movement_voting_logic(debug_img, body_found) - self.set_wakeness_status(debug_img) - self.periodic_wakeness_check() - - if os.getenv("DEBUG", 'False').lower() in ('true', '1'): - avg_awake = sum(self.awake_q) / len(self.awake_q) - - # draw progress bar - bar_y_offset = 0 - bar_y_offset = 100 - - bar_width = 200 - w = img.shape[1] - start_point = (int(w/2 - bar_width/2), 350 + bar_y_offset) - - end_point = (int(w/2 + bar_width/2), 370 + bar_y_offset) - adj_avg_awake = 1.0 if avg_awake / .6 >= 1.0 else avg_awake / .6 - progress_end_point = (int(w/2 - bar_width/2 + (bar_width*(adj_avg_awake))), 370 + bar_y_offset) - - color = (255, 255, 117) - progress_color = (0, 0, 255) - thickness = -1 - - debug_img = cv2.rectangle(debug_img, start_point, end_point, color, thickness) - debug_img = cv2.rectangle(debug_img, start_point, progress_end_point, progress_color, thickness) - display_perc = int((avg_awake * 100) / 0.6) - display_perc = 100 if display_perc >= 100 else display_perc - debug_img = cv2.putText(debug_img, str(display_perc) + "%", (int(w/2 - bar_width/2), 330 + bar_y_offset), 2, 1, (255,0,0), 2, 2) - debug_img = cv2.putText(debug_img, "Awake", (int(w/2 - bar_width/2 + 85), 330 + bar_y_offset), 2, 1, (255,0,0), 2, 2) - - return debug_img - - - # This basically does the same thing as the live version, but is very useful for testing - def recorded(self): - cap = cv2.VideoCapture(os.getenv("VIDEO_PATH")) - success, img = cap.read() - while success: - frame = None - while frame is None: - cur_time = time.time() - if cur_time > self.next_frame: - frame = img - self.next_frame = max( - self.next_frame + 1.0 / self.fps, cur_time + 0.5 / self.fps - ) - - success, img = cap.read() - - if all(e is not None for e in [frame, img]): - # bounds to actual run models/analysis on...no need to look for babies outside of the crib - x = 800 - y = 250 - h = 650 - w = 600 - - if img.shape[0] > 1080 and img.shape[1] > 1920: # max res 1080p - img = maintain_aspect_ratio_resize(img, width=self.frame_dim[0], height=self.frame_dim[1]) - - img_to_process = img[y:y+h, x:x+w] - - debug_img = self.frame_logic(img_to_process) - - # reapply cropped and modified/marked up img back to img which is displayed - img[y:y+h, x:x+w] = debug_img - - if os.getenv("DEBUG", 'False').lower() in ('true', '1'): - asleep = sum(self.awake_q) / len(self.awake_q) < 0.6 - text = 'Sleepy Baby' if asleep else 'Wakey Baby' - text_color = (255,191,0) if asleep else (0,140,255) - cv2.putText(img, text, (int(img.shape[0]/2) + 250, int(img.shape[1]/2)), 2, 3, text_color, 2, 2) - - cv2.rectangle(img=img, pt1=(x, y), pt2=(x+w, y+h), color=[153,50,204], thickness=2) - tmp = img[y:y+h, x:x+w] - img = gamma_correction(img, .4) - - for face_landmarks in self.multi_face_landmarks: - - # INDICIES: https://github.com/tensorflow/tfjs-models/blob/838611c02f51159afdd77469ce67f0e26b7bbb23/face-landmarks-detection/src/mediapipe-facemesh/keypoints.ts - # https://github.com/google/mediapipe/blob/master/mediapipe/python/solutions/face_mesh_connections.py - - self.mpDraw.draw_landmarks( - image=tmp, - landmark_list=face_landmarks, - connections=self.mpFace.FACEMESH_RIGHT_EYE, - landmark_drawing_spec=None, - connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 
150, 255), thickness=1, circle_radius=1)) - # connection_drawing_spec=self.mpDrawStyles - # .get_default_face_mesh_contours_style()) - self.mpDraw.draw_landmarks( - image=tmp, - landmark_list=face_landmarks, - connections=self.mpFace.FACEMESH_LEFT_EYE, - landmark_drawing_spec=None, - connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 255, 0), thickness=1, circle_radius=1)) - # connection_drawing_spec=self.mpDrawStyles - # .get_default_face_mesh_contours_style()) - - self.mpDraw.draw_landmarks( - image=tmp, - landmark_list=face_landmarks, - connections=self.top_lip, - landmark_drawing_spec=None,#self.mpDraw.DrawingSpec(color=(255, 150, 255), thickness=2, circle_radius=2), - connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 150, 255), thickness=1, circle_radius=1)) - self.mpDraw.draw_landmarks( - image=tmp, - landmark_list=face_landmarks, - connections=self.bottom_lip, - landmark_drawing_spec=None, - connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 255, 0), thickness=1, circle_radius=1)) - - img[y:y+h, x:x+w] = tmp - - try: - img = cv2.resize(img, (960, 540)) - cv2.imshow('baby', img) - if cv2.waitKey(1) & 0xFF == ord('q'): - break - except Exception as e: - print("Something went wrong: ", e) - - - def live(self, consumer_q): - img = None - while True: - if len(consumer_q) > 0: - try: - img = consumer_q.pop() # consume image from queue - except IndexError as e: - print('No images in queue: ', e) - continue - - # bounds to actual run models/analysis on...no need to look for babies outside of the crib - x = 700 - y = 125 - h = 1000 - w = 800 - - if img.shape[0] > 1080 and img.shape[1] > 1920: # max res 1080p - img = maintain_aspect_ratio_resize(img, width=self.frame_dim[0], height=self.frame_dim[1]) - - img_to_process = img[y:y+h, x:x+w] - - debug_img = self.frame_logic(img_to_process) - - # reapply cropped and modified/marked up img back to img which is displayed - img[y:y+h, x:x+w] = debug_img - - if os.getenv("DEBUG", 'False').lower() in ('true', '1'): - try: - cv2.rectangle(img=img, pt1=(x, y), pt2=(x+w, y+h), color=[153,50,204], thickness=2) - tmp = img[y:y+h, x:x+w] - img = gamma_correction(img, .4) - - for face_landmarks in self.multi_face_landmarks: - - # INDICIES: https://github.com/tensorflow/tfjs-models/blob/838611c02f51159afdd77469ce67f0e26b7bbb23/face-landmarks-detection/src/mediapipe-facemesh/keypoints.ts - # https://github.com/google/mediapipe/blob/master/mediapipe/python/solutions/face_mesh_connections.py - - self.mpDraw.draw_landmarks( - image=tmp, - landmark_list=face_landmarks, - connections=self.mpFace.FACEMESH_RIGHT_EYE, - landmark_drawing_spec=None, - connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 150, 255), thickness=1, circle_radius=1)) - # connection_drawing_spec=self.mpDrawStyles - # .get_default_face_mesh_contours_style()) - - self.mpDraw.draw_landmarks( - image=tmp, - landmark_list=face_landmarks, - connections=self.mpFace.FACEMESH_LEFT_EYE, - landmark_drawing_spec=None, - connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 255, 0), thickness=1, circle_radius=1)) - # connection_drawing_spec=self.mpDrawStyles - # .get_default_face_mesh_contours_style()) - - img[y:y+h, x:x+w] = tmp - - img = cv2.resize(img, (960, 540)) - cv2.imshow('baby', maintain_aspect_ratio_resize(img, width=self.frame_dim[0], height=self.frame_dim[1])) - - if cv2.waitKey(1) & 0xFF == ord('q'): - break - except Exception as e: - print("Something went wrong: ", e) - - -#################################### -# TODO: move out of this file, break it up 
- -print('Initializing...') -sleepy_baby = SleepyBaby() -print('\nInitialization complete.') - - -# Below http server is used for the web app to request latest sleep data -class CORSRequestHandler(SimpleHTTPRequestHandler): - def end_headers(self): - self.send_header('Access-Control-Allow-Origin', '*') - self.send_header('Access-Control-Allow-Methods', 'GET') - self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate') - return super(CORSRequestHandler, self).end_headers() - -def start_server(): - httpd = HTTPServer(('0.0.0.0', 8000), CORSRequestHandler) - httpd.serve_forever() - -_thread.start_new_thread(start_server, ()) - - -def receive(producer_q): - print("Start receiving frames.") - cam_ip = os.environ['CAM_IP'] - cam_pw = os.environ['CAM_PW'] - connect_str = "rtsp://admin:" + cam_pw + "@" + cam_ip - connect_str2 = connect_str + ":554" + "//h264Preview_01_main" # this might be different depending on camera used - - os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' # Use tcp instead of udp if stream is unstable - c = cv2.VideoCapture(connect_str) - - next_frame = 0 - fps = 30 - while(c.isOpened()): - ret, img = c.read() - if ret: - producer_q.append(img) - - -# Had to split frame receive and processing into different threads due to underlying FFMPEG issue. Read more here: -# https://stackoverflow.com/questions/49233433/opencv-read-errorh264-0x8f915e0-error-while-decoding-mb-53-20-bytestream -# Current solution is to insert into deque on the thread receiving images, and process on the other -p1 = Thread(target=receive, args=(frame_q,)) -p2 = Thread(target=sleepy_baby.live, args=(frame_q,)) -p1.start() -p2.start() - -# Note: to test w/ recorded footage, comment out above threads, and uncomment next line -# TODO: use command line args rather than commenting out code -# sleepy_baby.recorded() \ No newline at end of file + + + + + + +# # Below http server is used for the web app to request latest sleep data +# #class CORSRequestHandler(SimpleHTTPRequestHandler): +# # def end_headers(self): +# # self.send_header('Access-Control-Allow-Origin', '*') +# # self.send_header('Access-Control-Allow-Methods', 'GET') +# # self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate') +# # return super(CORSRequestHandler, self).end_headers() + +# #def start_server(): +# # httpd = HTTPServer(('0.0.0.0', 8000), CORSRequestHandler) +# # httpd.serve_forever() + +# #_thread.start_new_thread(start_server, ()) + + +# #def receive(producer_q): +# # print("Start receiving frames.") +# # cam_ip = os.environ['CAM_IP'] +# # cam_pw = os.environ['CAM_PW'] +# # connect_str = "rtsp://admin:" + cam_pw + "@" + cam_ip +# # connect_str2 = connect_str + ":554" + "//h264Preview_01_main" # this might be different depending on camera used + +# # os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp' # Use tcp instead of udp if stream is unstable +# # c = cv2.VideoCapture(connect_str) + +# # next_frame = 0 +# # fps = 30 +# # while(c.isOpened()): +# # ret, img = c.read() +# # if ret: +# # producer_q.append(img) + + + +# #p1 = Thread(target=receive, args=(frame_q,)) +# #p2 = Thread(target=sleepy_baby.live, args=(frame_q,)) +# #p1.start() +# #p2.start() + + +# # video = 0 + +# # if video: +# # fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v') +# # video = cv2.VideoCapture('test.mp4') +# # success, image = video.read() + +# # writer = cv2.VideoWriter('output.mp4', fourcc, video.get(cv2.CAP_PROP_FPS), (image.shape[0], image.shape[1])) +# # while success: +# # success,image = 
video.read() +# # if image is not None: +# # frame = Frame(image) +# # analysis, pose, face = sleepy_baby.process_baby_image_models(frame.w_data) +# # frame.add_analysis_frame() +# # frame.add_body_details(pose) +# # frame.add_face_details(face) +# # writer.write(frame.getAugmentedFrame()) +# # cv2.destroyAllWindows() +# # writer.release() +# # video.release() +# # else: +# # img = cv2.imread("test.jpg") +# # frame = Frame(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), 700, 300, 1220,700) +# # analysis, pose, face = sleepy_baby.process_baby_image_models(frame.w_data) +# # frame.add_analysis_frame() +# # frame.add_body_details(pose) +# # frame.add_face_details(face) +# # cv2.imwrite("debug.jpg", frame.getAugmentedFrame()) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 1678df8..74ac39d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ python-dotenv==0.21.1 scikit_learn==1.2.1 statsmodels==0.13.5 tbats==1.1.2 +fire==0.5.0 \ No newline at end of file diff --git a/sleepy_baby/__init__.py b/sleepy_baby/__init__.py new file mode 100644 index 0000000..81bb8d9 --- /dev/null +++ b/sleepy_baby/__init__.py @@ -0,0 +1,330 @@ +import cv2 +import numpy as np +import time +from threading import Timer, Lock, Event, Thread +import os +import mediapipe as mp +from collections import deque +import _thread +import logging +import serial +import queue + +from .frame import Frame +from .helpers import check_eyes_open, check_mouth_open +from .decision_logic import DecisionLogic + + + + +class SleepyBaby: + """ + It analyzes frame provided on process_frame. + + Results are saved inside the object variable "analysis". + It will be used later for decision logic to make the proper evaluation. + """ + + def __init__(self, + body_min_detection_confidence=0.8, + body_min_tracking_confidence=0.8, + face_min_detection_confidence=0.7, + face_min_tracking_confidence=0.7, + refine_landmarks = True, + debug = False): + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.debug("SleepyBaby is starting") + self.debug = debug + self.processed_frame = None #It is used to produce post-processed video + self.process_t = None #Process Thread + self.face = mp.solutions.face_mesh.FaceMesh(max_num_faces=1, + refine_landmarks=refine_landmarks, + min_detection_confidence=face_min_detection_confidence, + min_tracking_confidence=face_min_tracking_confidence) + self.pose = mp.solutions.pose.Pose(min_detection_confidence=body_min_detection_confidence, + min_tracking_confidence=body_min_tracking_confidence) + self.set_working_area() #Set entire area as working area + self.set_output() #Set default values + self.logic = DecisionLogic() + self.logger.info("SleepyBaby is configured") + + def set_output(self, + show_frame=True, + show_wrist_position=True, + show_wrist_text=True, + show_body_details=True, + show_face_details=True, + show_progress_bar=True): + self.show_frame = show_frame + self.show_wrist_position = show_wrist_position + self.show_wrist_text = show_wrist_text + self.show_body_details = show_body_details + self.show_face_details = show_face_details + self.show_progress_bar = show_progress_bar + + def start_thread(self, frame_q, stop_event, pause=0.1, ): + def process_loop(self, frame_q, stop_event, pause): + while stop_event.is_set() is False: + if len(frame_q)>0: + frame = self.processFrame(frame_q.pop(), return_image = self.processed_frame is None) + if frame is not None: + self.processed_frame = frame + time.sleep(pause) + def evaluate_loop(logic, stop_event, 
pause=1): + while stop_event.is_set() is False: + logic.update() + time.sleep(pause) + self.process_t = Thread(target=process_loop, args=(self, frame_q, stop_event, pause)) + self.process_t.start() + self.evaluate_t = Thread(target=evaluate_loop, args=(self.logic, stop_event)) + self.evaluate_t.start() + + def set_working_area(self, x_offset=0, y_offset=0, width=None, height=None): + self.x_offset = x_offset + self.y_offset = y_offset + self.width = width + self.height = height + self.working_area_inited = True + + def processFrame(self, image, return_image=True): + frame = Frame(image, self.x_offset, self.y_offset, self.width, self.height) + analysis, pose, face = self.process_baby_image_models(frame.w_data) + self.logic.push(analysis) + if return_image: + if self.show_frame: + frame.add_analysis_frame() + if self.show_wrist_position or self.show_wrist_text: + frame.add_wrist_position(pose, self.show_wrist_text) + if self.show_body_details: + frame.add_body_details(pose) + if self.show_face_details: + frame.add_face_details(face) + if self.show_progress_bar: + frame.add_progress_bar(self.logic.avg_awake) + if self.debug: + frame.add_status(self.logic.avg_awake < 0.6) + return frame.getAugmentedFrame() + + def process_baby_image_models(self, frame): + """ + process_baby_image_models analyze frame and get information. + + Results are stored in analysis variable inside object + + Parameters + ---------- + frame : sleepy_baby.frame.Frame + Get Frame object + + Returns + ------- + _type_ + Returns dict with all findings and the objects for pose and face. + """ + + analysis = { + "body_detected": False, + "left_wrist_coords": None, + "right_wrist_coords": None, + "face_detected": False, + "eyes_open": False, + "mouth_open": False + } + results = None + results_pose = self.pose.process(frame) + if results_pose.pose_landmarks: + analysis["body_detected"] = True + # 15 left-wrist, 16 right-wrist + analysis["left_wrist_coords"] = (frame.shape[1] * results_pose.pose_landmarks.landmark[15].x, frame.shape[0] * results_pose.pose_landmarks.landmark[15].y) + analysis["right_wrist_coords"] = (frame.shape[1] * results_pose.pose_landmarks.landmark[16].x, frame.shape[0] * results_pose.pose_landmarks.landmark[16].y) + + results = self.face.process(frame) + if results.multi_face_landmarks: + analysis["face_detected"] = True + analysis["eyes_open"] = check_eyes_open(results.multi_face_landmarks[0].landmark) + analysis["mouth_open"] = check_mouth_open(results.multi_face_landmarks[0].landmark) + else: + analysis["body_found"] = False + return analysis, results_pose.pose_landmarks, results.multi_face_landmarks if results is not None else None + + +class old: + + def __init__(self, x, y, width, height, debug=False): + + + self.eyes_open_q = deque(maxlen=30) + self.awake_q = deque(maxlen=40) + self.movement_q = deque(maxlen=40) + self.eyes_open_state = False + + self.is_awake = False + self.ser = None # serial connection to arduino for controlling demon owl + + self.top_lip = frozenset([ + (324, 308), (78, 191), (191, 80), (80, 81), (81, 82), + (82, 13), (13, 312), (312, 311), (311, 310), + (310, 415), (415, 308), + (375, 291), (61, 185), (185, 40), (40, 39), (39, 37), + (37, 0), (0, 267), + (267, 269), (269, 270), (270, 409), (409, 291), + ]) + self.bottom_lip = frozenset([ + (61, 146), (146, 91), (91, 181), (181, 84), (84, 17), + (17, 314), (314, 405), (405, 321), (321, 375), + (78, 95), (95, 88), (88, 178), (178, 87), (87, 14), + (14, 317), (317, 402), (402, 318), (318, 324), + ]) + + + + + # This basically does 
the same thing as the live version, but is very useful for testing + def recorded(self): + cap = cv2.VideoCapture(os.getenv("VIDEO_PATH")) + success, img = cap.read() + while success: + frame = None + while frame is None: + cur_time = time.time() + if cur_time > self.next_frame: + frame = img + self.next_frame = max( + self.next_frame + 1.0 / self.fps, cur_time + 0.5 / self.fps + ) + + success, img = cap.read() + + if all(e is not None for e in [frame, img]): + # bounds to actual run models/analysis on...no need to look for babies outside of the crib + x = 800 + y = 250 + h = 650 + w = 600 + + if img.shape[0] > 1080 and img.shape[1] > 1920: # max res 1080p + img = maintain_aspect_ratio_resize(img, width=self.frame_dim[0], height=self.frame_dim[1]) + + img_to_process = img[y:y+h, x:x+w] + + debug_img = self.frame_logic(img_to_process) + + # reapply cropped and modified/marked up img back to img which is displayed + img[y:y+h, x:x+w] = debug_img + + if os.getenv("DEBUG", 'False').lower() in ('true', '1'): + asleep = sum(self.awake_q) / len(self.awake_q) < 0.6 + text = 'Sleepy Baby' if asleep else 'Wakey Baby' + text_color = (255,191,0) if asleep else (0,140,255) + cv2.putText(img, text, (int(img.shape[0]/2) + 250, int(img.shape[1]/2)), 2, 3, text_color, 2, 2) + + cv2.rectangle(img=img, pt1=(x, y), pt2=(x+w, y+h), color=[153,50,204], thickness=2) + tmp = img[y:y+h, x:x+w] + img = gamma_correction(img, .4) + + for face_landmarks in self.multi_face_landmarks: + + # INDICIES: https://github.com/tensorflow/tfjs-models/blob/838611c02f51159afdd77469ce67f0e26b7bbb23/face-landmarks-detection/src/mediapipe-facemesh/keypoints.ts + # https://github.com/google/mediapipe/blob/master/mediapipe/python/solutions/face_mesh_connections.py + + self.mpDraw.draw_landmarks( + image=tmp, + landmark_list=face_landmarks, + connections=self.mpFace.FACEMESH_RIGHT_EYE, + landmark_drawing_spec=None, + connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 150, 255), thickness=1, circle_radius=1)) + # connection_drawing_spec=self.mpDrawStyles + # .get_default_face_mesh_contours_style()) + self.mpDraw.draw_landmarks( + image=tmp, + landmark_list=face_landmarks, + connections=self.mpFace.FACEMESH_LEFT_EYE, + landmark_drawing_spec=None, + connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 255, 0), thickness=1, circle_radius=1)) + # connection_drawing_spec=self.mpDrawStyles + # .get_default_face_mesh_contours_style()) + + self.mpDraw.draw_landmarks( + image=tmp, + landmark_list=face_landmarks, + connections=self.top_lip, + landmark_drawing_spec=None,#self.mpDraw.DrawingSpec(color=(255, 150, 255), thickness=2, circle_radius=2), + connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 150, 255), thickness=1, circle_radius=1)) + self.mpDraw.draw_landmarks( + image=tmp, + landmark_list=face_landmarks, + connections=self.bottom_lip, + landmark_drawing_spec=None, + connection_drawing_spec=self.mpDraw.DrawingSpec(color=(255, 255, 0), thickness=1, circle_radius=1)) + + img[y:y+h, x:x+w] = tmp + + try: + img = cv2.resize(img, (960, 540)) + cv2.imshow('baby', img) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + except Exception as e: + print("Something went wrong: ", e) + + + + + + + + def process_image(self, frame): + """ + This function will process image. + + By default, only a interesting area defined by (x,y,h,w) is considered in processing. 
+ This avoids to waste resources to look for a baby outside crib + + Parameters + ---------- + img : numpy.ndarray + frame to be processed + + Returns + ------- + numpy.ndarray + image with overlays + """ + + #resize image if needed #FIXME: not working + #if frame.shape[0] > self.frame_dim[0] and frame.shape[1] > self.frame_dim[1]: # max res 1080p + # frame = maintain_aspect_ratio_resize(frame, width=self.frame_dim[0], height=self.frame_dim[1]) + + frame.flags.writeable = False #make the original frame read-only + self.frame = frame + working_area = cv2.cvtColor(frame[self.y:self.y+self.h, self.x:self.x+self.w], cv2.COLOR_BGR2RGB) #crop image and create a new image for processing + + analysis = self.process_baby_image_models(working_area) + debug = frame.copy() + debug = self.add_body_details_to_image(debug, analysis) + debug = self.add_progress_bar_to_image(debug, 0.5) + debug = self.add_face_details_to_image(debug, analysis) + return debug + + #self.awake_voting_logic(debug_img) + #self.movement_voting_logic(debug_img, body_found) + #self.set_wakeness_status(debug_img) + #self.periodic_wakeness_check() + + + def live(self, consumer_q): + img = None + while True: + if len(consumer_q) > 0: + try: + img = consumer_q.pop() # consume image from queue + except IndexError as e: + print('No images in queue: ', e) + continue + + img = self.process_image(img) + cv2.imshow('baby', maintain_aspect_ratio_resize(img, width=self.frame_dim[0], height=self.frame_dim[1])) + + if cv2.waitKey(1) & 0xFF == ord('q'): + break + \ No newline at end of file diff --git a/sleepy_baby/decision_logic.py b/sleepy_baby/decision_logic.py new file mode 100644 index 0000000..1fd0aa6 --- /dev/null +++ b/sleepy_baby/decision_logic.py @@ -0,0 +1,193 @@ +import logging +from collections import deque +import statistics + +import cv2 +import numpy as np + + +class DecisionLogic: + + # TODO: break up this class, so big ew + + # General high level heuristics: + # 1) no eyes -> no body found -> baby is awake + # 2) no eyes -> body found -> moving -> baby is awake + # 3) no eyes -> body found -> not moving -> baby is sleeping + # 4) eyes -> eyes open -> baby is awake (disregard body movement) + # 5) eyes -> eyes closed -> movement -> baby is awake + # 6) eyes -> eyes closed -> no movement -> baby is asleep + # 7) eyes -> eyes closed -> mouth open -> baby is awake + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__qualname__) + self.eyes_open_q = deque(maxlen=30) + self.awake_q = deque(maxlen=40) + self.movement_q = deque(maxlen=40) + self.eyes_open_state = False + self.is_awake = False + self.body_found = False + self.eyes_found = False + self.avg_awake = 0 + + + def push(self, analysis): + self.body_found = analysis['body_detected'] + self.eyes_found = analysis['face_detected'] + if self.body_found: + self.movement_q.append((analysis['left_wrist_coords'], analysis['right_wrist_coords'])) + if self.eyes_found: + if analysis['eyes_open'] is False: + self.eyes_open_q.append(1 if analysis['mouth_open'] else 0) + else: + self.eyes_open_q.append(1) + #no_eyes_found + #no_body_found + + + def update(self, eyes_threshold:float=0.75, wrist_threshold:int=25) -> None: #every second + + if self.body_found is False: #throttled_handle_no_body_found + self.awake_q.append(1) + if (self.eyes_found is False) and (len(self.eyes_open_q)>0): #throttled_handle_no_eyes_found + self.eyes_open_q.popleft() + + #self.awake_voting_logic() + if len(self.eyes_open_q) > self.eyes_open_q.maxlen/2: + avg = sum(self.eyes_open_q) / 
len(self.eyes_open_q)
+            if avg > eyes_threshold: # eyes open
+                self.eyes_open_state = True
+                self.logger.info("Eyes Open: vote awake")
+                self.awake_q.append(1)
+            else:
+                self.eyes_open_state = False
+                self.awake_q.append(0)
+                self.logger.info("Eyes closed: vote sleeping")
+        else:
+            self.logger.debug("Not voting on eyes, eye queue is too short.")
+
+        #self.movement_voting_logic(body_found)
+        if self.body_found is False:
+            self.logger.debug("No body found, depreciate movement queue.")
+            if len(self.movement_q) > 0:
+                self.movement_q.popleft()
+        elif (movement_list_len := len(self.movement_q)) > 5:
+            positions = np.reshape(self.movement_q, (movement_list_len, 4)).T
+            st_dev = [statistics.pstdev(pos) for pos in positions]
+            avg_std = sum(st_dev)/4
+
+            if int(avg_std) < wrist_threshold:
+                self.logger.info('No movement, vote sleeping')
+                self.awake_q.append(0)
+            else:
+                self.logger.info("Movement, vote awake")
+                self.awake_q.append(1)
+
+        #self.set_wakeness_status()
+        if len(self.awake_q) > 0:
+            self.avg_awake = sum(self.awake_q) / len(self.awake_q)
+            if self.avg_awake >= 0.6 and self.is_awake == False:
+                self.logger.info("Awake Event")
+                self.is_awake = True
+                #self.need_to_clean_this_up(True) #TODO
+            elif self.avg_awake < 0.6 and self.is_awake == True:
+                self.logger.info("Sleep Event")
+                self.is_awake = False
+                #self.need_to_clean_this_up(False) #TODO
+
+        #self.periodic_wakeness_check()
+
+
+    # This is a placeholder until the sensitivity of the wake/sleep transition is improved.
+    # Explanation: sometimes when the baby is waking up, he'll open and close his eyes for a couple of minutes...
+    # TODO: fine-tune the voting sensitivity; for now, don't allow toggling between wake & sleep within N seconds
+    #@debounce(180)
+    def need_to_clean_this_up(self, wake_status, img):
+        str_timestamp = str(int(time.time()))
+        sleep_data_base_path = os.getenv("SLEEP_DATA_PATH")
+        p = sleep_data_base_path + '/' + str_timestamp + '.png'
+        if wake_status: # woke up
+            log_string = "1," + str_timestamp + "\n"
+            print(log_string)
+            logging.info(log_string)
+            with open(sleep_data_base_path + '/sleep_logs.csv', 'a+', encoding="utf-8") as f:
+                f.write(log_string)
+            cv2.imwrite(p, img) # store off image of when wake/sleep event occurred.
Can help with debugging issues + + # if daytime, send phone notification if baby woke up + # now = datetime.datetime.now() + # now_time = now.time() + # if now_time >= ti(7,00) or now_time <= ti(22,00): # day time + # Thread(target=telegram_send.send(messages=["Baby woke up."]), daemon=True).start() + + self.is_awake = True + + if os.getenv("OWL", 'False').lower() in ('true', '1'): + print("MOVE & MAKE NOISE") + logging.info("MOVE & MAKE NOISE") + time.sleep(5) + self.ser.write(bytes(str(999999) + "\n", "utf-8")) + self.cast_service.play_sound() + else: # fell asleep + log_string = "0," + str_timestamp + "\n" + print(log_string) + logging.info(log_string) + with open(sleep_data_base_path + '/sleep_logs.csv', 'a+', encoding="utf-8") as f: + f.write(log_string) + cv2.imwrite(p, img) + self.is_awake = False + + # now = datetime.datetime.now() + # now_time = now.time() + # if now_time >= ti(22,00) or now_time <= ti(8,00): # night time + # set_hatch(self.is_awake) + + + + + + def frame_logic(self, raw_img): + img = raw_img + + debug_img = img.copy() + img.flags.writeable = False + converted_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) + + # beef + res = self.process_baby_image_models(converted_img, debug_img) + debug_img = res[0] + body_found = res[1] + + self.awake_voting_logic(debug_img) + self.movement_voting_logic(debug_img, body_found) + self.set_wakeness_status(debug_img) + self.periodic_wakeness_check() + + if os.getenv("DEBUG", 'False').lower() in ('true', '1'): + avg_awake = sum(self.awake_q) / len(self.awake_q) + + # draw progress bar + bar_y_offset = 0 + bar_y_offset = 100 + + bar_width = 200 + w = img.shape[1] + start_point = (int(w/2 - bar_width/2), 350 + bar_y_offset) + + end_point = (int(w/2 + bar_width/2), 370 + bar_y_offset) + adj_avg_awake = 1.0 if avg_awake / .6 >= 1.0 else avg_awake / .6 + progress_end_point = (int(w/2 - bar_width/2 + (bar_width*(adj_avg_awake))), 370 + bar_y_offset) + + color = (255, 255, 117) + progress_color = (0, 0, 255) + thickness = -1 + + debug_img = cv2.rectangle(debug_img, start_point, end_point, color, thickness) + debug_img = cv2.rectangle(debug_img, start_point, progress_end_point, progress_color, thickness) + display_perc = int((avg_awake * 100) / 0.6) + display_perc = 100 if display_perc >= 100 else display_perc + debug_img = cv2.putText(debug_img, str(display_perc) + "%", (int(w/2 - bar_width/2), 330 + bar_y_offset), 2, 1, (255,0,0), 2, 2) + debug_img = cv2.putText(debug_img, "Awake", (int(w/2 - bar_width/2 + 85), 330 + bar_y_offset), 2, 1, (255,0,0), 2, 2) + + return debug_img \ No newline at end of file diff --git a/sleepy_baby/frame.py b/sleepy_baby/frame.py new file mode 100644 index 0000000..c6207d2 --- /dev/null +++ b/sleepy_baby/frame.py @@ -0,0 +1,186 @@ +import logging +import numpy as np +import mediapipe as mp +import cv2 +from .helpers import gamma_correction + +mp_utils = mp.solutions.drawing_utils + +class Frame: + def __init__(self, frame: cv2.Mat, x_offset: int =0, y_offset: int = 0, width:int = None, height:int= None): + self.logger = logging.getLogger(self.__class__.__qualname__) + self.frame = frame + if (width is not None) and (height is not None): + self.set_working_area(x_offset, y_offset, width, height) + else: + self.set_working_area(0,0, frame.shape[1], frame.shape[0]) + + def set_working_area(self, x_offset: int, y_offset: int, width: int, height:int): + """ + set_working_area will define a sub-area of frame to be analyze. 
+ + This will help hardware to be faster and have a lower power consumption + + Parameters + ---------- + x_offset : int + offset for crop image on x-axis + y_offset : int + offset for crop image on y-axis + width : int + width of the interesting area + height : int + height of the interesting area + """ + self.x_offset = x_offset + self.y_offset = y_offset + self.height = height + self.width = width + self.clean_working_frame() + + def clean_working_frame(self) -> None: + """Create a new working picture""" + self.w_data = self.get_working_image() + + def get_working_image(self) -> np.ndarray: + """ + Return the subset of the frame where analysis is done + + Returns + ------- + np.ndarray + Working area + """ + return self.frame[self.y_offset:self.y_offset+self.height, self.x_offset:self.x_offset+self.width].copy() + + def getAugmentedFrame(self, gamma: float =.8) -> np.ndarray: + """ + It generates the a new frame integrating the modified working area. + + Returns + ------- + np.ndarray + Image integrated + """ + frame = cv2.LUT(self.frame.copy(), gamma_correction(gamma)) + frame[self.y_offset:self.y_offset+self.height, self.x_offset:self.x_offset+self.width] = self.w_data + return frame + + + def add_body_details(self, + pose_landmarks, + landmark_color=(255, 150, 255), + landmark_thickness=2, + landmark_circle_radius=2): + if pose_landmarks: + self.logger.debug("Body detected in the frame") + CUTOFF_THRESHOLD = 10 # head and face + MY_CONNECTIONS = [t for t in mp.solutions.pose.POSE_CONNECTIONS if t[0] > CUTOFF_THRESHOLD and t[1] > CUTOFF_THRESHOLD] + for id, lm in enumerate(pose_landmarks.landmark): + if id <= CUTOFF_THRESHOLD: + lm.visibility = 0 + continue + mp_utils.draw_landmarks(self.w_data, + pose_landmarks, + MY_CONNECTIONS, + landmark_drawing_spec=mp_utils.DrawingSpec( color=landmark_color, + thickness=landmark_thickness, + circle_radius=landmark_circle_radius) + ) + else: + self.logger.debug("No body detected in frame") + + def add_wrist_position(self, pose_landmarks, show_text=True, text_color=(255,0,0), point_color=(255,0,0), point_radius=2): + if pose_landmarks: + left_wrist = (int(self.width * pose_landmarks.landmark[15].x), int(self.height * pose_landmarks.landmark[15].y)) + right_wrist = (int(self.width * pose_landmarks.landmark[16].x), int(self.height * pose_landmarks.landmark[16].y)) + + cv2.circle(self.w_data, left_wrist, radius=point_radius, color=point_color, thickness=-1) + cv2.circle(self.w_data, right_wrist, radius=point_radius, color=point_color, thickness=-1) + if show_text: + self.w_data = cv2.putText(self.w_data, "Left wrist", left_wrist, 2, 1, text_color, 2, 2) + self.w_data = cv2.putText(self.w_data, "Right wrist", right_wrist, 2, 1, text_color, 2, 2) + + def add_analysis_frame(self): + self.logger.debug("Draw the analysis frame") + self.w_data = cv2.rectangle(self.w_data, [0,0], (self.w_data.shape[1], self.w_data.shape[0]), color=(0,255,0), thickness=5) + + def add_face_details(self, + multi_face_landmarks, + details_thickness = 1, + left_eye_color=(255, 255, 0), + right_eye_color=(255, 150, 255), + top_lip_color = (255, 150, 255), + bottom_lip_color = (255, 255, 0) + ): + """ + add_face_details_to_image adds face details to image passed in arguments. 
+
+        Parameters
+        ----------
+        multi_face_landmarks : list
+            face landmark sets returned by the MediaPipe FaceMesh solution
+
+        Notes
+        -----
+        Drawing happens in place on the working image (self.w_data); nothing is returned.
+        """
+        if multi_face_landmarks:
+            self.logger.debug("Face found in the frame")
+            for face_landmarks in multi_face_landmarks:
+                # INDICES: https://github.com/tensorflow/tfjs-models/blob/838611c02f51159afdd77469ce67f0e26b7bbb23/face-landmarks-detection/src/mediapipe-facemesh/keypoints.ts
+                # https://github.com/google/mediapipe/blob/master/mediapipe/python/solutions/face_mesh_connections.py
+
+                if right_eye_color is not None:
+                    mp.solutions.drawing_utils.draw_landmarks(
+                        image=self.w_data,
+                        landmark_list=face_landmarks,
+                        connections=mp.solutions.face_mesh.FACEMESH_RIGHT_EYE,
+                        landmark_drawing_spec=None,
+                        connection_drawing_spec=mp_utils.DrawingSpec(color=right_eye_color, thickness=details_thickness, circle_radius=1))
+
+                if left_eye_color is not None:
+                    mp.solutions.drawing_utils.draw_landmarks(
+                        image=self.w_data,
+                        landmark_list=face_landmarks,
+                        connections=mp.solutions.face_mesh.FACEMESH_LEFT_EYE,
+                        landmark_drawing_spec=None,
+                        connection_drawing_spec=mp_utils.DrawingSpec(color=left_eye_color, thickness=details_thickness, circle_radius=1))
+
+                if top_lip_color is not None:
+                    mp.solutions.drawing_utils.draw_landmarks(
+                        image=self.w_data,
+                        landmark_list=face_landmarks,
+                        connections=list(map(lambda x: x[29:40]+x[9:20], [list(mp.solutions.face_mesh.FACEMESH_LIPS)]))[0],
+                        landmark_drawing_spec=None,
+                        connection_drawing_spec=mp_utils.DrawingSpec(color=top_lip_color, thickness=details_thickness, circle_radius=1))
+
+                if bottom_lip_color is not None:
+                    mp.solutions.drawing_utils.draw_landmarks(
+                        image=self.w_data,
+                        landmark_list=face_landmarks,
+                        connections=list(map(lambda x: x[0:9]+x[20:29], [list(mp.solutions.face_mesh.FACEMESH_LIPS)]))[0],
+                        landmark_drawing_spec=None,
+                        connection_drawing_spec=mp_utils.DrawingSpec(color=bottom_lip_color, thickness=details_thickness, circle_radius=1))
+        else:
+            self.logger.debug("No face found")
+
+    def add_progress_bar(self, percent, bar_width=500, bar_height=20, bar_y_offset=100, backcolor=(255, 255, 117), forecolor=(0, 0, 255), textcolor=(255, 0, 0)):
+        # draw progress bar
+        adj_percent = min(1.0, percent / 0.6)
+        start_point = (int(self.width/2 - bar_width/2), self.height - bar_y_offset)
+        end_point = (start_point[0] + bar_width, start_point[1] + bar_height)
+        mid_point = (start_point[0] + int(bar_width * adj_percent), start_point[1] + bar_height)
+        text_y_position = start_point[1] - int(bar_height / 5)
+
+        self.w_data = cv2.rectangle(self.w_data, start_point, end_point, backcolor, thickness=-1)
+        self.w_data = cv2.rectangle(self.w_data, start_point, mid_point, forecolor, thickness=-1)
+        self.w_data = cv2.putText(self.w_data, str(int(adj_percent * 100)) + "% Awake", (start_point[0], text_y_position), 2, 1, textcolor, 2, 2)
+
+    def add_status(self, asleep):
+        text = 'Sleepy Baby' if asleep else 'Wakey Baby'
+        text_color = (255, 191, 0) if asleep else (0, 140, 255)
+        cv2.putText(self.w_data, text, (int(self.w_data.shape[1]/4), int(self.w_data.shape[0]/2)), 2, 3, text_color, 2, 2)
\ No newline at end of file
diff --git a/sleepy_baby/helpers.py b/sleepy_baby/helpers.py
new file mode 100644
index 0000000..99b3cdb
--- /dev/null
+++ b/sleepy_baby/helpers.py
@@ -0,0 +1,117 @@
+import os
+import numpy as np
+import cv2
+from pyhatchbabyrest import PyHatchBabyRest
+from functools import lru_cache
+
+def get_point_as_array(point):
+    """
+    get_point_as_array converts a landmark's coordinates into a numpy array.
+
+    Parameters
+    ----------
+    point : mediapipe.framework.formats.landmark_pb2.NormalizedLandmark
+        landmark coordinates
+
+    Returns
+    -------
+    numpy.ndarray
+        array of coordinates [x, y, z]
+    """
+    return np.array([point.x, point.y, point.z])
+
+def get_distance_between_landmarks(landmark, ref0, ref1):
+    p0 = get_point_as_array(landmark[ref0])
+    p1 = get_point_as_array(landmark[ref1])
+    return np.linalg.norm(p0 - p1)
+
+def get_eyes_positions(landmarks):
+    # Returns the left/right/top/bottom landmark coordinates of both eyes.
+    # Indexes of landmarks are reported at https://raw.githubusercontent.com/google/mediapipe/a908d668c730da128dfa8d9f6bd25d519d006692/mediapipe/modules/face_geometry/data/canonical_face_model_uv_visualization.png
+    rh_right = get_point_as_array(landmarks[33])
+    rh_left = get_point_as_array(landmarks[133])
+    rv_top = get_point_as_array(landmarks[159])
+    rv_bottom = get_point_as_array(landmarks[145])
+
+    lh_right = get_point_as_array(landmarks[362])
+    lh_left = get_point_as_array(landmarks[263])
+    lv_top = get_point_as_array(landmarks[386])
+    lv_bottom = get_point_as_array(landmarks[374])
+
+    return {"Left Eye": {"right": lh_right,
+                         "left": lh_left,
+                         "top": lv_top,
+                         "bottom": lv_bottom},
+            "Right Eye": {"right": rh_right,
+                          "left": rh_left,
+                          "top": rv_top,
+                          "bottom": rv_bottom}
+            }
+
+# Given x/y coords of eyes, returns a ratio representing "openness" of eyes:
+# the larger the width/height ratio, the more closed the eyes are.
+def closed_ratio(landmarks):
+    # Indexes of landmarks are reported at https://raw.githubusercontent.com/google/mediapipe/a908d668c730da128dfa8d9f6bd25d519d006692/mediapipe/modules/face_geometry/data/canonical_face_model_uv_visualization.png
+    right_eye_width = get_distance_between_landmarks(landmarks, 33, 133)
+    right_eye_height = get_distance_between_landmarks(landmarks, 159, 145)
+    left_eye_width = get_distance_between_landmarks(landmarks, 362, 263)
+    left_eye_height = get_distance_between_landmarks(landmarks, 386, 374)
+    right_eye_ratio = right_eye_width / right_eye_height
+    left_eye_ratio = left_eye_width / left_eye_height
+    return (right_eye_ratio + left_eye_ratio) / 2
+
+# Returns True when the averaged eye ratio is below the threshold, i.e. the eyes are open.
+def check_eyes_open(landmarks, ratio_threshold=5):
+    return closed_ratio(landmarks) <= ratio_threshold
+
+
+def set_hatch(is_awake):
+    print("attempting to boost hatch brightness")
+    rest = PyHatchBabyRest(os.getenv('HATCH_IP'))
+    rest.set_brightness(5)
+    print("brightness: ", rest.brightness)
+
+    return
+
+def get_top_lip_height(landmarks):
+    # Indexes of landmarks are reported at https://raw.githubusercontent.com/google/mediapipe/a908d668c730da128dfa8d9f6bd25d519d006692/mediapipe/modules/face_geometry/data/canonical_face_model_uv_visualization.png
+    return get_height(landmarks, [(39,81), (0,13), (269,311)])
+
+def get_bottom_lip_height(landmarks):
+    # Indexes of landmarks are reported at https://raw.githubusercontent.com/google/mediapipe/a908d668c730da128dfa8d9f6bd25d519d006692/mediapipe/modules/face_geometry/data/canonical_face_model_uv_visualization.png
+    return get_height(landmarks, [(181,178), (17,14), (405,402)])
+
+def get_mouth_height(landmarks):
+    # Indexes of landmarks are reported at https://raw.githubusercontent.com/google/mediapipe/a908d668c730da128dfa8d9f6bd25d519d006692/mediapipe/modules/face_geometry/data/canonical_face_model_uv_visualization.png
+    return get_height(landmarks, [(178,81), (14,13), (402,311)])
+
+def get_height(landmarks, pairs):
+    # Average distance between each pair of landmark indices.
+    heights = []
+    for pair in pairs:
+        p0 = get_point_as_array(landmarks[pair[0]])
+        p1 = get_point_as_array(landmarks[pair[1]])
+        heights.append(np.linalg.norm(p0 - p1))
+    return np.mean(heights)
+
+def check_mouth_open(landmarks, ratio=0.8):
+    top_lip_height = get_top_lip_height(landmarks)
+    bottom_lip_height = get_bottom_lip_height(landmarks)
+    mouth_height = get_mouth_height(landmarks)
+
+    # if mouth is open more than lip height * ratio, return true.
+    return mouth_height > min(top_lip_height, bottom_lip_height) * ratio
+
+# Resizes an image while maintaining its aspect ratio
+def maintain_aspect_ratio_resize(image, width: int = None, height: int = None, inter=cv2.INTER_AREA):
+    # Return original image if no need to resize
+    if width is None and height is None:
+        return image
+    (h, w) = image.shape[:2]  # Grab the image size
+    # Scale by whichever requested dimension yields the smaller scale factor
+    if (width is not None) and (height is None or (width / w) <= (height / h)):
+        dim = (width, int(h * width / w))
+    else:
+        dim = (int(w * height / h), height)
+    return cv2.resize(image, dim, interpolation=inter)
+
+@lru_cache(maxsize=10)
+def gamma_correction(gamma):
+    # Build a lookup table for cv2.LUT; gamma < 1 darkens, gamma > 1 brightens.
+    invGamma = 1 / gamma
+    table = [((i / 255) ** invGamma) * 255 for i in range(256)]
+    return np.array(table, np.uint8)
\ No newline at end of file
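
A minimal usage sketch of the new sleepy_baby.frame.Frame and sleepy_baby.helpers APIs introduced above. The image path "crib.jpg", the working-area offsets and size, and the awake heuristic are illustrative assumptions only; the FaceMesh calls follow the standard mediapipe Python API.

import cv2
import mediapipe as mp

from sleepy_baby.frame import Frame
from sleepy_baby.helpers import check_eyes_open, check_mouth_open

img = cv2.imread("crib.jpg")  # hypothetical test image
frame = Frame(img, x_offset=100, y_offset=50, width=640, height=480)  # illustrative crop

# Run FaceMesh only on the working area, as the Frame abstraction intends.
rgb = cv2.cvtColor(frame.get_working_image(), cv2.COLOR_BGR2RGB)
with mp.solutions.face_mesh.FaceMesh(static_image_mode=True,
                                     max_num_faces=1,
                                     min_detection_confidence=0.5) as face_mesh:
    results = face_mesh.process(rgb)

if results.multi_face_landmarks:
    landmarks = results.multi_face_landmarks[0].landmark
    eyes_open = check_eyes_open(landmarks)    # True when the eye width/height ratio is under the threshold
    mouth_open = check_mouth_open(landmarks)
    frame.add_face_details(results.multi_face_landmarks)
    frame.add_status(asleep=not (eyes_open or mouth_open))  # naive heuristic, for illustration only

# Paste the annotated working area back into a gamma-darkened copy of the full frame.
annotated = frame.getAugmentedFrame(gamma=0.8)
cv2.imwrite("annotated.jpg", annotated)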
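
A similarly hedged sketch of the standalone image utilities in sleepy_baby.helpers: maintain_aspect_ratio_resize derives the missing dimension from the aspect ratio, and gamma_correction returns a cached lookup table intended for cv2.LUT. The input file name "capture.jpg" is assumed for illustration.

import cv2
from sleepy_baby.helpers import maintain_aspect_ratio_resize, gamma_correction

img = cv2.imread("capture.jpg")  # hypothetical input frame
thumb = maintain_aspect_ratio_resize(img, width=640)  # height follows from the aspect ratio
dimmed = cv2.LUT(thumb, gamma_correction(0.8))         # gamma < 1 darkens, as getAugmentedFrame does outside the working area
cv2.imwrite("thumb.jpg", dimmed)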