better README

Vasco committed Jul 26, 2017
1 parent 12198a5 commit 68fb3ea
Showing 8 changed files with 196 additions and 20 deletions.
README.md: 194 additions & 18 deletions
RosbagInputFormat is an open source **splittable** Hadoop InputFormat for the ROS bag file format.
# Usage

1. Download the latest release jar file and put it in your classpath
2. Extract the index configuration of your ROS bag file. **The extracted index is a very small configuration file** containing a protobuf array that will be given in the job configuration. **Note that this operation does not process or parse** the whole bag file; it merely seeks to the required offsets. e.g.
```bash
java -jar lib/rosbaginputformat_2.11-0.9.3.jar -f /srv/data/HMB_4.bag
# will create an idx.bin config file /srv/data/HMB_4.bag.idx.bin
```
3. Put the ROS bag file in HDFS e.g.
```bash
hdfs dfs -put /srv/data/HMB_4.bag
```
4. Process the bag file in Spark using the RosbagInputFormat e.g.
```python
fin = sc.newAPIHadoopFile(
    path             = "hdfs://127.0.0.1:9000/user/root/HMB_4.bag",
    inputFormatClass = "de.valtech.foss.RosbagMapInputFormat",
    keyClass         = "org.apache.hadoop.io.LongWritable",
    valueClass       = "org.apache.hadoop.io.MapWritable",
    conf             = {"RosbagInputFormat.chunkIdx": "/srv/data/HMB_4.bag.idx.bin"})
```

Example data can be found, for instance, at https://github.com/udacity/self-driving-car/tree/master/datasets, published under the MIT License.

# Documentation
The [doc/](doc/) folder contains a Jupyter notebook with a few basic usage examples.
<p align="center"><img src="doc/images/concept.png" height="350">
</p>

# Tutorial

## To test locally, use the Dockerfile

Build and run the Docker image; it leaves the user in a bash shell.
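The exact commands are in the repository; a typical sequence might look like the following sketch, where the image tag and the Jupyter port mapping are assumptions rather than values taken from this page:

```bash
# build the image from the repository root (the tag is an assumed name)
docker build -t ros_hadoop:latest .
# run it, exposing the Jupyter notebook port used later in the tutorial
docker run -it -p 8888:8888 ros_hadoop:latest
```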

Point your browser to the local [URL](http://localhost:8888/) and enjoy the tutorial. The access token is printed in the docker container console.

### Usage from Spark (pyspark)

Example data can be found, for instance, at https://github.com/udacity/self-driving-car/tree/master/datasets, published under the MIT License.

Check that the rosbag file version is V2.0

```bash
java -jar lib/rosbaginputformat_2.11-0.9.3.jar --version -f HMB_1.bag
```

### Extract the index as configuration

The index is a very small configuration file containing a protobuf array that will be given in the job configuration.
Note that this operation does not process or parse the whole bag file; it merely seeks to the required offsets.

```bash
# assuming you start the notebook in the doc/ folder
java -jar ../lib/rosbaginputformat_2.11-0.9.3.jar \
-f /srv/data/HMB_4.bag

hdfs dfs -ls
```

This will generate a very small file named HMB_4.bag.idx.bin in the same folder.
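You can confirm that the index is tiny relative to the bag itself (file sizes will vary with your data):

```bash
ls -lh /srv/data/HMB_4.bag /srv/data/HMB_4.bag.idx.bin
```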

### Copy the bag file into HDFS

Using your favorite tool, put the bag file in your working HDFS folder.

***Note***: keep the index file as configuration for your jobs; ***do not*** put small files in HDFS.
For convenience, an example file (/srv/data/HMB_4.bag) is already provided in HDFS under /user/root/.

```bash
hdfs dfs -put /srv/data/HMB_4.bag
hdfs dfs -ls
```
<p align="center"><img src="doc/images/rosbag-analytics.png">
</p>

+ Hadoop InputFormat and Record Reader for Rosbag
+ Process Rosbag with Spark, Yarn, MapReduce, Hadoop Streaming API, …
+ Spark RDDs are cached and optimised for analysis

### Process the ROS bag file in Spark using the RosbagInputFormat

***Note***: your HDFS address might differ.
```python
fin = sc.newAPIHadoopFile(
    path             = "hdfs://127.0.0.1:9000/user/root/HMB_4.bag",
    inputFormatClass = "de.valtech.foss.RosbagMapInputFormat",
    keyClass         = "org.apache.hadoop.io.LongWritable",
    valueClass       = "org.apache.hadoop.io.MapWritable",
    conf             = {"RosbagInputFormat.chunkIdx": "/srv/data/HMB_4.bag.idx.bin"})
```
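Optionally cache the RDD and run a quick sanity check that records are being read; these are standard Spark calls, and the count depends on your bag file:

```python
fin.cache()  # keep the records in executor memory across the actions below
fin.count()  # total number of low-level bag records across all splits
```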

### Interpret the Messages

To interpret the messages we need the connections.

We could get the connections as configuration as well. At the moment we decided to collect the connections into the Spark driver in a dictionary and use it in the subsequent RDD actions. **Note** that alternative implementations will be provided in the next version of the RosbagInputFormat.

Collect the connections from all Spark partitions of the bag file into the Spark driver:
```python
conn_a = fin.filter(
    lambda r: r[1]['header']['op'] == 7
).map(
    lambda r: r[1]
).collect()
conn_d = {str(k['header']['topic']):k for k in conn_a}

# see topic names
conn_d.keys()
```

From all ROS bag splits we collect into the Spark driver the connection messages (op=7 in the header), where the ROS message definitions are stored. This operation, of course, happens in parallel.

### Load the python map functions from src/main/python/functions.py
```bash
%run -i ../src/main/python/functions.py
```
At the moment the file contains a single mapper function named msg_map.

### Use of msg_map to apply a function on all messages

Python rosbag.bag needs to be installed on all Spark workers. The msg_map function (from src/main/python/functions.py) takes three arguments, as sketched after this list:
1. r = the message or RDD record Tuple
2. func = a function (default str) to apply to the ROS message
3. conn = a connection to specify what topic to process
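For orientation, here is a minimal sketch of what such a mapper could look like. This is an assumption-laden illustration, not the actual implementation shipped in src/main/python/functions.py; in particular, it assumes each record value is a dict holding the bag record header and the serialized message bytes under 'data':

```python
# Hypothetical sketch of a msg_map-style mapper; the shipped implementation
# may differ in how it decodes the connection and message records.
def msg_map(r, func=str, conn=None):
    import roslib.message  # requires the ROS python packages on every worker
    # op == 2 marks a message record; keep only those of the requested connection
    if r[1]['header']['op'] == 2 and r[1]['header']['conn'] == conn['header']['conn']:
        msg_class = roslib.message.get_message_class(str(conn['data']['type']))
        msg = msg_class()
        msg.deserialize(r[1]['data'])  # raw bytes -> ROS message object
        yield func(msg)
```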

```python
%matplotlib nbagg
# use %matplotlib notebook in python3
from functools import partial
import pandas as pd
import numpy as np


# Take messages from the '/imu/data' topic using the default str func
rdd = fin.flatMap(
    partial(msg_map, conn=conn_d['/imu/data'])
)
```

The connection dictionary is sent over the closure to the workers, which use it in msg_map.

```python
rdd.take(3)
```
<p align="center"><img src="doc/images/connection.png" /></p>

```python
print(rdd.take(1)[0])
```

```
header:
seq: 1701626
stamp:
secs: 1479425728
nsecs: 747487068
frame_id: /imu
orientation:
x: -0.0251433756238
y: 0.0284643176884
z: -0.0936542998233
w: 0.994880191333
orientation_covariance: [0.017453292519943295, 0.0, 0.0, 0.0, 0.017453292519943295, 0.0, 0.0, 0.0, 0.15707963267948966]
angular_velocity:
x: 0.0
y: 0.0
z: 0.0
angular_velocity_covariance: [-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0]
linear_acceleration:
x: 1.16041922569
y: 0.595418334007
z: 10.7565326691
linear_acceleration_covariance: [0.0004, 0.0, 0.0, 0.0, 0.0004, 0.0, 0.0, 0.0, 0.0004]
```


### Image data from camera messages

An example of taking messages using a func other than the default str.
In this case we apply a lambda to messages from the '/center_camera/image_color/compressed' topic. As usual with Spark, the operation happens in parallel on all workers.

```python
from PIL import Image
from io import BytesIO

res = fin.flatMap(
    partial(msg_map, func=lambda r: r.data,
            conn=conn_d['/center_camera/image_color/compressed'])
).take(50)

Image.open(BytesIO(res[48]))
```

<p align="center"><img src="doc/images/car.png" /></p>


### Plot fuel level

The topic /vehicle/fuel_level_report contains 2215 ROS messages. Let us plot header.stamp in seconds vs. fuel_level using a pandas DataFrame.

```python
def f(msg):
    return (msg.header.stamp.secs, msg.fuel_level)

d = fin.flatMap(
    partial(msg_map, func=f, conn=conn_d['/vehicle/fuel_level_report'])
).toDF().toPandas()  # toDF() needs an active SQLContext/SparkSession

d.set_index('_1').plot()
```
<p align="center"><img src="doc/images/plot.png" /></p>

### Machine Learning models on Spark workers

We build a dot-product Keras "model" and apply it to each message from a topic, comparing its output with the same dot product computed with numpy.

***Note*** that the imports happen on the workers and not in the driver. The connection dictionary, on the other hand, is sent over the closure.

```python
def f(msg):
    from keras.layers import Input, dot
    from keras.models import Model

    # acceleration vector and its covariance matrix from the message
    acc = np.array([msg.linear_acceleration.x,
                    msg.linear_acceleration.y,
                    msg.linear_acceleration.z])
    cov = np.array(msg.linear_acceleration_covariance).reshape((3, 3))

    i1 = Input(shape=(3,))
    i2 = Input(shape=(3,))
    o = dot([i1, i2], axes=1)  # row-wise dot product

    model = Model([i1, i2], o)

    # pair the acceleration vector with each row of the covariance matrix,
    # so the batch of row-wise dot products equals np.dot(cov, acc)
    return (
        np.dot(cov, acc),                           # numpy dot product
        model.predict([np.tile(acc, (3, 1)), cov])  # keras dot "predict"
    )

fin.flatMap(partial(msg_map, func=f, conn=conn_d['/vehicle/imu/data_raw'])).take(5)
# each element is a tuple with (numpy dot product, keras dot "predict")
```
One can of course sample and collect the data in the driver to train a model on a single machine.
Note that the msg is the most granular unit, but you could replace the flatMap with a mapPartitions to apply such a Keras function to a whole split.

Another option would be a map.reduceByKey before the flatMap, so that the function argument would be a whole interval instead of a single msg. The idea is to key on time.
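As a rough sketch of that idea, one can key each message by the second of its header timestamp and aggregate per one-second interval; the topic name and the simple count below are illustrative assumptions:

```python
# count messages per second of recording by keying on header time
per_second = fin.flatMap(
    partial(msg_map,
            func=lambda m: (m.header.stamp.secs, 1),
            conn=conn_d['/imu/data'])
).reduceByKey(lambda a, b: a + b)

per_second.take(5)  # [(second, message count), ...]
```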

We hope that the RosbagInputFormat will be useful to you.

## Please do not forget to send us your [feedback](AUTHORS).
![doc/images/browse-tutorial.png](doc/images/browse-tutorial.png)
doc/Tutorial.ipynb: 2 additions & 2 deletions
```diff
@@ -13,7 +13,7 @@
     "\n",
     "## Check that the rosbag file version is V2.0\n",
     "```bash\n",
-    "java -jar lib/rosbaginputformat_2.11-0.9.0-SNAPSHOT.jar --version -f HMB_1.bag\n",
+    "java -jar lib/rosbaginputformat_2.11-0.9.3.jar --version -f HMB_1.bag\n",
     "```\n",
     "\n",
     "## Extract the index as configuration\n",
@@ -32,7 +32,7 @@
    "source": [
     "%%bash\n",
     "# assuming you start the notebook in the doc/ folder \n",
-    "java -jar ../lib/rosbaginputformat_2.11-0.9.0-SNAPSHOT.jar -f /srv/data/HMB_4.bag"
+    "java -jar ../lib/rosbaginputformat_2.11-0.9.3.jar -f /srv/data/HMB_4.bag"
    ]
   },
```
Binary file added doc/images/car.png
Binary file added doc/images/connection.png
Binary file added doc/images/plot.png
Binary file added doc/images/ros-bag.png
Binary file added doc/images/rosbag-analytics.png
Binary file added lib/rosbaginputformat_2.11-0.9.3.jar