Skip to content

Commit

Permalink
[#54504] core: cvnode_base.py: Adapt Python nodes to updated messages…
Browse files Browse the repository at this point in the history
… API

Adapts the core CVNodeBase class implemented in Python to address
changes in 'kenning_computer_vision_msgs' ROS2 package.

Inference now is performed only in 'online' mode, where the system
should only react to a single frame.

Signed-off-by: Illia Vysochyn <[email protected]>
  • Loading branch information
ivysochyn committed Feb 7, 2024
1 parent 2362418 commit 711e905
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 124 deletions.
23 changes: 13 additions & 10 deletions cvnode_base/core/cvnode_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
"""Base class for computer vision nodes."""

from abc import ABC, abstractmethod
from typing import List
from typing import Optional, Tuple

from kenning_computer_vision_msgs.msg import SegmentationMsg
from kenning_computer_vision_msgs.msg import SegmentationMsg, VideoFrameMsg
from kenning_computer_vision_msgs.srv import ManageCVNode, SegmentCVNodeSrv
from rclpy.node import Node
from rclpy.qos import QoSProfile
from sensor_msgs.msg import Image
from std_srvs.srv import Trigger


Expand Down Expand Up @@ -125,19 +124,22 @@ def prepare(self) -> bool:
...

@abstractmethod
def run_inference(self, X: List[Image]) -> List[SegmentationMsg]:
def run_inference(
self, X: VideoFrameMsg
) -> Tuple[bool, Optional[SegmentationMsg]]:
"""
Run inference on the input data.
Parameters
----------
X : List[Image]
List of input image messages.
X : VideoFrameMsg
Input data for inference.
Returns
-------
List[SegmentationMsg]
List of postprocessed segmentation messages.
Tuple[bool, Optional[SegmentationMsg]]
Tuple containing success status and segmentation result
if successful.
"""
...

Expand Down Expand Up @@ -212,8 +214,9 @@ def _processCallback(
Processed response for the process service client.
"""
self.get_logger().debug("Executing inference on input data")
response.output = self.run_inference(request.input)
response.success = True
response.success, segmenation = self.run_inference(request.input)
if response.success:
response.segmentation = segmenation
self.get_logger().debug("Inference executed")
return response

Expand Down
30 changes: 7 additions & 23 deletions cvnode_base/nodes/mask_rcnn_detectron.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import csv
import os
from gc import collect
from typing import Dict, List
from typing import Dict

import rclpy
from detectron2 import model_zoo
Expand All @@ -35,27 +35,12 @@ def __init__(self):
self.declare_parameter("model_path", rclpy.Parameter.Type.STRING)
self.declare_parameter("num_classes", rclpy.Parameter.Type.INTEGER)

def run_inference(self, X: List[Image]) -> List[SegmentationMsg]:
"""
Run inference on the input data.
Parameters
----------
X : List[Image]
List of input image messages.
Returns
-------
List[SegmentationMsg]
List of postprocessed segmentation messages.
"""
result = []
for frame in X:
input_data = self.preprocess(frame)
prediction = self.predict(input_data)
result.append(self.postprocess(prediction, frame))
empty_cache()
return result
def run_inference(self, X):
input_data = self.preprocess(X.frame)
prediction = self.predict(input_data)
result = self.postprocess(prediction, X.frame)
empty_cache()
return True, result

def prepare(self) -> bool:
"""
Expand Down Expand Up @@ -158,7 +143,6 @@ def postprocess(self, Y: Dict, frame: Image) -> SegmentationMsg:
Postprocessed model predictions in the form of SegmentationMsg.
"""
msg = SegmentationMsg()
msg._frame = frame
prediction = Y["instances"]
scores = prediction.scores.cpu().detach().numpy()

Expand Down
28 changes: 6 additions & 22 deletions cvnode_base/nodes/mask_rcnn_onnx.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,12 @@ def __init__(self):
self.declare_parameter("model_path", rclpy.Parameter.Type.STRING)
self.declare_parameter("device", rclpy.Parameter.Type.STRING)

def run_inference(self, X: List[Image]) -> List[SegmentationMsg]:
"""
Run inference on the input data.
Parameters
----------
X : List[Image]
List of input image messages.
Returns
-------
List[SegmentationMsg]
List of postprocessed segmentation messages.
"""
result = []
for frame in X:
input_data = self.preprocess(frame)
prediction = self.predict(input_data)
result.append(self.postprocess(prediction, frame))
empty_cache()
return result
def run_inference(self, X):
input_data = self.preprocess(X.frame)
prediction = self.predict(input_data)
result = self.postprocess(prediction, X.frame)
empty_cache()
return True, result

def prepare(self) -> bool:
"""
Expand Down Expand Up @@ -181,7 +166,6 @@ def postprocess(
Postprocessed model predictions in the form of SegmentationMsg.
"""
msg = SegmentationMsg()
msg._frame = frame
boxes, classes, masks, scores = Y[0], Y[1], Y[2].squeeze(), Y[3]
if masks.ndim == 2:
masks = np.expand_dims(masks, 0)
Expand Down
26 changes: 5 additions & 21 deletions cvnode_base/nodes/mask_rcnn_tensorrt.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,11 @@ def __init__(self):
super().__init__(node_name="mask_rcnn_tensorrt_node")
self.declare_parameter("class_names_path", rclpy.Parameter.Type.STRING)

def run_inference(self, X: List[Image]) -> List[SegmentationMsg]:
"""
Run inference on the input data.
Parameters
----------
X : List[Image]
List of input image messages.
Returns
-------
List[SegmentationMsg]
List of postprocessed segmentation messages.
"""
result = []
for frame in X:
input_data = self.preprocess(frame)
prediction = self.predict(input_data)
result.append(self.postprocess(prediction, frame))
return result
def run_inference(self, X):
input_data = self.preprocess(X.frame)
prediction = self.predict(input_data)
result = self.postprocess(prediction, X.frame)
return True, result

def prepare(self) -> bool:
"""
Expand Down Expand Up @@ -120,7 +105,6 @@ def postprocess(
Postprocessed model predictions in the form of SegmentationMsg.
"""
msg = SegmentationMsg()
msg._frame = frame
keep = Y[0].squeeze()
boxes, classes, masks, scores = (
Y[1].squeeze()[:keep],
Expand Down
50 changes: 23 additions & 27 deletions cvnode_base/nodes/yolact_kenning.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,33 +180,29 @@ def prepare(self):
return self.yolact.prepare()

def run_inference(self, X):
result = []
for frame in X:
x = imageToMat(frame, "rgb8").transpose(2, 0, 1)
x = self.yolact.model.preprocess_input([x])
self.yolact.runtime.load_input([x])
self.yolact.runtime.run()
preds = self.yolact.runtime.extract_output()
preds = self.yolact.model.postprocess_outputs(preds)

msg = SegmentationMsg()
msg._frame = frame
if preds:
for y in preds[0]:
box = BoxMsg()
box._xmin = float(y.xmin)
box._xmax = float(y.xmax)
box._ymin = float(y.ymin)
box._ymax = float(y.ymax)
msg._boxes.append(box)
msg._scores.append(y.score)
mask = MaskMsg()
mask._data = y.mask.flatten()
mask._dimension = [y.mask.shape[0], y.mask.shape[1]]
msg._masks.append(mask)
msg._classes.append(y.clsname)
result.append(msg)
return result
x = imageToMat(X.frame, "rgb8").transpose(2, 0, 1)
x = self.yolact.model.preprocess_input([x])
self.yolact.runtime.load_input([x])
self.yolact.runtime.run()
preds = self.yolact.runtime.extract_output()
preds = self.yolact.model.postprocess_outputs(preds)

msg = SegmentationMsg()
if preds:
for y in preds[0]:
box = BoxMsg()
box._xmin = float(y.xmin)
box._xmax = float(y.xmax)
box._ymin = float(y.ymin)
box._ymax = float(y.ymax)
msg._boxes.append(box)
msg._scores.append(y.score)
mask = MaskMsg()
mask._data = y.mask.flatten()
mask._dimension = [y.mask.shape[0], y.mask.shape[1]]
msg._masks.append(mask)
msg._classes.append(y.clsname)
return True, msg

def cleanup(self):
del self.yolact
Expand Down
26 changes: 5 additions & 21 deletions cvnode_base/nodes/yolact_tensorrt.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,11 @@ def __init__(self):
super().__init__(node_name="mask_rcnn_tensorrt_node")
self.declare_parameter("class_names_path", rclpy.Parameter.Type.STRING)

def run_inference(self, X: List[Image]) -> List[SegmentationMsg]:
"""
Run inference on the input data.
Parameters
----------
X : List[Image]
List of input image messages.
Returns
-------
List[SegmentationMsg]
List of postprocessed segmentation messages.
"""
result = []
for frame in X:
input_data = self.preprocess(frame)
prediction = self.predict(input_data)
result.append(self.postprocess(prediction, frame))
return result
def run_inference(self, X):
input_data = self.preprocess(X.frame)
prediction = self.predict(input_data)
result = self.postprocess(prediction, X.frame)
return True, result

def prepare(self) -> bool:
"""
Expand Down Expand Up @@ -127,7 +112,6 @@ def postprocess(
Postprocessed model predictions in the form of SegmentationMsg.
"""
msg = SegmentationMsg()
msg._frame = frame
proto = Y[0]
loc = Y[1]
mask = Y[2]
Expand Down

0 comments on commit 711e905

Please sign in to comment.