model.py (forked from autonomousvision/transfuser)
from collections import deque

import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import models

class ImageCNN(nn.Module):
    """ Encoder network for image input list.
    Args:
        c_dim (int): output dimension of the latent embedding
        normalize (bool): whether the input images should be normalized
    """

    def __init__(self, c_dim, normalize=True):
        super().__init__()
        self.normalize = normalize
        self.features = models.resnet34(pretrained=True)
        # replace the classification head with an identity so the encoder
        # returns the 512-d pooled features
        self.features.fc = nn.Sequential()

    def forward(self, inputs):
        c = 0
        for x in inputs:
            if self.normalize:
                x = normalize_imagenet(x)
            # sum the per-image features over the input list
            c += self.features(x)
        return c

def normalize_imagenet(x):
    """ Normalize input images according to ImageNet standards.
    Args:
        x (tensor): input images
    """
    x = x.clone()
    x[:, 0] = (x[:, 0] - 0.485) / 0.229
    x[:, 1] = (x[:, 1] - 0.456) / 0.224
    x[:, 2] = (x[:, 2] - 0.406) / 0.225
    return x
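# Each channel is standardized with the ImageNet per-channel mean and std, so
# e.g. a red-channel value of 0.485 maps to 0.0 and a value of 0.714 maps to
# (0.714 - 0.485) / 0.229 = 1.0.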

class LidarEncoder(nn.Module):
    """
    Encoder network for LiDAR input list
    Args:
        num_classes: output feature dimension
        in_channels: input channels
    """

    def __init__(self, num_classes=512, in_channels=2):
        super().__init__()
        self._model = models.resnet18()
        self._model.fc = nn.Sequential()
        # swap the first convolution for one that accepts `in_channels` inputs
        # instead of RGB; conv1 of ResNet-18 has bias=False, so `_tmp.bias` is
        # None and is accepted as the `bias` argument
        _tmp = self._model.conv1
        self._model.conv1 = nn.Conv2d(in_channels, out_channels=_tmp.out_channels,
            kernel_size=_tmp.kernel_size, stride=_tmp.stride, padding=_tmp.padding, bias=_tmp.bias)

    def forward(self, inputs):
        features = 0
        for lidar_data in inputs:
            lidar_feature = self._model(lidar_data)
            # sum the per-sweep features over the input list
            features += lidar_feature
        return features
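
# Example usage (a minimal sketch; the 2-channel 256x256 BEV grid size is an
# assumption for illustration):
#
#   lidar_encoder = LidarEncoder(num_classes=512, in_channels=2)
#   bev = [torch.randn(2, 2, 256, 256)]   # list with one batch of BEV rasters
#   feat = lidar_encoder(bev)             # -> tensor of shape (2, 512)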

class PIDController(object):
    """
    Discrete PID controller: the integral term is approximated by the mean of
    the last `n` errors, the derivative by the most recent first difference.
    """

    def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20):
        self._K_P = K_P
        self._K_I = K_I
        self._K_D = K_D
        self._window = deque([0 for _ in range(n)], maxlen=n)
        self._max = 0.0
        self._min = 0.0

    def step(self, error):
        self._window.append(error)
        # track the largest error magnitude seen so far (kept for debugging)
        self._max = max(self._max, abs(error))
        self._min = -abs(self._max)

        if len(self._window) >= 2:
            integral = np.mean(self._window)
            derivative = (self._window[-1] - self._window[-2])
        else:
            integral = 0.0
            derivative = 0.0

        return self._K_P * error + self._K_I * integral + self._K_D * derivative
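
# Example usage (a minimal sketch with made-up gains):
#
#   pid = PIDController(K_P=1.25, K_I=0.75, K_D=0.3, n=20)
#   control = pid.step(0.5)   # proportional term alone contributes 1.25 * 0.5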

class LateFusion(nn.Module):
    '''
    Image + LiDAR network with waypoint output and pid controller
    '''

    def __init__(self, config, device):
        super().__init__()
        self.config = config
        self.device = device
        self.turn_controller = PIDController(K_P=config.turn_KP, K_I=config.turn_KI, K_D=config.turn_KD, n=config.turn_n)
        self.speed_controller = PIDController(K_P=config.speed_KP, K_I=config.speed_KI, K_D=config.speed_KD, n=config.speed_n)

        self.image_encoder = ImageCNN(512, normalize=True).to(self.device)
        self.lidar_encoder = LidarEncoder(num_classes=512, in_channels=2).to(self.device)

        # MLP that maps the fused 512-d embedding to the GRU hidden size
        self.join = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 64),
            nn.ReLU(inplace=True),
        ).to(self.device)
        self.decoder = nn.GRUCell(input_size=4, hidden_size=64).to(self.device)
        self.output = nn.Linear(64, 2).to(self.device)

    def forward(self, feature_emb, target_point):
        '''
        Predicts future waypoints from image features and target point (goal location)
        Args:
            feature_emb (list): list of feature tensors
            target_point (tensor): goal location registered to ego-frame
        '''
        # late fusion: sum the image and LiDAR embeddings
        feature_emb = sum(feature_emb)
        z = self.join(feature_emb)

        output_wp = list()

        # initial input variable to GRU
        x = torch.zeros(size=(z.shape[0], 2), dtype=z.dtype).to(self.device)

        # autoregressive generation of output waypoints
        for _ in range(self.config.pred_len):
            # GRU input: previous waypoint (2) concatenated with the goal (2)
            x_in = torch.cat([x, target_point], dim=1)
            z = self.decoder(x_in, z)
            dx = self.output(z)
            # predict a displacement relative to the previous waypoint
            x = dx + x
            output_wp.append(x)

        pred_wp = torch.stack(output_wp, dim=1)
        return pred_wp

    def control_pid(self, waypoints, velocity):
        '''
        Predicts vehicle control with a PID controller.
        Args:
            waypoints (tensor): predicted waypoints
            velocity (tensor): speedometer input
        '''
        assert(waypoints.size(0) == 1)
        waypoints = waypoints[0].data.cpu().numpy()

        # flip y-axis (forward is negative in our waypoints)
        waypoints[:, 1] *= -1
        speed = velocity[0].data.cpu().numpy()

        # steer towards the midpoint of the first two waypoints
        aim = (waypoints[1] + waypoints[0]) / 2.0
        angle = np.degrees(np.pi / 2 - np.arctan2(aim[1], aim[0])) / 90
        steer = self.turn_controller.step(angle)
        steer = np.clip(steer, -1.0, 1.0)

        # desired speed is proportional to the distance between the first two waypoints
        desired_speed = np.linalg.norm(waypoints[0] - waypoints[1]) * 2.0
        brake = desired_speed < self.config.brake_speed or (speed / desired_speed) > self.config.brake_ratio

        delta = np.clip(desired_speed - speed, 0.0, self.config.clip_delta)
        throttle = self.speed_controller.step(delta)
        throttle = np.clip(throttle, 0.0, self.config.max_throttle)
        throttle = throttle if not brake else 0.0

        metadata = {
            'speed': float(speed.astype(np.float64)),
            'steer': float(steer),
            'throttle': float(throttle),
            'brake': float(brake),
            'wp_2': tuple(waypoints[1].astype(np.float64)),
            'wp_1': tuple(waypoints[0].astype(np.float64)),
            'desired_speed': float(desired_speed.astype(np.float64)),
            'angle': float(angle.astype(np.float64)),
            'aim': tuple(aim.astype(np.float64)),
            'delta': float(delta.astype(np.float64)),
        }

        return steer, throttle, brake, metadata
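
# Minimal end-to-end sketch (assumptions: a hypothetical config object with the
# attributes used above, random inputs, and a 256x256 resolution; none of this
# is taken from the original training setup):
#
#   if __name__ == "__main__":
#       from types import SimpleNamespace
#       config = SimpleNamespace(
#           turn_KP=1.25, turn_KI=0.75, turn_KD=0.3, turn_n=20,
#           speed_KP=5.0, speed_KI=0.5, speed_KD=1.0, speed_n=20,
#           pred_len=4, brake_speed=0.4, brake_ratio=1.1,
#           clip_delta=0.25, max_throttle=0.75,
#       )
#       device = torch.device("cpu")
#       model = LateFusion(config, device)
#       img_feat = model.image_encoder([torch.randn(1, 3, 256, 256)])
#       lidar_feat = model.lidar_encoder([torch.randn(1, 2, 256, 256)])
#       target_point = torch.randn(1, 2)
#       waypoints = model.forward([img_feat, lidar_feat], target_point)  # (1, pred_len, 2)
#       steer, throttle, brake, meta = model.control_pid(waypoints, torch.tensor([2.0]))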