forked from MHarbi/bagedit
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbag2csv.py
executable file
·52 lines (43 loc) · 1.54 KB
/
bag2csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
import argparse
import rosbag
import pandas as pd
from tqdm import tqdm
def process_msg(msg, row=None, prefix="", t=None, ref_time=None):
if row is None:
row = {}
if t is not None:
row["bag_time"] = t
if ref_time is not None:
row["offset_time"] = t - ref_time
if isinstance(msg, int) or isinstance(msg, float) or isinstance(msg, str):
row[prefix] = msg
elif isinstance(msg, list) or isinstance(msg, tuple):
for i, e in enumerate(msg):
process_msg(e, row=row, prefix=f"{prefix}[{str(i)}]")
else:
if len(prefix) > 0 and not prefix.endswith('.'):
prefix = prefix + '.'
for k in msg.__slots__:
process_msg(getattr(msg, k), row=row, prefix=f"{prefix}{k}")
return row
def to_csv(bag, topic, out):
"""
:type bag: rosbag.Bag
"""
rows = []
ref_time = bag.get_start_time()
for topic, msg, t in tqdm(bag.read_messages(topics=[topic]), desc="Extracting messages"):
rows.append(process_msg(msg, t=t.to_sec(), ref_time=ref_time))
df = pd.DataFrame(rows)
df.to_csv(out, index=False)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Extract images from a ROS bag.")
parser.add_argument("bag_file", help="Input ROS bag.")
parser.add_argument("topic")
parser.add_argument("output_file")
args = parser.parse_args()
bag = rosbag.Bag(args.bag_file, "r")
with open(args.output_file, 'w') as out:
to_csv(bag, args.topic, out)
bag.close()