diff --git a/examples/contracts/compile.py b/examples/contracts/compile.py new file mode 100644 index 000000000..788dc9fe0 --- /dev/null +++ b/examples/contracts/compile.py @@ -0,0 +1,18 @@ +import ast +import os +from pathlib import Path + +# r = open(Path(os.path.dirname(os.path.realpath(__file__))) / "test.py", 'r') +# print(ast.dump(ast.parse(r.read()), indent=2)) +# breakpoint() + +from inspect import cleandoc + +from scenic.syntax.compiler import compileScenicAST +from scenic.syntax.parser import parse_file + +filename = Path(os.path.dirname(os.path.realpath(__file__))) / "dev.contract" +scenic_ast = parse_file(filename) +python_ast, _ = compileScenicAST(scenic_ast) +print(ast.unparse(python_ast)) +exec(compile(python_ast, filename, "exec")) diff --git a/examples/contracts/dev.contract b/examples/contracts/dev.contract new file mode 100644 index 000000000..2d3d4e36f --- /dev/null +++ b/examples/contracts/dev.contract @@ -0,0 +1,252 @@ +""" A contract describing an automatic cruise control system""" + +import math +import random +from typing import Union +import builtins +import numpy + +from scenic.core.geometry import normalizeAngle +from scenic.syntax.veneer import * +from scenic.syntax.translator import scenarioFromFile +from scenic.domains.driving.controllers import PIDLongitudinalController, PIDLateralController +from scenic.domains.driving.actions import RegulatedControlAction + +# ## World File ## +ENVIRONMENT = scenarioFromFile(localPath("highway.scenic"), mode2D=True) + +SEED=3 +random.seed(SEED) +numpy.random.seed(SEED) + +## Components ## + +# sensors components +component NoisyDistanceSystem(stddev, init_seed=1, overestimate=True): + """ A component that provides noisy distance to the car in front.""" + sensors: + self.leadDist: builtins.float as sensors_distance + + outputs: + dist: builtins.float + + state: + seed: builtins.int = init_seed + + body: + noise = random.gauss(sigma=stddev) + if overestimate: + noise = abs(noise) + noisy_distance = sensors_distance + noise + state.seed = random.randint(1,1000000) + return {"dist": noisy_distance} + +component Speedometer(): + """ Fetches and outputss ground truth speed.""" + sensors: + self.speed: builtins.float as speed_val + + outputs: + speed: builtins.float + + body: + return {"speed": speed_val} + +component DirectionSystem(): + """ A component that provides ground truth directional information.""" + sensors: + self.targetDir: builtins.float as sensors_direction + self.heading: builtins.float as sensors_heading + + outputs: + direction: builtins.float + heading: builtins.float + + body: + return {"direction": sensors_direction, "heading": sensors_heading} + +# Controller Signal Systems +component PIDThrottleSystem(target_dist, max_speed): + """ A simple PID controller that attempts to maintain a set distance + from the car in front of it while regulating its speed. + """ + inputs: + dist: builtins.float + speed: builtins.float + + outputs: + throttle: builtins.float + + state: + # This is for speed, so maybe we need a specific one for distance? + # Not sure what assumptions are made. + pid_controller: PIDLongitudinalController = PIDLongitudinalController(K_D=0.1, K_I=0) + + body: + throttle = state.pid_controller.run_step(dist-target_dist) + + # Basic speed limiter, don't accelerate if we're already going too fast. + if speed >= max_speed: + throttle = min(0, throttle) + + return {"throttle": float(throttle)} + +component ThrottleSafetyFilter(min_dist, min_slowdown, max_brake=5, buffer_padding=0): + """ A component that modulates actions, passing them through unchanged + unless we are dangerously close to the car in front of us, in which case + the actions are swapped to brake with maximum force. + """ + sensors: + self.timestep: builtins.float as timestep + + inputs: + dist: builtins.float + speed: builtins.float + throttle: builtins.float + + outputs: + modulated_throttle: builtins.float + + state: + last_dist: Union[None, builtins.float] = None + + body: + # In first timestep, don't take any action + if state.last_dist is None: + state.last_dist = dist + return {"modulated_throttle": 0.0} + + # If we are in the "danger zone", brake HARD. Otherwise, pass through the inputs actions action. + rel_speed = (state.last_dist - dist)/timestep + stopping_time = math.ceil(rel_speed/min_slowdown)+1 + rel_dist_covered = stopping_time*speed + (max_brake - min_slowdown)*(stopping_time*(stopping_time+1))/2 + danger_dist = min_dist + buffer_padding + max(0, rel_dist_covered) + + # Update last_dist + state.last_dist = dist + + print() + print("REL", rel_speed) + print("REAL_DIST:", dist, " DANGER_DIST:", danger_dist) + + if dist < danger_dist: + print("EMERGENCY BRAKE!!!") + return {"modulated_throttle": -1.0} + else: + return {"modulated_throttle": float(throttle)} + +component PIDSteeringSystem(): + inputs: + direction: builtins.float + heading: builtins.float + + outputs: + steer: builtins.float + + state: + # This is for speed, so maybe we need a specific one for distance? + # Not sure what assumptions are made. + pid_controller: PIDLateralController = PIDLateralController() + + body: + direction_err = normalizeAngle(normalizeAngle(heading) - normalizeAngle(direction)) + steer = state.pid_controller.run_step(direction_err) + + return {"steer": steer} + +# Controller Boilerplate + +component ActionGenerator(): + """ Given a throttle and steer signal, outputs a RegulatedControlAction.""" + inputs: + throttle: builtins.float + steer: builtins.float + + outputs: + control_action: RegulatedControlAction + + state: + past_steer: builtins.float = 0.0 + + body: + action = RegulatedControlAction(throttle, steer, state.past_steer, + max_throttle=1, max_brake=1, max_steer=1) + state.past_steer = steer + return {"control_action": action} + + +component ControlSystem(target_dist, max_speed, min_dist, min_slowdown): + """ The control system for a car, combining a PID controller with a + safety filter to generate actions. + """ + inputs: + dist: builtins.float + speed: builtins.float + direction: builtins.float + heading: builtins.float + + outputs: + control_action: RegulatedControlAction + + compose: + # Create sub-components + pid_ts = PIDThrottleSystem(target_dist, max_speed) + tsf = ThrottleSafetyFilter(min_dist, min_slowdown) + pid_ss = PIDSteeringSystem() + ag = ActionGenerator() + + # Connect sensors inputss + connect dist to pid_ts.dist + connect speed to pid_ts.speed + connect dist to tsf.dist + connect speed to tsf.speed + connect direction to pid_ss.direction + connect heading to pid_ss.heading + + # Connect pid throttle to filter + connect pid_ts.throttle to tsf.throttle + + # Connect control signals to action generator + connect tsf.modulated_throttle to ag.throttle + connect pid_ss.steer to ag.steer + + # outputs the generated action + connect ag.control_action to control_action + +component CarControls(): + """ This component receives actions for the car and executes them + Convention is that any non-None action passed into an action component + is taken each turn. + """ + actions: + control_action: RegulatedControlAction + +component Car(stddev, target_dist, max_speed, min_dist, min_slowdown): + compose: + ps = NoisyDistanceSystem(stddev) + sm = Speedometer() + ds = DirectionSystem() + cs = ControlSystem(target_dist, max_speed, min_dist, min_slowdown) + cc = CarControls() + + # Connect sensors inputss to controller + connect ps.dist to cs.dist + connect sm.speed to cs.speed + connect ds.direction to cs.direction + connect ds.heading to cs.heading + + # Connect controller actions to car controls + connect cs.control_action to cc.control_action + +# Instantiate Car component and link to object. +# NOTE: This will set the behavior of the object (based off the Car component). +# If the linked Scenic object already has a behavior defined, we should override +# it or throw an error. +STDDEV = 3 +TARGET_DIST = 10 +MAX_SPEED = 26.8224 # 60 mph in m/s +MIN_DIST = 0 +MIN_SLOWDOWN = 4.57 # 15 feet per second in m/s + +implement EgoCar with Car(STDDEV, TARGET_DIST, MAX_SPEED, MIN_DIST, MIN_SLOWDOWN) as car +runComponentsSimulation(ENVIRONMENT, [car]) diff --git a/examples/contracts/highway.scenic b/examples/contracts/highway.scenic new file mode 100644 index 000000000..9da7a9b54 --- /dev/null +++ b/examples/contracts/highway.scenic @@ -0,0 +1,73 @@ +param map = localPath('../../assets/maps/CARLA/Town06.xodr') +param carla_map = 'Town06' +model scenic.simulators.newtonian.driving_model + +import math +import shapely + +from scenic.core.distributions import distributionFunction +from scenic.core.type_support import toVector + +STARTING_DISTANCE = 5 + +roads = network.roads +select_road = Uniform(*roads) +select_lane = Uniform(*select_road.lanes) + +# Lead distance functions +def leadDistanceInner(pos, tpos, lane, maxDistance): + pos = lane.centerline.project(toVector(pos)) + tpos = toVector(tpos) + if not lane.containsPoint(tpos): + # Check if we are in the same lane as the target. If not, + # advance to the start of the any possible successor lane. + covered_dist = lane.centerline.length - shapely.line_locate_point(lane.centerline.lineString, shapely.Point(*pos)) + succ_lanes = [m.connectingLane if m.connectingLane else m.endLane for m in lane.maneuvers] + new_pos = lane.centerline.end + + remMaxDistance = maxDistance-covered_dist + if remMaxDistance <= 0: + return float("inf") + + rem_dist = min((leadDistanceInner(new_pos, tpos, new_lane, remMaxDistance) for new_lane in succ_lanes), default=maxDistance) + return covered_dist + rem_dist + + # If we're in the same lane as the target, return the accumulated distance plus + # the remaining distance to the point + passed_dist = shapely.line_locate_point(lane.centerline.lineString, shapely.Point(*pos)) + total_dist = shapely.line_locate_point(lane.centerline.lineString, shapely.Point(*tpos)) + return total_dist - passed_dist + +def leadDistance(source, target, maxDistance=250): + # Find all lanes this point could be a part of and recurse on them. + viable_lanes = [lane for lane in network.lanes if lane.containsPoint(source.position)] + + return min(min((leadDistanceInner(source, target, lane, maxDistance) for lane in viable_lanes), default=maxDistance), maxDistance) + +behavior BrakeChecking(): + while True: + do FollowLaneBehavior() for Range(1,4) seconds + while self.speed > 0.1: + take SetBrakeAction(1) + +# Set up lead and ego cars +leadCar = new Car on select_lane.centerline, + with behavior BrakeChecking() + + +class EgoCar(Car): + targetDir[dynamic, final]: float(roadDirection[self.position].yaw) + +ego = new EgoCar at roadDirection.followFrom(toVector(leadCar), -STARTING_DISTANCE, stepSize=0.1), + with leadDist STARTING_DISTANCE, with behavior FollowLaneBehavior(), with name "EgoCar", with timestep 0.1 + +# Create/activate monitor to store lead distance +monitor UpdateDistance(tailCar, leadCar): + while True: + tailCar.leadDist = float(leadDistance(tailCar, leadCar)) + wait + +require monitor UpdateDistance(ego, leadCar) + +record ego.leadDist +record ego.targetDir diff --git a/src/scenic/contracts/components.py b/src/scenic/contracts/components.py new file mode 100644 index 000000000..6456022da --- /dev/null +++ b/src/scenic/contracts/components.py @@ -0,0 +1,341 @@ +from abc import ABC, abstractmethod +import graphlib +import warnings + +from scenic.core.dynamics.actions import Action +from scenic.core.specifiers import Specifier + + +class Component(ABC): + pass + + +class BaseComponent(Component): + def __init__(self, **kwargs): + # Extract linked Scenic object, if one exists. + assert "_SCENIC_INTERNAL_LINKED_OBJ_NAME" in kwargs + self.linkedObjectName = kwargs["_SCENIC_INTERNAL_LINKED_OBJ_NAME"] + del kwargs["_SCENIC_INTERNAL_LINKED_OBJ_NAME"] + + # Save args + self.kwargs = kwargs + + # Ensure that all input/output types are actually types + for input_type in self.inputs_types.values(): + assert isinstance(input_type, type) + for output_type in self.outputs_types.values(): + assert isinstance(output_type, type) + + # Initialize state + self.reset() + + def link(self, scene): + self.linkedObject = lookuplinkedObject(scene, self.linkedObjectName) + + def reset(self): + self.linkedObject = None + self.state = ComponentState(self.state_types) + + for name, val in self.state_inits.items(): + setattr(self.state, name, val) + + def run(self, inputs): + # Check validity of inputs + assert isinstance(inputs, dict) and set(inputs.keys()) == set( + self.inputs_types.keys() + ) + for input_name, input_val in inputs.items(): + assert isinstance(input_val, self.inputs_types[input_name]) + + # Extract sensor values and check validity + sensors = {} + for sensor_name, sensor_source in self.sensors_values.items(): + assert sensor_source[1] == "self" + assert hasattr(self.linkedObject, sensor_source[0]) + sensor_val = getattr(self.linkedObject, sensor_source[0]) + assert isinstance(sensor_val, self.sensors_types[sensor_name]) + sensors[sensor_name] = sensor_val + + # Run component to get outputs + outputs = self.run_inner( + self.state, *inputs.values(), *sensors.values(), **self.kwargs + ) + + # Check validity of outputs + assert isinstance(outputs, dict) and set(outputs.keys()) == set( + self.outputs_types.keys() + ) + for output_name, output_val in outputs.items(): + assert isinstance(output_val, self.outputs_types[output_name]) + + return (outputs, []) + + @staticmethod + @abstractmethod + def run_inner(): + pass + + +class ActionComponent(Component): + def __init__(self, **kwargs): + # Extract linked Scenic object, if one exists. + assert "_SCENIC_INTERNAL_LINKED_OBJ_NAME" in kwargs + self.linkedObjectName = kwargs["_SCENIC_INTERNAL_LINKED_OBJ_NAME"] + del kwargs["_SCENIC_INTERNAL_LINKED_OBJ_NAME"] + + # Save args + self.kwargs = kwargs + + # Ensure that all action types are actually types + for action_type in self.inputs_types.values(): + assert isinstance(action_type, type) + assert issubclass(action_type, Action) + + # Initialize state + self.reset() + + def link(self, scene): + self.linkedObject = lookuplinkedObject(scene, self.linkedObjectName) + + def reset(self): + self.linkedObject = None + + def run(self, actions): + for action_name, action in actions.items(): + assert action_name in self.inputs_types + assert isinstance(action, self.inputs_types[action_name]) + + return ({}, list(actions.values())) + + +class ComposeComponent(Component): + def __init__(self, **kwargs): + # Extract linked Scenic object, if one exists. + assert "_SCENIC_INTERNAL_LINKED_OBJ_NAME" in kwargs + self.linkedObjectName = kwargs["_SCENIC_INTERNAL_LINKED_OBJ_NAME"] + del kwargs["_SCENIC_INTERNAL_LINKED_OBJ_NAME"] + + # Save args + self.kwargs = kwargs + + # Ensure that all input/output types are actually types + for input_type in self.inputs_types.values(): + assert isinstance(input_type, type) + for output_type in self.outputs_types.values(): + assert isinstance(output_type, type) + + # Ensure that all subcomponent are actually components + for sc in self.subcomponents.values(): + assert isinstance(sc, Component) + + # Validate and construct component dataflow graph + self.dataflow_graph = {node: [] for node in self.subcomponents.keys()} + self.dataflow_graph["SELF_INPUT"] = [] + self.dataflow_graph["SELF_OUTPUT"] = [] + self.input_sources = {} + for source, target in self.connections: + # Extract parent components for source and target and check compatibility + if source[1] is None: + source_parent = "SELF_INPUT" + assert source[0] in self.inputs_types.keys() + source_type = self.inputs_types[source[0]] + else: + assert source[1] in self.subcomponents + source_parent = source[1] + assert source[0] in self.subcomponents[source[1]].outputs_types.keys() + source_type = self.subcomponents[source[1]].outputs_types[source[0]] + + if target[1] is None: + target_parent = "SELF_OUTPUT" + assert target[0] in self.outputs_types.keys() + target_type = self.outputs_types[target[0]] + else: + assert target[1] in self.subcomponents + target_parent = target[1] + assert target[0] in self.subcomponents[target[1]].inputs_types.keys() + target_type = self.subcomponents[target[1]].inputs_types[target[0]] + + # Add edge to dataflow graph + self.dataflow_graph[target_parent].append((source_parent)) + + # Log source of this input + self.input_sources[(target[0], target_parent)] = (source[0], source_parent) + + # Ensure all source ports have a source + for output_name in self.outputs_types.keys(): + assert (output_name, "SELF_OUTPUT") in self.input_sources + + for sc_name, sc_val in self.subcomponents.items(): + for input_name in self.subcomponents[sc_name].inputs_types.keys(): + assert (input_name, sc_name) in self.input_sources + + # TODO: Raise a warning if any of this component's inputs are not used. + + # Find valid component evaluation order or raise error + sorter = graphlib.TopologicalSorter(self.dataflow_graph) + self.evaluation_order = [ + sc for sc in sorter.static_order() if sc not in {"SELF_INPUT", "SELF_OUTPUT"} + ] + + def link(self, scene): + self.linkedObject = lookuplinkedObject(scene, self.linkedObjectName) + for sc in self.subcomponents.values(): + sc.link(scene) + + def reset(self): + self.linkedObject = None + for sc in self.subcomponents.values(): + sc.reset() + + def run(self, inputs): + # Check validity of inputs + assert isinstance(inputs, dict) and set(inputs.keys()) == set( + self.inputs_types.keys() + ) + for input_name, input_val in inputs.items(): + assert isinstance(input_val, self.inputs_types[input_name]) + + # Initialize actions list/values dictionary and load in inputs + actions = [] + values = { + (input_name, "SELF_INPUT"): input_val + for input_name, input_val in inputs.items() + } + + # Run all subcomponents + for sc_name in self.evaluation_order: + # Extract the subcomponent and its input values + sc = self.subcomponents[sc_name] + sc_inputs = { + input_name: values[self.input_sources[(input_name, sc_name)]] + for input_name in sc.inputs_types.keys() + } + + # Run the subcomponent and process the result + sc_outputs, sc_actions = sc.run(sc_inputs) + values.update( + { + (sc_output_name, sc_name): sc_output_val + for sc_output_name, sc_output_val in sc_outputs.items() + } + ) + actions += sc_actions + + # Extract and check validity of outputs + values.update( + { + (output_name, "SELF_OUTPUT"): values[ + self.input_sources[(output_name, "SELF_OUTPUT")] + ] + for output_name in self.outputs_types.keys() + } + ) + outputs = { + name[0]: val for name, val in values.items() if name[1] == "SELF_OUTPUT" + } + + assert isinstance(outputs, dict) and set(outputs.keys()) == set( + self.outputs_types.keys() + ) + for output_name, output_val in outputs.items(): + assert isinstance(output_val, self.outputs_types[output_name]) + + return outputs, actions + + +## Component state +class ComponentState: + def __init__(self, state_types): + object.__setattr__(self, "_state_types", {n: t for n, t in state_types.items()}) + + def __getattr__(self, name): + if name not in self._state_types.keys(): + raise ValueError(f"Attempted to access non-existant state variable '{name}'.") + + return object.__getattribute__(self, name) + + def __setattr__(self, name, value): + if name not in self._state_types.keys(): + raise ValueError(f"Attempted to set non-existant state variable '{name}'.") + + if not isinstance(value, self._state_types[name]): + raise ValueError( + f"Attempted to set state variable '{name}' to {value} of type {type(value)}." + ) + + object.__setattr__(self, name, value) + + +# Component behavior +class ComponentBehavior: + def __init__(self, behavior): + assert len(behavior.inputs_types) == 0 + self.behavior = behavior + self._agent = None + self._isRunning = False + + def _assignTo(self, agent): + if self._agent and agent is self._agent._dynamicProxy: + # Assigned again (e.g. by override) to same agent; do nothing. + return + self._start(agent) + + def _start(self, agent): + self._agent = agent + self._isRunning = True + + def _step(self): + _, actions = self.behavior.run({}) + print("THROTTLE", actions[0].throttle) + return tuple(actions) + + def _stop(self, reason=None): + self._agent = None + self._isRunning = False + + +# Utility functions +def lookuplinkedObject(scene, name): + target_objects = [ + obj for obj in scene.objects if hasattr(obj, "name") and obj.name == name + ] + + if len(target_objects) == 0: + raise RuntimeError(f"No object in scenario with name '{name}'") + + if len(target_objects) > 1: + raise RuntimeError(f"Multiple objects in scenario with name '{name}'") + + return target_objects[0] + + +def runComponentsSimulation(scenario, components, time=200): + # Generate a scene and override component behaviors. + scene, _ = scenario.generate() + + for component in components: + component.link(scene) + + if component.linkedObject.behavior: + warnings.warn( + f"Overriding behavior of {component.linkedObjectName} with component behavior." + ) + + component.linkedObject._override( + [ + Specifier( + "ComponentBehaviorOverride", + {"behavior": 1}, + {"behavior": ComponentBehavior(component)}, + ) + ] + ) + + # Instantiate simulator and run simulation + simulator = scenario.getSimulator() + + simulation = simulator.simulate(scene, maxSteps=time) + + # Reset components + for component in components: + component.reset() diff --git a/src/scenic/core/dynamics/scenarios.py b/src/scenic/core/dynamics/scenarios.py index 7f33c3ec9..75bd10f6a 100644 --- a/src/scenic/core/dynamics/scenarios.py +++ b/src/scenic/core/dynamics/scenarios.py @@ -8,6 +8,7 @@ import rv_ltl +from scenic.contracts.components import ComponentBehavior import scenic.core.dynamics as dynamics from scenic.core.errors import InvalidScenarioError from scenic.core.lazy_eval import DelayedArgument, needsLazyEvaluation @@ -202,7 +203,7 @@ def _start(self): # Initialize behavior coroutines of agents for agent in self._agents: behavior = agent.behavior - assert isinstance(behavior, Behavior), behavior + assert isinstance(behavior, (Behavior, ComponentBehavior)), behavior behavior._assignTo(agent) # Initialize monitor coroutines for monitor in self._monitors: diff --git a/src/scenic/core/object_types.py b/src/scenic/core/object_types.py index cb1c9b93f..7348d0e21 100644 --- a/src/scenic/core/object_types.py +++ b/src/scenic/core/object_types.py @@ -26,6 +26,7 @@ import shapely.affinity import trimesh +from scenic.contracts.components import ComponentBehavior from scenic.core.distributions import ( MultiplexerDistribution, RandomControlFlowError, @@ -1067,7 +1068,11 @@ def __init__(self, *args, **kwargs): @classmethod def _specify(cls, context, prop, value): # Normalize types of some built-in properties - if prop == "behavior" and value != None: + if ( + prop == "behavior" + and value != None + and not isinstance(value, ComponentBehavior) + ): import scenic.syntax.veneer as veneer # TODO improve? value = toType( diff --git a/src/scenic/domains/driving/actions.py b/src/scenic/domains/driving/actions.py index 6649fe79a..60262044d 100644 --- a/src/scenic/domains/driving/actions.py +++ b/src/scenic/domains/driving/actions.py @@ -227,8 +227,8 @@ def __init__( throttle = min(throttle, max_throttle) brake = 0 else: - throttle = 0 brake = min(abs(throttle), max_brake) + throttle = 0 # Steering regulation: changes cannot happen abruptly, can't steer too much. diff --git a/src/scenic/syntax/ast.py b/src/scenic/syntax/ast.py index 57e5779ef..044ad7570 100644 --- a/src/scenic/syntax/ast.py +++ b/src/scenic/syntax/ast.py @@ -1250,3 +1250,41 @@ def __init__(self, left: ast.AST, right: ast.AST, *args: any, **kwargs: any) -> self.left = left self.right = right self._fields = ["left", "right"] + + +## Scenic + Contracts ## +# Components +class ComponentDef(AST): + def __init__( + self, + name, + args, + docstring, + sensors, + inputs, + outputs, + actions, + state, + body, + composition, + *_args, + **kwargs, + ) -> None: + self.name = name + self.args = args + self.docstring = docstring + self.sensors = sensors + self.inputs = inputs + self.outputs = outputs + self.actions = actions + self.state = state + self.body = body + self.composition = composition + + +class ImplementStmt(AST): + def __init__(self, name, component, linked_name, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.name = name + self.component = component + self.linked_name = linked_name diff --git a/src/scenic/syntax/compiler.py b/src/scenic/syntax/compiler.py index ea0417735..940f478e2 100644 --- a/src/scenic/syntax/compiler.py +++ b/src/scenic/syntax/compiler.py @@ -1746,3 +1746,661 @@ def visit_CanSeeOp(self, node: s.CanSeeOp): ], keywords=[], ) + + # Scenic + Contracts + def toComponentName(self, name): + return f"_SCENIC_INTERNAL_COMPONENT_{name}" + + def makeComponentBaseDef( + self, + name, + args, + docstring, + sensors, + inputs, + outputs, + state, + body, + ): + component_body = [] + + ## '__init__' Function ## + # Update args to add option for a Scenic object to link to. + init_arguments = ast.arguments( + args.posonlyargs, + [ast.arg("self")] + args.args + [ast.arg("_SCENIC_INTERNAL_LINKED_OBJ_NAME")], + args.vararg, + args.kwonlyargs, + args.kw_defaults, + args.kwarg, + args.defaults + [ast.Constant(value=None)], + ) + + init_body = [] + + # Store info on sensors, inputs, outputs, and state. + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="sensors_values", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="sensors_types", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + for s in sensors: + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="sensors_values", + ctx=loadCtx, + ), + slice=ast.Constant(value=s[0]), + ctx=ast.Store(), + ) + ], + value=ast.Constant(s[1]), + ) + ) + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="sensors_types", + ctx=loadCtx, + ), + slice=ast.Constant(value=s[0]), + ctx=ast.Store(), + ) + ], + value=s[2], + ) + ) + + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="inputs_types", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + for i in inputs: + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="inputs_types", + ctx=loadCtx, + ), + slice=ast.Constant(value=i[0]), + ctx=ast.Store(), + ) + ], + value=i[1], + ) + ) + + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="outputs_types", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + for o in outputs: + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="outputs_types", + ctx=loadCtx, + ), + slice=ast.Constant(value=o[0]), + ctx=ast.Store(), + ) + ], + value=o[1], + ) + ) + + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="state_inits", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="state_types", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + for s in state: + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="state_inits", + ctx=loadCtx, + ), + slice=ast.Constant(value=s[0]), + ctx=ast.Store(), + ) + ], + value=s[1], + ) + ) + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="state_types", + ctx=loadCtx, + ), + slice=ast.Constant(value=s[0]), + ctx=ast.Store(), + ) + ], + value=s[2], + ) + ) + + # Make super() init call + super_kwargs = itertools.chain( + init_arguments.posonlyargs, init_arguments.args, init_arguments.kwonlyargs + ) + super_kwargs = [ + ast.keyword(arg=arg.arg, value=ast.Name(id=arg.arg, ctx=loadCtx)) + for arg in super_kwargs + if arg.arg != "self" + ] + init_body.append( + ast.Expr( + value=ast.Call( + func=ast.Attribute( + value=ast.Call( + func=ast.Name(id="super", ctx=loadCtx), args=[], keywords=[] + ), + attr="__init__", + ctx=loadCtx, + ), + args=[], + keywords=super_kwargs, + ) + ) + ) + + # Create init function and add it to the component body + init_func = ast.FunctionDef( + name="__init__", + args=init_arguments, + body=init_body, + decorator_list=[], + returns=None, + type_params=[], + ) + component_body.append(init_func) + + ## 'run' Function ## + new_args = ( + [ast.arg(arg="state")] + + [ast.arg(arg=i[0]) for i in inputs] + + [ast.arg(s[0]) for s in sensors] + ) + + run_args = ast.arguments( + new_args + args.posonlyargs, + args.args, + args.vararg, + args.kwonlyargs, + [], + args.kwarg, + [], + ) + run_body = [] + + # Add component body code + run_body += body + + run_func = ast.FunctionDef( + name="run_inner", + args=run_args, + body=run_body, + decorator_list=[ast.Name(id="staticmethod", ctx=loadCtx)], + returns=None, + type_params=[], + ) + component_body.append(run_func) + + # Create and return the new component class + component_class = ast.ClassDef( + name=self.toComponentName(name), + bases=[ast.Name(id="BaseComponent", ctx=loadCtx)], + body=component_body, + keywords=[], + decorator_list=[], + type_params=[], + ) + return component_class + + def makeComponentActionDef( + self, + name: str, + args: Optional[ast.arguments], + docstring: Optional[str], + actions: List[Tuple[str, ast.AST]], + ): + ## '__init__' Function ## + # Update args to add option for a Scenic object to link to. + init_arguments = ast.arguments( + args.posonlyargs, + [ast.arg("self")] + args.args + [ast.arg("_SCENIC_INTERNAL_LINKED_OBJ_NAME")], + args.vararg, + args.kwonlyargs, + args.kw_defaults, + args.kwarg, + args.defaults + [ast.Constant(value=None)], + ) + + init_body = [] + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="inputs_types", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + for a in actions: + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="inputs_types", + ctx=loadCtx, + ), + slice=ast.Constant(value=a[0]), + ctx=ast.Store(), + ) + ], + value=a[1], + ) + ) + + # Make super() init call + super_kwargs = itertools.chain( + init_arguments.posonlyargs, init_arguments.args, init_arguments.kwonlyargs + ) + super_kwargs = [ + ast.keyword(arg=arg.arg, value=ast.Name(id=arg.arg, ctx=loadCtx)) + for arg in super_kwargs + if arg.arg != "self" + ] + init_body.append( + ast.Expr( + value=ast.Call( + func=ast.Attribute( + value=ast.Call( + func=ast.Name(id="super", ctx=loadCtx), args=[], keywords=[] + ), + attr="__init__", + ctx=loadCtx, + ), + args=[], + keywords=super_kwargs, + ) + ) + ) + + init_func = ast.FunctionDef( + name="__init__", + args=init_arguments, + body=init_body, + decorator_list=[], + returns=None, + type_params=[], + ) + component_body = [init_func] + return ast.ClassDef( + name=self.toComponentName(name), + bases=[ast.Name(id="ActionComponent", ctx=loadCtx)], + body=component_body, + keywords=[], + decorator_list=[], + type_params=[], + ) + + def makeComponentComposeDef( + self, + name: str, + args: Optional[ast.arguments], + docstring: Optional[str], + inputs: List[Tuple[str, ast.AST]], + outputs: List[Tuple[str, ast.AST]], + composition: List[ast.AST], + ): + component_body = [] + + ## '__init__' Function ## + # Update args to add option for a Scenic object to link to. + init_arguments = ast.arguments( + args.posonlyargs, + [ast.arg("self")] + args.args, + args.vararg, + args.kwonlyargs + [ast.arg("_SCENIC_INTERNAL_LINKED_OBJ_NAME")], + args.kw_defaults + [ast.Constant(value=None)], + args.kwarg, + args.defaults, + ) + + init_body = [] + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="inputs_types", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + for i in inputs: + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="inputs_types", + ctx=loadCtx, + ), + slice=ast.Constant(value=i[0]), + ctx=ast.Store(), + ) + ], + value=i[1], + ) + ) + + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="outputs_types", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + for o in outputs: + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="outputs_types", + ctx=loadCtx, + ), + slice=ast.Constant(value=o[0]), + ctx=ast.Store(), + ) + ], + value=o[1], + ) + ) + + subcomponent_stmts = [(a, b) for n, a, b in composition if n == "subcomponent"] + connect_stmts = [(a, b) for n, a, b in composition if n == "connect"] + + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="subcomponents", + ctx=ast.Store(), + ) + ], + value=ast.Dict(keys=[], values=[]), + ) + ) + + # Ensure there are no duplicate subcomponent names + subcomponent_names = [name for name, _ in subcomponent_stmts] + assert len(set(subcomponent_names)) == len(subcomponent_names) + + for sub_name, expr in subcomponent_stmts: + assert isinstance(expr, ast.Call) + assert isinstance(expr.func, ast.Name) + # Add reference to linked object + expr.keywords.append( + ast.keyword( + arg="_SCENIC_INTERNAL_LINKED_OBJ_NAME", + value=ast.Name(id="_SCENIC_INTERNAL_LINKED_OBJ_NAME", ctx=loadCtx), + ) + ) + # Map subcomponent name to internal name + expr.func.id = self.toComponentName(expr.func.id) + + init_body.append( + ast.Assign( + targets=[ + ast.Subscript( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="subcomponents", + ctx=loadCtx, + ), + slice=ast.Constant(value=sub_name), + ctx=ast.Store(), + ) + ], + value=expr, + ) + ) + + init_body.append( + ast.Assign( + targets=[ + ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="connections", + ctx=ast.Store(), + ) + ], + value=ast.List(elts=[], ctx=loadCtx), + ) + ) + for source_target in connect_stmts: + init_body.append( + ast.Expr( + value=ast.Call( + func=ast.Attribute( + value=ast.Attribute( + value=ast.Name(id="self", ctx=loadCtx), + attr="connections", + ctx=loadCtx, + ), + attr="append", + ctx=loadCtx, + ), + args=[ast.Constant(value=source_target)], + keywords=[], + ) + ) + ) + + # Make super() init call + super_kwargs = itertools.chain( + init_arguments.posonlyargs, init_arguments.args, init_arguments.kwonlyargs + ) + super_kwargs = [ + ast.keyword(arg=arg.arg, value=ast.Name(id=arg.arg, ctx=loadCtx)) + for arg in super_kwargs + if arg.arg != "self" + ] + init_body.append( + ast.Expr( + value=ast.Call( + func=ast.Attribute( + value=ast.Call( + func=ast.Name(id="super", ctx=loadCtx), args=[], keywords=[] + ), + attr="__init__", + ctx=loadCtx, + ), + args=[], + keywords=super_kwargs, + ) + ) + ) + + # Create init function and add it to the component body + init_func = ast.FunctionDef( + name="__init__", + args=init_arguments, + body=init_body, + decorator_list=[], + returns=None, + type_params=[], + ) + component_body.append(init_func) + + return ast.ClassDef( + name=self.toComponentName(name), + bases=[ast.Name(id="ComposeComponent", ctx=loadCtx)], + body=component_body, + keywords=[], + decorator_list=[], + type_params=[], + ) + + def visit_ComponentDef(self, node: s.ComponentDef): + if node.body: + assert (not node.actions) and (not node.composition) + + return self.makeComponentBaseDef( + node.name, + node.args, + node.docstring, + node.sensors, + node.inputs, + node.outputs, + node.state, + node.body, + ) + + elif node.actions: + assert ( + (not node.sensors) + and (not node.inputs) + and (not node.outputs) + and (not node.state) + and (not node.body) + and (not node.composition) + ) + + return self.makeComponentActionDef( + node.name, + node.args, + node.docstring, + node.actions, + ) + + elif node.composition: + assert (not node.sensors) and (not node.state) and (not node.body) + + return self.makeComponentComposeDef( + node.name, + node.args, + node.docstring, + node.inputs, + node.outputs, + node.composition, + ) + + else: + assert False + + def visit_ImplementStmt(self, node: s.ImplementStmt): + assert isinstance(node.component, ast.Call) + + # Rename target to use mapped name + node.component.func = ast.Name( + id=self.toComponentName(node.component.func.id), ctx=loadCtx + ) + + # Add linking object keyword argument + node.component.keywords.append( + ast.keyword( + arg="_SCENIC_INTERNAL_LINKED_OBJ_NAME", + value=ast.Constant(value=node.linked_name), + ) + ) + + implementation_stmt = ast.Assign( + targets=[ast.Name(id=node.name, ctx=ast.Store())], value=node.component + ) + + return implementation_stmt diff --git a/src/scenic/syntax/scenic.gram b/src/scenic/syntax/scenic.gram index 2c0e64783..eccf94821 100644 --- a/src/scenic/syntax/scenic.gram +++ b/src/scenic/syntax/scenic.gram @@ -560,6 +560,7 @@ scenic_stmt: | scenic_do_stmt | scenic_abort_stmt | scenic_simulator_stmt + | scenic_implement_stmt scenic_compound_stmt: | scenic_tracked_assign_new_stmt @@ -570,6 +571,7 @@ scenic_compound_stmt: | scenic_scenario_def | scenic_try_interrupt_stmt | scenic_override_stmt + | scenic_component_def # SIMPLE STATEMENTS # ================= @@ -905,6 +907,79 @@ scenic_override_stmt: s.Override(target=e, specifiers=ss + t) } + +# Component +# --------- + +scenic_component_def: + | "component" a=NAME '(' b=[params] ')' &&':' c=scenic_component_def_block { + s.ComponentDef( + a.string, + args=b or self.make_arguments(None, [], None, [], None), + docstring=c[0], + sensors=c[1], + inputs=c[2], + outputs=c[3], + actions=c[4], + state=c[5], + body=c[6], + composition=c[7], + LOCATIONS, + ) + } + +scenic_component_def_block: + | NEWLINE INDENT a=[x=STRING NEWLINE { x.string }] b=[scenic_component_sensors_block] \ + c=[scenic_component_inputs_block] d=[scenic_component_outputs_block] \ + e=[scenic_component_actions_block] f=[scenic_component_state_block] \ + g=[scenic_component_body_block] h=[scenic_component_composition_block] DEDENT \ + {(a, b or [], c or [], d or [], e or [], f or [], g or [], h or [])} + +scenic_component_sensors_block: + | "sensors" &&':' NEWLINE INDENT a=(scenic_component_sensor)+ DEDENT {a} + +scenic_component_inputs_block: + | "inputs" &&':' NEWLINE INDENT a=(scenic_component_port)+ DEDENT {a} + +scenic_component_outputs_block: + | "outputs" &&':' NEWLINE INDENT a=(scenic_component_port)+ DEDENT {a} + +scenic_component_actions_block: + | "actions" &&':' NEWLINE INDENT a=(scenic_component_port)+ DEDENT {a} + +scenic_component_state_block: + | "state" &&':' NEWLINE INDENT a=(scenic_component_state)+ DEDENT {a} + +scenic_component_body_block: + | "body" &&':' a=block {a} + +scenic_component_composition_block: + | "compose" &&':' NEWLINE INDENT a=(scenic_composition_stmt)+ DEDENT {a} + + +scenic_component_sensor: + | source=scenic_component_qualified_name ':' type=primary 'as' name=NAME NEWLINE {(name.string, source, type)} + + +scenic_component_port: + | a=NAME ':' b=primary NEWLINE {(a.string, b)} + +scenic_component_state: + | a=NAME ':' type=primary '=' val=primary NEWLINE {(a.string, val, type)} + + +scenic_composition_stmt: + | "connect" a=scenic_component_qualified_name 'to' b=scenic_component_qualified_name NEWLINE {("connect", a, b)} + | a=NAME '=' b=primary NEWLINE {("subcomponent", a.string, b)} + +scenic_component_qualified_name: + | parent=NAME '.' a=NAME {(a.string, parent.string)} + | a=NAME {(a.string, None)} + + +scenic_implement_stmt: + | 'implement' linked_name=NAME 'with' component=primary 'as' name=NAME {s.ImplementStmt(name.string, component, linked_name.string, LOCATIONS)} + # Function definitions # -------------------- diff --git a/src/scenic/syntax/veneer.py b/src/scenic/syntax/veneer.py index 2e7650efa..e4d71caf3 100644 --- a/src/scenic/syntax/veneer.py +++ b/src/scenic/syntax/veneer.py @@ -177,6 +177,11 @@ "Always", "Eventually", "Next", + # Contracts + "BaseComponent", + "ActionComponent", + "ComposeComponent", + "runComponentsSimulation", ) # various Python types and functions used in the language but defined elsewhere @@ -249,6 +254,12 @@ import traceback import typing +from scenic.contracts.components import ( + ActionComponent, + BaseComponent, + ComposeComponent, + runComponentsSimulation, +) from scenic.core.distributions import ( Distribution, MultiplexerDistribution,