Skip to content

Commit

Permalink
lint: apply ruff
Browse files Browse the repository at this point in the history
Signed-off-by: Wim-Peter Dirks <[email protected]>
  • Loading branch information
Wim-Peter Dirks committed Nov 1, 2024
1 parent 8574af3 commit 8bca272
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 38 deletions.
45 changes: 27 additions & 18 deletions rcdt_detection/launch/detect.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,30 @@
from launch_ros.substitutions import FindPackageShare


def generate_launch_description():
return LaunchDescription([
Node(
package="rcdt_detection",
executable="object_detection.py",
),
IncludeLaunchDescription(
PythonLaunchDescriptionSource([
PathJoinSubstitution([
FindPackageShare("realsense2_camera"), "launch", "rs_launch.py"])
]),
launch_arguments={
"align_depth.enable": "true",
"enable_sync": "true",
"enable_rgbd": "true",
}.items(),
),
])
def generate_launch_description() -> LaunchDescription:
return LaunchDescription(
[
Node(
package="rcdt_detection",
executable="object_detection.py",
),
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
PathJoinSubstitution(
[
FindPackageShare("realsense2_camera"),
"launch",
"rs_launch.py",
]
)
]
),
launch_arguments={
"align_depth.enable": "true",
"enable_sync": "true",
"enable_rgbd": "true",
}.items(),
),
]
)
39 changes: 26 additions & 13 deletions rcdt_detection/nodes/object_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@dataclasses.dataclass
class ImageCoordinate:
"""Image coordinate where (0, 0) is at the top-left corner."""

x: int
y: int

Expand All @@ -46,13 +47,17 @@ def __init__(self, is_object_of_interest: Callable):
super().__init__("detection_service")

self.is_object_of_interest: Callable = is_object_of_interest
self.segmentation_model: ultralytics.engine.model.Model = load_segmentation_model()
self.segmentation_model: ultralytics.engine.model.Model = (
load_segmentation_model()
)
self.intrinsics: rs2.intrinsics | None = None

self.rgbd_topic: str = "/camera/camera/rgbd"
self.time_to_wait: float = 2.0
self.max_attempts: int = 3
self.service = self.create_service(DetectObjects, "detect_objects", self.detection_callback)
self.service = self.create_service(
DetectObjects, "detect_objects", self.detection_callback
)

def detection_callback(self, _: None, response: PointList) -> PointList:
"""Gets RGBD image, detects objects, and returns them."""
Expand All @@ -65,7 +70,7 @@ def detection_callback(self, _: None, response: PointList) -> PointList:

centroid_image_coordinates = process_rgb_image(
message=message.rgb,
segmentation_model=self.segmentation_model,
segmentation_model=self.segmentation_model,
is_object_of_interest=self.is_object_of_interest,
)
ros_logger.info(f"{centroid_image_coordinates}")
Expand All @@ -75,7 +80,7 @@ def detection_callback(self, _: None, response: PointList) -> PointList:
world_coordinates = process_depth_image(
message=message.depth,
image_coordinates=centroid_image_coordinates,
intrinsics=self.intrinsics
intrinsics=self.intrinsics,
)
ros_logger.info(f"{world_coordinates}")
response.objects = PointList(points=world_coordinates)
Expand All @@ -84,10 +89,10 @@ def detection_callback(self, _: None, response: PointList) -> PointList:

def get_rgbd(self) -> RGBD:
"""Gets RGBD-message from the rgbd topic.
Raises:
LookupError: if after `self.max_attempts` no message could be received.
"""
attempts = 0
while attempts < self.max_attempts:
Expand All @@ -98,11 +103,13 @@ def get_rgbd(self) -> RGBD:
time_to_wait=self.time_to_wait,
)
if success:
break
break
attempts += 1
else:
raise LookupError("unable to get message from RGBD topic")
ros_logger.info(f"Received RGBD message for frame '{message.header.frame_id}' stamped at {repr(message.header.stamp)}.")
ros_logger.info(
f"Received RGBD message for frame '{message.header.frame_id}' stamped at {repr(message.header.stamp)}."
)
return message


Expand All @@ -122,7 +129,7 @@ def calculate_intrinsics(message: CameraInfo) -> rs2.intrinsics:
elif message.distortion_model == "equidistant":
intrinsics.model = rs2.distortion.kannala_brandt4

intrinsics.coeffs = [i for i in message.d]
intrinsics.coeffs = list(message.d)

return intrinsics

Expand All @@ -137,7 +144,7 @@ def process_rgb_image(
Object detection starts with segmenting the image. The result contains segmentation masks
per object. For every segmented object it is determined whether it is an object of interest.
If it is, the image coordinates of its centroid are calculated.
"""
rgb_image = ros_image_to_cv2_image(message)

Expand All @@ -158,14 +165,20 @@ def process_rgb_image(
return centroid_image_coordinates


def process_depth_image(message: Image, image_coordinates: Iterable[ImageCoordinate], intrinsics: rs2.intrinsics) -> list[Point]:
def process_depth_image(
message: Image,
image_coordinates: Iterable[ImageCoordinate],
intrinsics: rs2.intrinsics,
) -> list[Point]:
"""Calculate world coordinate relative to the camera of image coordinates."""
depth_image = ros_image_to_cv2_image(message)

world_coordinates = []
for image_coordinate in image_coordinates:
depth_value = depth_image[image_coordinate.y, image_coordinate.x]
x, y, z = rs2.rs2_deproject_pixel_to_point(intrinsics, [image_coordinate.x, image_coordinate.y], depth_value)
x, y, z = rs2.rs2_deproject_pixel_to_point(
intrinsics, [image_coordinate.x, image_coordinate.y], depth_value
)
world_coordinates.append(Point(x=x, y=y, z=z))

return world_coordinates
Expand Down Expand Up @@ -202,4 +215,4 @@ def main(args: str = None) -> None:
# rosdep update
# rosdep install --from-paths src --ignore-src --reinstall -y
# source install/setup.bash
# WARNING: The scripts f2py and numpy-config are installed in '/home/wp/.local/bin' which is not on PATH.
# WARNING: The scripts f2py and numpy-config are installed in '/home/wp/.local/bin' which is not on PATH.
2 changes: 1 addition & 1 deletion rcdt_detection/rcdt_detection/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# SPDX-FileCopyrightText: Alliander N. V.
#
# SPDX-License-Identifier: Apache-2.0
# SPDX-License-Identifier: Apache-2.0
4 changes: 3 additions & 1 deletion rcdt_detection/rcdt_detection/image_manipulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
cv_bridge = CvBridge()


def ros_image_to_cv2_image(image_message: Image, desired_encoding: str = "passthrough") -> np.array:
def ros_image_to_cv2_image(
image_message: Image, desired_encoding: str = "passthrough"
) -> np.array:
"""Convert ROS image message to cv2 image."""
return cv_bridge.imgmsg_to_cv2(image_message, desired_encoding=desired_encoding)

Expand Down
6 changes: 3 additions & 3 deletions rcdt_detection/rcdt_detection/object_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

def is_brick(mask: np.array) -> bool:
"""Determine if given mask belongs to a brick using mean pixel thresholds."""
b, g, r = np.nanmean(np.where(mask==0.0, np.nan, mask), axis=(0, 1))
return (b > (r + 100.0)) and (b > (g + 10.0)) # (85.0 < r < 110.0) and (100.0 < g < 130.0) and (100.0 < b < 140.0)
b, g, r = np.nanmean(np.where(mask == 0.0, np.nan, mask), axis=(0, 1))
return (b > (r + 100.0)) and (b > (g + 10.0))


def is_blue(mask: np.array) -> bool:
"""Determine if given mask belongs to a blue object using mean pixel thresholds."""
b, g, r = np.nanmean(np.where(mask==0.0, np.nan, mask), axis=(0, 1))
b, g, r = np.nanmean(np.where(mask == 0.0, np.nan, mask), axis=(0, 1))
return (b > (r - 5.0)) and (b > (g - 5.0))
8 changes: 6 additions & 2 deletions rcdt_detection/rcdt_detection/segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
SEGMENTATION_MODEL_PATH: str = "/home/wp/projects/realsense/models/FastSAM-x.pt"


def load_segmentation_model(model_path: str = SEGMENTATION_MODEL_PATH) -> ultralytics.engine.model.Model:
def load_segmentation_model(
model_path: str = SEGMENTATION_MODEL_PATH,
) -> ultralytics.engine.model.Model:
"""Load segmentation model from given path."""
return ultralytics.FastSAM(model_path)


def segment_image(model: ultralytics.engine.model.Model, image: np.array) -> ultralytics.engine.results.Results:
def segment_image(
model: ultralytics.engine.model.Model, image: np.array
) -> ultralytics.engine.results.Results:
"""Segment given image using given model."""
height, width, _ = image.shape
return model(image, imgsz=(height, width))[0]

0 comments on commit 8bca272

Please sign in to comment.