StopSign with mediapipe #1197

Open · wants to merge 3 commits into main
22 changes: 22 additions & 0 deletions donkeycar/parts/object_detector/action_demo.py
@@ -0,0 +1,22 @@
import time
import logging
from donkeycar.parts.object_detector.detector_manager import ActionProtocol

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


ACTION_DEMO_TRIGGER_TIMES = 10

class ActionDemo(ActionProtocol):
def __init__(self, **kwargs):
self.__run_trigger = 0
super().__init__(**kwargs)

def manage(self, angle, throttle, found: bool, position):
reset_action = False
self.__run_trigger += 1
if not found or self.__run_trigger >= ACTION_DEMO_TRIGGER_TIMES:
self.__run_trigger = 0
reset_action = True
return angle, throttle, reset_action
89 changes: 89 additions & 0 deletions donkeycar/parts/object_detector/action_stop_and_go.py
@@ -0,0 +1,89 @@
import time
import logging
from donkeycar.parts.object_detector.detector_manager import ActionProtocol

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class StopManager():
# Stop states
IDLE = 0
INITIATE = 1
POS_ONE = 3
NEG_ONE = 2
NEG_TWO = 4
THROTTLE_INC = 0.2

def __init__(self):
self.stop_state = self.IDLE
self.last_throttle = 0.0

def stop(self):
if self.stop_state == self.IDLE:
self.stop_state = self.INITIATE

def is_idle(self):
return self.stop_state == self.IDLE

def throttle(self):
# if self.stop_state == self.IDLE:
# pass
throttle = 0.0
if self.stop_state == self.INITIATE:
self.stop_state = self.NEG_ONE
throttle = -1.0
elif self.stop_state == self.NEG_ONE:
self.stop_state = self.POS_ONE
throttle = 0.0
elif self.stop_state == self.POS_ONE:
self.stop_state = self.NEG_TWO
throttle = -1.0
elif self.stop_state == self.NEG_TWO:
throttle = self.last_throttle + self.THROTTLE_INC
if throttle >= 0.0:
throttle = 0.0
self.stop_state = self.IDLE
self.last_throttle = throttle
return throttle


class ActionStopAndGo(ActionProtocol):
# Stop and Go protocol States
RUNNING = 0
STOPPING = 1
PAUSING = 2
PASSING = 3

def __init__(self, pause_time=2.0, **kwargs):
super().__init__(**kwargs)
self.pause = pause_time
self.state = self.RUNNING
self.timeout = 0.0
self.stopper = StopManager()

def manage(self, angle, throttle, found: bool, position):
reset_action = False
logger.debug(f'self.state: {self.state}')
if self.state == self.RUNNING:
if found:
self.state = self.STOPPING
self.stopper.stop()
else:
reset_action = True
if self.state == self.STOPPING:
throttle = self.stopper.throttle()
if self.stopper.is_idle():
self.state = self.PAUSING
self.timeout = time.time() + self.pause
elif self.state == self.PAUSING:
if time.time() < self.timeout:
throttle = 0.0
else:
self.state = self.PASSING
elif self.state == self.PASSING:
if not found:
self.state = self.RUNNING
reset_action = True

return angle, throttle, reset_action
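
For reference, here is a minimal sketch (illustration only, not part of the diff) of the throttle sequence StopManager produces when stop() is called once and throttle() is then polled every drive loop; the reading that the double reverse pulse is meant to engage the ESC brake is an assumption, not something stated in the code:

from donkeycar.parts.object_detector.action_stop_and_go import StopManager

sm = StopManager()
sm.stop()                        # IDLE -> INITIATE
pulses = []
while not sm.is_idle():          # poll once per drive loop until the sequence finishes
    pulses.append(sm.throttle())
print(pulses)
# roughly: -1.0, 0.0, -1.0, then a ramp from -0.8 back to 0.0 in
# THROTTLE_INC (0.2) steps; floating-point rounding may add a near-zero
# step before the final 0.0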
155 changes: 155 additions & 0 deletions donkeycar/parts/object_detector/detector_manager.py
@@ -0,0 +1,155 @@
"""
detector_manager.py
Donkeycar Parts to manage a sequence of events based upon object detection

DetectorManager is a Donkeycar part that manages the object detection and the actions.

ActionProtocol is a base class for the actions that can be managed by the DetectorManager.
* action_demo.py: An example that shows how to create an action.
* action_stop_and_go.py: First stops the car, then pauses for a delay, then drives past the stop sign.

MediapipeObjectDetector: detects objects in the image using a MediaPipe object detection model.

How to use:
1. Download the object detection model to the Car directory
https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite

2. configure myconfig.py
2.1 Set OBJECT_DETECTOR = True to enable object detection.
2.2 Set OD_ACTION_DEMO = True to enable the demo action, which triggers on OD_ACTION_DEMO_LABEL (default: "person").
2.3 Set OD_ACTION_STOP_AND_GO = True to enable the stop-sign feature.
"""

import time
import logging
from donkeycar.parts.object_detector.mediapipe_object_detetor import MediapipeObjectDetector

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

MARK_TEXT_MARGIN = 10 # pixels
MARK_TEXT_ROW_SIZE = 30
MARK_TEXT_SIZE = 1
MARK_TEXT_THICKNESS = 1
MARK_TEXT_COLOR = (0, 255, 0)

class ActionProtocol:
def __init__(self, od_label: str):
self.od_label = od_label

def manage(self, angle, throttle, found: bool, position):
reset_action = True
return angle, throttle, reset_action

class DetectorManager:

def __init__(self,
od_model_path,
score=0.5,
image_width=160,
run_hz=1, # 1 per second
vehicle_hz=20,
show_bounding_box = True):

self.on = True
self.width = image_width
self.img_center = self.width / 2

self.running_action = None

self.run_counter = 0
self.run_trigger = int(vehicle_hz / run_hz)
self.run_inprogress = False

self.show_bounding_box = show_bounding_box

self.image = None
self.bbox = None
self.score = 0
self.label = None
self.position = 0.0

self.__actions = {}
        self._od_labels = []

self.detector = MediapipeObjectDetector(
od_model_path=od_model_path,
max_results=3,
score_threshold=score)

def run(self, angle, throttle, image):
self.run_counter += 1
start = time.time()
if self.run_counter >= self.run_trigger:
logger.debug(f'self.run_counter: {self.run_counter}')
self.image = image
self._detect()
if self.show_bounding_box and self.bbox is not None:
self._mark(self.image, self.bbox, self.label)

angle, throttle = self._dispatch_action(self.label,angle, throttle)
logger.debug(f'run_time_cost:{(time.time() - start):5.3f}')
return angle, throttle, image

def shutdown(self):
logger.info(
f'Detector - average detection time {self.detector.average_perf():5.3f}')
self.on = False

    def addAction(self, action: ActionProtocol):
logger.info(f'addAction label:{action.od_label}')
self._od_labels.append(action.od_label)
self.__actions[action.od_label] = action

def _mark(self, image, bbox, label):
import cv2
# top left corner of rectangle
start_point = (bbox.origin_x, bbox.origin_y)
# bottom right corner of rectangle
end_point = (bbox.origin_x + bbox.width, bbox.origin_y + bbox.height)
color = (255, 0, 0) # Red color
thickness = 1
image = cv2.rectangle(image, start_point, end_point, color, thickness)

text_location = (MARK_TEXT_MARGIN + bbox.origin_x,
MARK_TEXT_MARGIN + MARK_TEXT_ROW_SIZE + bbox.origin_y)
cv2.putText(image, label, text_location, cv2.FONT_HERSHEY_DUPLEX,
MARK_TEXT_SIZE, MARK_TEXT_COLOR, MARK_TEXT_THICKNESS, cv2.LINE_AA)

def _detect(self):
self.bbox = None
self.score = 0
self.label = None
self.position = 0.0
if self.image is not None:
results = self.detector.detect(self.image)
for label, bbox, score in results:
if label in self._od_labels:
self.bbox = bbox
self.score = score
self.label = label
self.position = ((self.bbox.origin_x + (self.bbox.width / 2)) - self.img_center) / self.img_center
logger.debug(f'object label:{self.label }, bbox:{self.bbox}, score:{self.score}, position:{self.position }')
break

def _dispatch_action(self, label, angle, throttle):
action_label = self.running_action

        if action_label is None:  # no action is running; check whether there is an action for this label
if label in self.__actions:
self.running_action = label
action_label = label

        if action_label is not None:  # an action is running; let it manage the controls
            # found is True only when the detected label matches the running action's label
            found = (label == action_label)

angle, throttle, reset_action = self.__actions[action_label].manage(angle, throttle, found, self.position)
if reset_action:
self.run_counter = 0
self.running_action = None
logger.info(f'dispatch action_label:{action_label}, reset_action:{reset_action}, angle:{angle}, throttle:{throttle}')
else:
self.run_counter = 0 # reset the run counter if no action is running

return angle, throttle
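
A minimal wiring sketch for the part, for illustration only; the construction below and the V.add() channel names are assumptions based on the defaults above, not taken from a template in this diff:

from donkeycar.parts.object_detector.detector_manager import DetectorManager
from donkeycar.parts.object_detector.action_stop_and_go import ActionStopAndGo

detector = DetectorManager(od_model_path='efficientdet_lite0.tflite',
                           score=0.5,
                           image_width=160,
                           run_hz=1,       # one detection pass per second
                           vehicle_hz=20,  # matches DRIVE_LOOP_HZ
                           show_bounding_box=True)
detector.addAction(ActionStopAndGo(pause_time=2.0, od_label='stop sign'))

# assumed vehicle wiring (channel names are illustrative):
# V.add(detector,
#       inputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'],
#       outputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'])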
77 changes: 77 additions & 0 deletions donkeycar/parts/object_detector/mediapipe_object_detetor.py
@@ -0,0 +1,77 @@
#!/usr/bin/env python3
# Object Detection from Mediapipe
# Mediapipe-studio
# https://mediapipe-studio.webapps.google.com/home
# Mediapipe-examples-object_detection
# https://github.com/google-ai-edge/mediapipe-samples/tree/main/examples/object_detection/raspberry_pi
# installation:
# pip install mediapipe
#
# Model
# efficientdet_lite0: https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite
# category list: https://storage.googleapis.com/mediapipe-tasks/object_detector/labelmap.txt
#
# This is a general purpose detection class that uses a model to recognize an object.

import os
import time

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class MediapipeObjectDetector:
def __init__(self,
od_model_path, # object detection model path
max_results=3,
score_threshold=0.3):

# Check model file
if not os.path.exists(od_model_path):
            raise Exception(
                f'ObjectDetector Model file not found: {od_model_path}')
logger.debug(f"MediapipeOD load model {od_model_path}")

# Initialize the object detection model
base_options = python.BaseOptions(model_asset_path=od_model_path)
options = vision.ObjectDetectorOptions(base_options=base_options,
max_results=max_results,
score_threshold=score_threshold)
self.detector = vision.ObjectDetector.create_from_options(options)

# Performance timer
self.loops = 0
self.total_time = 0

def average_perf(self):
p = 0 if self.loops == 0 else self.total_time / self.loops
return p

def detect(self, image):
# Create a MediaPipe Image from the RGB image.
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
# gray_frame = mp.Image(image_format=mp.ImageFormat.GRAY8,
# data=cv2.cvtColor(cv_mat, cv2.COLOR_RGB2GRAY))

# Detect objects
start = time.time()
detection_result = self.detector.detect(mp_image)

self.loops += 1
cost = time.time() - start
self.total_time += cost
logger.debug(f'detect_time_cost:{cost}')

result = [] # list of (category, bbox, score)
for detection in detection_result.detections:
bbox = detection.bounding_box
for category in detection.categories:
result.append((category.category_name, bbox, category.score))
return result
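
A minimal standalone check of the detector, assuming efficientdet_lite0.tflite has been downloaded to the working directory; the all-zero frame is only a stand-in for a real RGB camera image:

import numpy as np
from donkeycar.parts.object_detector.mediapipe_object_detetor import MediapipeObjectDetector

detector = MediapipeObjectDetector(od_model_path='efficientdet_lite0.tflite',
                                   max_results=3,
                                   score_threshold=0.5)
frame = np.zeros((120, 160, 3), dtype=np.uint8)  # stand-in for cam/image_array (RGB, HxWx3)
for label, bbox, score in detector.detect(frame):
    print(f'{label}: score={score:.2f} '
          f'bbox=({bbox.origin_x}, {bbox.origin_y}, {bbox.width}, {bbox.height})')
print(f'average detection time: {detector.average_perf():.3f}s')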
15 changes: 15 additions & 0 deletions donkeycar/templates/cfg_complete.py
100644 → 100755
@@ -764,3 +764,18 @@
# PI connection
PI_USERNAME = "pi"
PI_HOSTNAME = "donkeypi.local"


# Object Detector
OBJECT_DETECTOR = False # enable the object detector part
OBJECT_DETECTOR_SHOW_BOUNDING_BOX = True # draw the bounding box on the image shown in the web controller
OD_MODEL_NAME = 'efficientdet_lite0.tflite' # object detection model file name; full path is CAR_PATH/OD_MODEL_NAME
OD_SCORE = 0.5 # score threshold for detection
OD_RUN_HZ = 1 # detections per second; e.g. 1 means one detection every 20 drive loops at DRIVE_LOOP_HZ = 20

OD_ACTION_DEMO = False # enable detection to trigger a demo action
OD_ACTION_DEMO_LABEL = "person" # label to trigger demo action

OD_ACTION_STOP_AND_GO = False # enable detection to stop and go
OD_ACTION_STOP_AND_GO_LABEL = "stop sign" # label to trigger stop and go
OD_ACTION_STOP_AND_GO_PAUSE_TIME = 2.0 # after stop sequence completes, pause for n seconds
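
As an example, a myconfig.py override that enables the stop-sign behaviour could look like the sketch below (illustrative values; OD_MODEL_NAME, OD_SCORE and OD_RUN_HZ can also be left at their defaults):

# myconfig.py
OBJECT_DETECTOR = True
OD_MODEL_NAME = 'efficientdet_lite0.tflite'  # downloaded into the car directory
OD_SCORE = 0.5
OD_RUN_HZ = 1
OD_ACTION_STOP_AND_GO = True
OD_ACTION_STOP_AND_GO_LABEL = "stop sign"
OD_ACTION_STOP_AND_GO_PAUSE_TIME = 2.0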