Skip to content

Commit

Permalink
Merge pull request #8 from CPFL/develop
Browse files Browse the repository at this point in the history
v0.2
  • Loading branch information
hiro-ya-iv authored Apr 13, 2018
2 parents 76dcdce + 5fdfab2 commit 73a4163
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 62 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ros-mqtt-bridge provides bridge between ROS and MQTT(message serialized by JSON)
## Installing

```console
pip install git+https://github.com/CPFL/ros_mqtt_bridge.git
$ pip install -e git+https://github.com/CPFL/ros_mqtt_bridge.git@v0.2#egg=ros_mqtt_bridge
```

## example
Expand Down
7 changes: 3 additions & 4 deletions examples/launchers/mqtt_to_ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
parser.add_argument("-P", "--port", type=int, default=1883, help="mqtt broker port")
parser.add_argument("-FT", "--from_topic", type=str, required=True, help="from topic")
parser.add_argument("-TT", "--to_topic", type=str, required=True, help="to topic")
parser.add_argument("-MMN", "--message_module_name", type=str, required=True, help="message module name")
parser.add_argument("-MCN", "--message_class_name", type=str, required=True, help="message class name")
parser.add_argument("-MT", "--message_type", type=str, required=True, help="message type")
args = parser.parse_args()


if __name__ == '__main__':
mqtt_to_ros = MQTTToROS(
args.host, args.port, args.from_topic, args.to_topic, args.message_module_name, args.message_class_name)
mqtt_to_ros = MQTTToROS(args.from_topic, args.to_topic, args.message_type)
mqtt_to_ros.set_mqtt_connection_args(host=args.host, port=args.port)
mqtt_to_ros.start()
7 changes: 3 additions & 4 deletions examples/launchers/ros_to_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
parser.add_argument("-P", "--port", type=int, default=1883, help="mqtt broker port")
parser.add_argument("-FT", "--from_topic", type=str, required=True, help="from topic")
parser.add_argument("-TT", "--to_topic", type=str, required=True, help="to topic")
parser.add_argument("-MMN", "--message_module_name", type=str, required=True, help="message module name")
parser.add_argument("-MCN", "--message_class_name", type=str, required=True, help="message class name")
parser.add_argument("-MT", "--message_type", type=str, required=True, help="message type")
args = parser.parse_args()


if __name__ == '__main__':
ros_to_mqtt = ROSToMQTT(
args.host, args.port, args.from_topic, args.to_topic, args.message_module_name, args.message_class_name)
ros_to_mqtt = ROSToMQTT(args.from_topic, args.to_topic, args.message_type)
ros_to_mqtt.set_mqtt_connection_args(host=args.host, port=args.port)
ros_to_mqtt.start()
2 changes: 1 addition & 1 deletion examples/std_msgs_string/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ $ rostopic pub -1 /ros/test/std_msgs_string std_msgs/String "data: 'test text fr
```

```
$ mosquitto_pub -d -t /mqtt/test/std_msgs_string -m "{'data': 'test text from mqtt'}"
$ mosquitto_pub -d -t /mqtt/test/std_msgs_string -m '{"data": "test text from mqtt"}'
```
6 changes: 2 additions & 4 deletions examples/std_msgs_string/std_msgs_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@


def launch_ros_to_mqtt_bridge():
ros_to_mqtt = ROSToMQTT(
"localhost", 1883, "/ros/test/std_msgs_string", "/mqtt/test/std_msgs_string", "std_msgs.msg", "msg.String")
ros_to_mqtt = ROSToMQTT("/ros/test/std_msgs_string", "/mqtt/test/std_msgs_string", "std_msgs/String")
print("start ros_to_mqtt_bridge.")
ros_to_mqtt.start()


def launch_mqtt_to_ros_bridge():
mqtt_to_ros = MQTTToROS(
"localhost", 1883, "/mqtt/test/std_msgs_string", "/ros/test/std_msgs_string", "std_msgs.msg", "msg.String")
mqtt_to_ros = MQTTToROS("/mqtt/test/std_msgs_string", "/ros/test/std_msgs_string", "std_msgs/String")
print("start mqtt_to_ros_bridge.")
mqtt_to_ros.start()

Expand Down
86 changes: 86 additions & 0 deletions ros_mqtt_bridge/args_setters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env python
# coding: utf-8

from copy import deepcopy
from ssl import PROTOCOL_TLSv1_2
from paho.mqtt.client import MQTTv311


class ArgsSetters(object):

DEFAULT_HOST = "localhost"

def __init__(self, message_type):
self.args = {
"ros": {
"data_class": self.get_data_class_from_message_type(message_type),
"init_node": {},
"publisher": {
"queue_size": 1
},
"wait_for_message": {},
"rate": {
"hz": 2
}
},
"mqtt": {
"client": {},
"connect": {
"host": ArgsSetters.DEFAULT_HOST,
},
"publish": {},
"subscribe": {},
"tls": None
}
}

def set_ros_publisher_args(
self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None,
queue_size=None):
self.args["ros"]["publisher"] = deepcopy(locals())
self.args["ros"]["publisher"].pop("self")

def set_ros_init_node_args(
self, name, argv=None, anonymous=False, log_level=None, disable_rostime=False, disable_rosout=False,
disable_signals=False, xmlrpc_port=0, tcpros_port=0):
self.args["ros"]["init_node"] = deepcopy(locals())
self.args["ros"]["init_node"].pop("self")

def set_ros_wait_for_message_args(self, topic, topic_type, timeout=None):
self.args["ros"]["wait_for_message"] = deepcopy(locals())
self.args["ros"]["wait_for_message"].pop("self")

def set_ros_rate_args(self, hz, reset=False):
self.args["ros"]["rate"] = deepcopy(locals())
self.args["ros"]["rate"].pop("self")

def set_mqtt_client_args(
self, client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp"):
self.args["mqtt"]["client"] = deepcopy(locals())
self.args["mqtt"]["client"].pop("self")

def set_mqtt_connect_args(self, host, port=1883, keepalive=60, bind_address=""):
self.args["mqtt"]["connect"] = deepcopy(locals())
self.args["mqtt"]["connect"].pop("self")

def set_mqtt_publish_args(self, topic, payload=None, qos=0, retain=False):
self.args["mqtt"]["publish"] = deepcopy(locals())
self.args["mqtt"]["publish"].pop("self")

def set_mqtt_subscribe_args(self, topic, qos=0):
self.args["mqtt"]["subscribe"] = deepcopy(locals())
self.args["mqtt"]["subscribe"].pop("self")

def set_mqtt_tls_args(
self, ca_certs=None, certfile=None, keyfile=None, cert_reqs=None, tls_version=PROTOCOL_TLSv1_2,
ciphers=None):
self.args["mqtt"]["tls"] = deepcopy(locals())
self.args["mqtt"]["tls"].pop("self")

@staticmethod
def get_data_class_from_message_type(message_type):
splitted_message_type = message_type.split("/")
message_module_name = splitted_message_type[0] + ".msg"
message_class_name = "msg." + splitted_message_type[1]
message_module = __import__(message_module_name)
return eval("message_module." + message_class_name)
56 changes: 32 additions & 24 deletions ros_mqtt_bridge/mqtt_to_ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,25 @@
import rospy
import paho.mqtt.client as mqtt

from ros_mqtt_bridge.args_setters import ArgsSetters
from ros_mqtt_bridge.attr_dict import AttrDict


class MQTTToROS(object):
class MQTTToROS(ArgsSetters):

DEFAULT_NODE_NAME = "mqtt_to_ros"
def __init__(self, from_topic, to_topic, message_type):
super(MQTTToROS, self).__init__(message_type)

def __init__(
self, host, port, from_topic, to_topic, message_module_name, message_class_name,
node_name=None, keepalive=60, queue_size=1, rospy_rate=10
):
self.__from_topic = from_topic
self.__client = mqtt.Client(protocol=mqtt.MQTTv311)
self.__client.on_connect = self.__on_connect
self.__client.on_message = self.__on_message
self.__client.connect(host, port=port, keepalive=keepalive)
self.__mqtt_client = None
self.__ros_publisher = None

message_module = __import__(message_module_name)
message_class = eval("message_module." + message_class_name)
self.__ros_publisher = rospy.Publisher(to_topic, message_class, queue_size=queue_size)
if node_name is None:
rospy.init_node(MQTTToROS.DEFAULT_NODE_NAME, anonymous=True)
else:
rospy.init_node(node_name)
self.__rospy_rate = rospy.Rate(rospy_rate)

def __del__(self):
self.__client.disconnect()
self.args["mqtt"]["subscribe"]["topic"] = from_topic
self.args["ros"]["publisher"]["name"] = to_topic
self.args["ros"]["publisher"]["data_class"] = self.args["ros"]["data_class"]

def __on_connect(self, _client, _userdata, _flags, response_code):
if response_code == 0:
self.__client.subscribe(self.__from_topic)
self.__mqtt_client.subscribe(**self.args["mqtt"]["subscribe"])
else:
print('connect status {0}'.format(response_code))

Expand All @@ -46,8 +33,29 @@ def __on_message(self, _client, _user_data, message_data):
message_attrdict = AttrDict.set_recursively(message_dict)
self.__ros_publisher.publish(**message_attrdict)

def connect_ros(self):
if "name" not in self.args["ros"]["init_node"]:
self.args["ros"]["init_node"]["name"] = "ros_mqtt_bridge"
self.args["ros"]["init_node"]["anonymous"] = True
self.__ros_publisher = rospy.Publisher(**self.args["ros"]["publisher"])
rospy.init_node(**self.args["ros"]["init_node"])

def connect_mqtt(self):
self.__mqtt_client = mqtt.Client(**self.args["mqtt"]["client"])
if self.args["mqtt"]["tls"] is not None:
self.set_mqtt_tls()
self.__mqtt_client.on_connect = self.__on_connect
self.__mqtt_client.on_message = self.__on_message
self.__mqtt_client.connect(**self.args["mqtt"]["connect"])

def set_mqtt_tls(self):
self.__mqtt_client.tls_set(**self.args["mqtt"]["tls"])
self.__mqtt_client.tls_insecure_set(True)

def start(self):
self.__client.loop_start()
self.connect_ros()
self.connect_mqtt()
self.__mqtt_client.loop_start()
try:
rospy.spin()
except rospy.ROSInterruptException:
Expand Down
53 changes: 30 additions & 23 deletions ros_mqtt_bridge/ros_to_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,47 @@
import rospy
import paho.mqtt.client as mqtt

from ros_mqtt_bridge.args_setters import ArgsSetters

class ROSToMQTT(object):

DEFAULT_NODE_NAME = "ros_to_mqtt"
class ROSToMQTT(ArgsSetters):

def __init__(
self, host, port, from_topic, to_topic, message_module_name, message_class_name,
node_name=None, keepalive=3600, rospy_rate=10, qos=0
):
self.__from_topic = from_topic
self.__to_topic = to_topic
self.__qos = qos
def __init__(self, from_topic, to_topic, message_type):
super(ROSToMQTT, self).__init__(message_type)

message_module = __import__(message_module_name)
self.__message_class = eval("message_module." + message_class_name)
self.__mqtt_client = None

if node_name is None:
rospy.init_node(ROSToMQTT.DEFAULT_NODE_NAME, anonymous=True)
else:
rospy.init_node(node_name)
self.__rospy_rate = rospy.Rate(rospy_rate)
self.args["mqtt"]["publish"]["topic"] = to_topic
self.args["ros"]["wait_for_message"]["topic"] = from_topic
self.args["ros"]["wait_for_message"]["topic_type"] = self.args["ros"]["data_class"]

self.__client = mqtt.Client(protocol=mqtt.MQTTv311)
self.__client.connect(host, port=port, keepalive=keepalive)
def connect_ros(self):
if "name" not in self.args["ros"]["init_node"]:
self.args["ros"]["init_node"]["name"] = "ros_mqtt_bridge"
self.args["ros"]["init_node"]["anonymous"] = True
rospy.init_node(**self.args["ros"]["init_node"])

def connect_mqtt(self):
self.__mqtt_client = mqtt.Client(**self.args["mqtt"]["client"])
if self.args["mqtt"]["tls"] is not None:
self.set_mqtt_tls()
self.__mqtt_client.connect(**self.args["mqtt"]["connect"])

def set_mqtt_tls(self):
self.__mqtt_client.tls_set(**self.args["mqtt"]["tls"])
self.__mqtt_client.tls_insecure_set(True)

def start(self):
self.connect_mqtt()
self.connect_ros()
self.__rospy_rate = rospy.Rate(**self.args["ros"]["rate"])
while not rospy.is_shutdown():
try:
message_yaml = str(rospy.wait_for_message(self.__from_topic, self.__message_class))
payload = json.dumps(yaml.load(message_yaml))
self.__client.publish(self.__to_topic, payload=payload, qos=self.__qos)
message_yaml = str(rospy.wait_for_message(**self.args["ros"]["wait_for_message"]))
self.args["mqtt"]["publish"]["payload"] = json.dumps(yaml.load(message_yaml))
self.__mqtt_client.publish(**self.args["mqtt"]["publish"])
self.__rospy_rate.sleep()
except rospy.ROSException:
pass
except rospy.ROSInterruptException:
break

self.__rospy_rate.sleep()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name='ros_mqtt_bridge',
version='0.1',
version='0.2',
description='ros_mqtt_bridge provides bridge between ROS and MQTT(message serialized by JSON).',
long_description=long_description,
url='https://github.com/CPFL/ros_mqtt_bridge',
Expand Down

0 comments on commit 73a4163

Please sign in to comment.