-
Notifications
You must be signed in to change notification settings - Fork 103
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
1,475 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import ast | ||
import os | ||
from pathlib import Path | ||
|
||
# r = open(Path(os.path.dirname(os.path.realpath(__file__))) / "test.py", 'r') | ||
# print(ast.dump(ast.parse(r.read()), indent=2)) | ||
# breakpoint() | ||
|
||
from inspect import cleandoc | ||
|
||
from scenic.syntax.compiler import compileScenicAST | ||
from scenic.syntax.parser import parse_file | ||
|
||
filename = Path(os.path.dirname(os.path.realpath(__file__))) / "dev.contract" | ||
scenic_ast = parse_file(filename) | ||
python_ast, _ = compileScenicAST(scenic_ast) | ||
print(ast.unparse(python_ast)) | ||
exec(compile(python_ast, filename, "exec")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
""" A contract describing an automatic cruise control system""" | ||
|
||
import math | ||
import random | ||
from typing import Union | ||
import builtins | ||
import numpy | ||
|
||
from scenic.core.geometry import normalizeAngle | ||
from scenic.syntax.veneer import * | ||
from scenic.syntax.translator import scenarioFromFile | ||
from scenic.domains.driving.controllers import PIDLongitudinalController, PIDLateralController | ||
from scenic.domains.driving.actions import RegulatedControlAction | ||
|
||
# ## World File ## | ||
ENVIRONMENT = scenarioFromFile(localPath("highway.scenic"), mode2D=True) | ||
|
||
SEED=3 | ||
random.seed(SEED) | ||
numpy.random.seed(SEED) | ||
|
||
## Components ## | ||
|
||
# sensors components | ||
component NoisyDistanceSystem(stddev, init_seed=1, overestimate=True): | ||
""" A component that provides noisy distance to the car in front.""" | ||
sensors: | ||
self.leadDist: builtins.float as sensors_distance | ||
|
||
outputs: | ||
dist: builtins.float | ||
|
||
state: | ||
seed: builtins.int = init_seed | ||
|
||
body: | ||
noise = random.gauss(sigma=stddev) | ||
if overestimate: | ||
noise = abs(noise) | ||
noisy_distance = sensors_distance + noise | ||
state.seed = random.randint(1,1000000) | ||
return {"dist": noisy_distance} | ||
|
||
component Speedometer(): | ||
""" Fetches and outputss ground truth speed.""" | ||
sensors: | ||
self.speed: builtins.float as speed_val | ||
|
||
outputs: | ||
speed: builtins.float | ||
|
||
body: | ||
return {"speed": speed_val} | ||
|
||
component DirectionSystem(): | ||
""" A component that provides ground truth directional information.""" | ||
sensors: | ||
self.targetDir: builtins.float as sensors_direction | ||
self.heading: builtins.float as sensors_heading | ||
|
||
outputs: | ||
direction: builtins.float | ||
heading: builtins.float | ||
|
||
body: | ||
return {"direction": sensors_direction, "heading": sensors_heading} | ||
|
||
# Controller Signal Systems | ||
component PIDThrottleSystem(target_dist, max_speed): | ||
""" A simple PID controller that attempts to maintain a set distance | ||
from the car in front of it while regulating its speed. | ||
""" | ||
inputs: | ||
dist: builtins.float | ||
speed: builtins.float | ||
|
||
outputs: | ||
throttle: builtins.float | ||
|
||
state: | ||
# This is for speed, so maybe we need a specific one for distance? | ||
# Not sure what assumptions are made. | ||
pid_controller: PIDLongitudinalController = PIDLongitudinalController(K_D=0.1, K_I=0) | ||
|
||
body: | ||
throttle = state.pid_controller.run_step(dist-target_dist) | ||
|
||
# Basic speed limiter, don't accelerate if we're already going too fast. | ||
if speed >= max_speed: | ||
throttle = min(0, throttle) | ||
|
||
return {"throttle": float(throttle)} | ||
|
||
component ThrottleSafetyFilter(min_dist, min_slowdown, max_brake=5, buffer_padding=0): | ||
""" A component that modulates actions, passing them through unchanged | ||
unless we are dangerously close to the car in front of us, in which case | ||
the actions are swapped to brake with maximum force. | ||
""" | ||
sensors: | ||
self.timestep: builtins.float as timestep | ||
|
||
inputs: | ||
dist: builtins.float | ||
speed: builtins.float | ||
throttle: builtins.float | ||
|
||
outputs: | ||
modulated_throttle: builtins.float | ||
|
||
state: | ||
last_dist: Union[None, builtins.float] = None | ||
|
||
body: | ||
# In first timestep, don't take any action | ||
if state.last_dist is None: | ||
state.last_dist = dist | ||
return {"modulated_throttle": 0.0} | ||
|
||
# If we are in the "danger zone", brake HARD. Otherwise, pass through the inputs actions action. | ||
rel_speed = (state.last_dist - dist)/timestep | ||
stopping_time = math.ceil(rel_speed/min_slowdown)+1 | ||
rel_dist_covered = stopping_time*speed + (max_brake - min_slowdown)*(stopping_time*(stopping_time+1))/2 | ||
danger_dist = min_dist + buffer_padding + max(0, rel_dist_covered) | ||
|
||
# Update last_dist | ||
state.last_dist = dist | ||
|
||
print() | ||
print("REL", rel_speed) | ||
print("REAL_DIST:", dist, " DANGER_DIST:", danger_dist) | ||
|
||
if dist < danger_dist: | ||
print("EMERGENCY BRAKE!!!") | ||
return {"modulated_throttle": -1.0} | ||
else: | ||
return {"modulated_throttle": float(throttle)} | ||
|
||
component PIDSteeringSystem(): | ||
inputs: | ||
direction: builtins.float | ||
heading: builtins.float | ||
|
||
outputs: | ||
steer: builtins.float | ||
|
||
state: | ||
# This is for speed, so maybe we need a specific one for distance? | ||
# Not sure what assumptions are made. | ||
pid_controller: PIDLateralController = PIDLateralController() | ||
|
||
body: | ||
direction_err = normalizeAngle(normalizeAngle(heading) - normalizeAngle(direction)) | ||
steer = state.pid_controller.run_step(direction_err) | ||
|
||
return {"steer": steer} | ||
|
||
# Controller Boilerplate | ||
|
||
component ActionGenerator(): | ||
""" Given a throttle and steer signal, outputs a RegulatedControlAction.""" | ||
inputs: | ||
throttle: builtins.float | ||
steer: builtins.float | ||
|
||
outputs: | ||
control_action: RegulatedControlAction | ||
|
||
state: | ||
past_steer: builtins.float = 0.0 | ||
|
||
body: | ||
action = RegulatedControlAction(throttle, steer, state.past_steer, | ||
max_throttle=1, max_brake=1, max_steer=1) | ||
state.past_steer = steer | ||
return {"control_action": action} | ||
|
||
|
||
component ControlSystem(target_dist, max_speed, min_dist, min_slowdown): | ||
""" The control system for a car, combining a PID controller with a | ||
safety filter to generate actions. | ||
""" | ||
inputs: | ||
dist: builtins.float | ||
speed: builtins.float | ||
direction: builtins.float | ||
heading: builtins.float | ||
|
||
outputs: | ||
control_action: RegulatedControlAction | ||
|
||
compose: | ||
# Create sub-components | ||
pid_ts = PIDThrottleSystem(target_dist, max_speed) | ||
tsf = ThrottleSafetyFilter(min_dist, min_slowdown) | ||
pid_ss = PIDSteeringSystem() | ||
ag = ActionGenerator() | ||
|
||
# Connect sensors inputss | ||
connect dist to pid_ts.dist | ||
connect speed to pid_ts.speed | ||
connect dist to tsf.dist | ||
connect speed to tsf.speed | ||
connect direction to pid_ss.direction | ||
connect heading to pid_ss.heading | ||
|
||
# Connect pid throttle to filter | ||
connect pid_ts.throttle to tsf.throttle | ||
|
||
# Connect control signals to action generator | ||
connect tsf.modulated_throttle to ag.throttle | ||
connect pid_ss.steer to ag.steer | ||
|
||
# outputs the generated action | ||
connect ag.control_action to control_action | ||
|
||
component CarControls(): | ||
""" This component receives actions for the car and executes them | ||
Convention is that any non-None action passed into an action component | ||
is taken each turn. | ||
""" | ||
actions: | ||
control_action: RegulatedControlAction | ||
|
||
component Car(stddev, target_dist, max_speed, min_dist, min_slowdown): | ||
compose: | ||
ps = NoisyDistanceSystem(stddev) | ||
sm = Speedometer() | ||
ds = DirectionSystem() | ||
cs = ControlSystem(target_dist, max_speed, min_dist, min_slowdown) | ||
cc = CarControls() | ||
|
||
# Connect sensors inputss to controller | ||
connect ps.dist to cs.dist | ||
connect sm.speed to cs.speed | ||
connect ds.direction to cs.direction | ||
connect ds.heading to cs.heading | ||
|
||
# Connect controller actions to car controls | ||
connect cs.control_action to cc.control_action | ||
|
||
# Instantiate Car component and link to object. | ||
# NOTE: This will set the behavior of the object (based off the Car component). | ||
# If the linked Scenic object already has a behavior defined, we should override | ||
# it or throw an error. | ||
STDDEV = 3 | ||
TARGET_DIST = 10 | ||
MAX_SPEED = 26.8224 # 60 mph in m/s | ||
MIN_DIST = 0 | ||
MIN_SLOWDOWN = 4.57 # 15 feet per second in m/s | ||
|
||
implement EgoCar with Car(STDDEV, TARGET_DIST, MAX_SPEED, MIN_DIST, MIN_SLOWDOWN) as car | ||
runComponentsSimulation(ENVIRONMENT, [car]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
param map = localPath('../../assets/maps/CARLA/Town06.xodr') | ||
param carla_map = 'Town06' | ||
model scenic.simulators.newtonian.driving_model | ||
|
||
import math | ||
import shapely | ||
|
||
from scenic.core.distributions import distributionFunction | ||
from scenic.core.type_support import toVector | ||
|
||
STARTING_DISTANCE = 5 | ||
|
||
roads = network.roads | ||
select_road = Uniform(*roads) | ||
select_lane = Uniform(*select_road.lanes) | ||
|
||
# Lead distance functions | ||
def leadDistanceInner(pos, tpos, lane, maxDistance): | ||
pos = lane.centerline.project(toVector(pos)) | ||
tpos = toVector(tpos) | ||
if not lane.containsPoint(tpos): | ||
# Check if we are in the same lane as the target. If not, | ||
# advance to the start of the any possible successor lane. | ||
covered_dist = lane.centerline.length - shapely.line_locate_point(lane.centerline.lineString, shapely.Point(*pos)) | ||
succ_lanes = [m.connectingLane if m.connectingLane else m.endLane for m in lane.maneuvers] | ||
new_pos = lane.centerline.end | ||
|
||
remMaxDistance = maxDistance-covered_dist | ||
if remMaxDistance <= 0: | ||
return float("inf") | ||
|
||
rem_dist = min((leadDistanceInner(new_pos, tpos, new_lane, remMaxDistance) for new_lane in succ_lanes), default=maxDistance) | ||
return covered_dist + rem_dist | ||
|
||
# If we're in the same lane as the target, return the accumulated distance plus | ||
# the remaining distance to the point | ||
passed_dist = shapely.line_locate_point(lane.centerline.lineString, shapely.Point(*pos)) | ||
total_dist = shapely.line_locate_point(lane.centerline.lineString, shapely.Point(*tpos)) | ||
return total_dist - passed_dist | ||
|
||
def leadDistance(source, target, maxDistance=250): | ||
# Find all lanes this point could be a part of and recurse on them. | ||
viable_lanes = [lane for lane in network.lanes if lane.containsPoint(source.position)] | ||
|
||
return min(min((leadDistanceInner(source, target, lane, maxDistance) for lane in viable_lanes), default=maxDistance), maxDistance) | ||
|
||
behavior BrakeChecking(): | ||
while True: | ||
do FollowLaneBehavior() for Range(1,4) seconds | ||
while self.speed > 0.1: | ||
take SetBrakeAction(1) | ||
|
||
# Set up lead and ego cars | ||
leadCar = new Car on select_lane.centerline, | ||
with behavior BrakeChecking() | ||
|
||
|
||
class EgoCar(Car): | ||
targetDir[dynamic, final]: float(roadDirection[self.position].yaw) | ||
|
||
ego = new EgoCar at roadDirection.followFrom(toVector(leadCar), -STARTING_DISTANCE, stepSize=0.1), | ||
with leadDist STARTING_DISTANCE, with behavior FollowLaneBehavior(), with name "EgoCar", with timestep 0.1 | ||
|
||
# Create/activate monitor to store lead distance | ||
monitor UpdateDistance(tailCar, leadCar): | ||
while True: | ||
tailCar.leadDist = float(leadDistance(tailCar, leadCar)) | ||
wait | ||
|
||
require monitor UpdateDistance(ego, leadCar) | ||
|
||
record ego.leadDist | ||
record ego.targetDir |
Oops, something went wrong.