diff --git a/docs/src/api/action.rst b/docs/src/api/action.rst new file mode 100644 index 00000000..4f6238d1 --- /dev/null +++ b/docs/src/api/action.rst @@ -0,0 +1,21 @@ +Control Actions +================ + +Enums specifying different control input action types. + +.. currentmodule:: f110_gym.envs.action + +.. autosummary:: + :toctree: _generated/ + + LongitudinalActionEnum + LongitudinalAction + AcclAction + SpeedAction + SteerActionEnum + SteerAction + SteeringAngleAction + SteeringSpeedAction + CarAction + from_single_to_multi_action_space + \ No newline at end of file diff --git a/docs/src/api/index.rst b/docs/src/api/index.rst index dba7018a..b284e7ba 100644 --- a/docs/src/api/index.rst +++ b/docs/src/api/index.rst @@ -7,6 +7,7 @@ API Overview env base_classes + action dynamic_models integrators laser_models diff --git a/gym/f110_gym/envs/action.py b/gym/f110_gym/envs/action.py index 93658462..3cf50e45 100644 --- a/gym/f110_gym/envs/action.py +++ b/gym/f110_gym/envs/action.py @@ -9,19 +9,51 @@ class LongitudinalActionEnum(Enum): + """Logitudinal car action enum + + Acceleration: 1 + + Speed: 2 + + Raises + ------ + ValueError + Unknown action type + """ + Accl = 1 Speed = 2 @staticmethod def from_string(action: str): + """Set longitudinal action type from string + + Parameters + ---------- + action : str + longitudinal action type + + Returns + ------- + AcclAction | SpeedAction + Specified longitudinal action type + + Raises + ------ + ValueError + Unknown action type + """ if action == "accl": return AcclAction elif action == "speed": return SpeedAction else: raise ValueError(f"Unknown action type {action}") - + + class LongitudinalAction: + """Longitudinal Action abstract class""" + def __init__(self) -> None: self._type = None @@ -30,26 +62,91 @@ def __init__(self) -> None: @abstractmethod def act(self, longitudinal_action: Any, **kwargs) -> float: + """Convert longitudinal action + + Parameters + ---------- + longitudinal_action : input longitudinal action + + Returns + ------- + float + converted longitudinal action + + Raises + ------ + NotImplementedError + """ raise NotImplementedError("longitudinal act method not implemented") @property def type(self) -> str: + """property, type of action + + Returns + ------- + str + type + """ return self._type @property def space(self) -> gym.Space: - return gym.spaces.Box(low=self.lower_limit, high=self.upper_limit, dtype=np.float32) + """property, action space + + Returns + ------- + gym.Space + action space + """ + return gym.spaces.Box( + low=self.lower_limit, high=self.upper_limit, dtype=np.float32 + ) + class AcclAction(LongitudinalAction): + """Acceleration action type + + Parameters + ---------- + params : Dict + parameter dictionary + """ + def __init__(self, params: Dict) -> None: super().__init__() self._type = "accl" self.lower_limit, self.upper_limit = -params["a_max"], params["a_max"] def act(self, action: Tuple[float, float], state, params) -> float: + """Return action + + Parameters + ---------- + action : Tuple[float, float] + input before conversion + state : np.ndarray + current dynamical state + params : dict + parameter dictionary + + Returns + ------- + float + converted acceleration + """ return action + class SpeedAction(LongitudinalAction): + """Speed action type + + Parameters + ---------- + params : Dict + parameter dictionary + """ + def __init__(self, params: Dict) -> None: super().__init__() self._type = "speed" @@ -58,6 +155,22 @@ def __init__(self, params: Dict) -> None: def act( self, action: Tuple[float, float], state: np.ndarray, params: Dict ) -> float: + """Return action + + Parameters + ---------- + action : Tuple[float, float] + input before conversion + state : np.ndarray + current dynamical state + params : dict + parameter dictionary + + Returns + ------- + float + converted speed + """ accl = pid_accl( action, state[3], @@ -68,7 +181,10 @@ def act( return accl + class SteerAction: + """Steering Action abstract class""" + def __init__(self) -> None: self._type = None @@ -77,17 +193,58 @@ def __init__(self) -> None: @abstractmethod def act(self, steer_action: Any, **kwargs) -> float: + """Convert steering action + + Parameters + ---------- + steer_action : Any + input steering action + + Returns + ------- + float + converted steering action + + Raises + ------ + NotImplementedError + """ raise NotImplementedError("steer act method not implemented") @property def type(self) -> str: + """steering action type + + Returns + ------- + str + type + """ return self._type - + @property def space(self) -> gym.Space: - return gym.spaces.Box(low=self.lower_limit, high=self.upper_limit, dtype=np.float32) + """action space + + Returns + ------- + gym.Space + action space + """ + return gym.spaces.Box( + low=self.lower_limit, high=self.upper_limit, dtype=np.float32 + ) + class SteeringAngleAction(SteerAction): + """Steering angle action type + + Parameters + ---------- + params : Dict + parameter dictionary + """ + def __init__(self, params: Dict) -> None: super().__init__() self._type = "steering_angle" @@ -95,15 +252,40 @@ def __init__(self, params: Dict) -> None: def act( self, action: Tuple[float, float], state: np.ndarray, params: Dict - ) -> float: + ) -> float: + """Return action + + Parameters + ---------- + action : Tuple[float, float] + input before conversion + state : np.ndarray + current dynamical state + params : dict + parameter dictionary + + Returns + ------- + float + converted steering angle + """ sv = pid_steer( action, state[2], params["sv_max"], ) return sv - + + class SteeringSpeedAction(SteerAction): + """Steering angle velocity action type + + Parameters + ---------- + params : Dict + parameter dictionary + """ + def __init__(self, params: Dict) -> None: super().__init__() self._type = "steering_speed" @@ -111,15 +293,61 @@ def __init__(self, params: Dict) -> None: def act( self, action: Tuple[float, float], state: np.ndarray, params: Dict - ) -> float: + ) -> float: + """Return action + + Parameters + ---------- + action : Tuple[float, float] + input before conversion + state : np.ndarray + current dynamical state + params : dict + parameter dictionary + + Returns + ------- + float + converted steering velocity angle + """ return action + class SteerActionEnum(Enum): + """Logitudinal car action enum + + Acceleration: 1 + + Speed: 2 + + Raises + ------ + ValueError + Unknown action type + """ + Steering_Angle = 1 Steering_Speed = 2 @staticmethod def from_string(action: str): + """Set steering action type from string + + Parameters + ---------- + action : str + steering action type + + Returns + ------- + SteeringAngleAction | SteeringSpeedAction + Specified steering action type + + Raises + ------ + ValueError + Unknown action type + """ if action == "steering_angle": return SteeringAngleAction elif action == "steering_speed": @@ -127,11 +355,25 @@ def from_string(action: str): else: raise ValueError(f"Unknown action type {action}") + class CarAction: - def __init__(self, control_mode : list[str, str], params: Dict) -> None: + """Car actions + + Parameters + ---------- + control_mode : list[str, str] + control mode requested + params : Dict + parameter dictionary + + Raises + ------ + ValueError + """ + def __init__(self, control_mode: list[str, str], params: Dict) -> None: long_act_type_fn = None steer_act_type_fn = None - if type(control_mode) == str: # only one control mode specified + if type(control_mode) == str: # only one control mode specified try: long_act_type_fn = LongitudinalActionEnum.from_string(control_mode) except ValueError: @@ -141,24 +383,24 @@ def __init__(self, control_mode : list[str, str], params: Dict) -> None: raise ValueError(f"Unknown control mode {control_mode}") if control_mode == "steering_speed": warnings.warn( - f'Only one control mode specified, using {control_mode} for steering and defaulting to acceleration for longitudinal control' + f"Only one control mode specified, using {control_mode} for steering and defaulting to acceleration for longitudinal control" ) long_act_type_fn = LongitudinalActionEnum.from_string("accl") else: warnings.warn( - f'Only one control mode specified, using {control_mode} for steering and defaulting to speed for longitudinal control' + f"Only one control mode specified, using {control_mode} for steering and defaulting to speed for longitudinal control" ) long_act_type_fn = LongitudinalActionEnum.from_string("speed") else: if control_mode == "accl": warnings.warn( - f'Only one control mode specified, using {control_mode} for longitudinal control and defaulting to steering speed for steering' + f"Only one control mode specified, using {control_mode} for longitudinal control and defaulting to steering speed for steering" ) steer_act_type_fn = SteerActionEnum.from_string("steering_speed") else: warnings.warn( - f'Only one control mode specified, using {control_mode} for longitudinal control and defaulting to steering angle for steering' + f"Only one control mode specified, using {control_mode} for longitudinal control and defaulting to steering angle for steering" ) steer_act_type_fn = SteerActionEnum.from_string("steering_angle") @@ -167,24 +409,54 @@ def __init__(self, control_mode : list[str, str], params: Dict) -> None: steer_act_type_fn = SteerActionEnum.from_string(control_mode[1]) else: raise ValueError(f"Unknown control mode {control_mode}") - - self._longitudinal_action : LongitudinalAction = long_act_type_fn(params) - self._steer_action : SteerAction = steer_act_type_fn(params) + + self._longitudinal_action: LongitudinalAction = long_act_type_fn(params) + self._steer_action: SteerAction = steer_act_type_fn(params) @abstractmethod def act(self, action: Any, **kwargs) -> Tuple[float, float]: + """Return action + + Parameters + ---------- + action : Any + input control actions + + Returns + ------- + Tuple[float, float] + converted control actions + """ longitudinal_action = self._longitudinal_action.act(action[0], **kwargs) steer_action = self._steer_action.act(action[1], **kwargs) return longitudinal_action, steer_action @property def type(self) -> Tuple[str, str]: + """property, car action type + + Returns + ------- + Tuple[str, str] + type + """ return (self._steer_action.type, self._longitudinal_action.type) @property def space(self) -> gym.Space: - low = np.array([self._steer_action.lower_limit, self._longitudinal_action.lower_limit]).astype(np.float32) - high = np.array([self._steer_action.upper_limit, self._longitudinal_action.upper_limit]).astype(np.float32) + """property, car action space + + Returns + ------- + gym.Space + action space + """ + low = np.array( + [self._steer_action.lower_limit, self._longitudinal_action.lower_limit] + ).astype(np.float32) + high = np.array( + [self._steer_action.upper_limit, self._longitudinal_action.upper_limit] + ).astype(np.float32) return gym.spaces.Box(low=low, high=high, shape=(2,), dtype=np.float32) @@ -192,6 +464,20 @@ def space(self) -> gym.Space: def from_single_to_multi_action_space( single_agent_action_space: gym.spaces.Box, num_agents: int ) -> gym.spaces.Box: + """Convert single agent action spaces to multi agent action spaces + + Parameters + ---------- + single_agent_action_space : gym.spaces.Box + action space of a single aget + num_agents : int + number of agents in the simulation + + Returns + ------- + gym.spaces.Box + action spaces of multiple agents + """ return gym.spaces.Box( low=single_agent_action_space.low[None].repeat(num_agents, 0), high=single_agent_action_space.high[None].repeat(num_agents, 0), diff --git a/gym/f110_gym/envs/base_classes.py b/gym/f110_gym/envs/base_classes.py index bda02aec..51479d33 100644 --- a/gym/f110_gym/envs/base_classes.py +++ b/gym/f110_gym/envs/base_classes.py @@ -9,20 +9,34 @@ class RaceCar(object): - """ - Base level race car class, handles the physics and laser scan of a single vehicle - - Data Members: - params (dict): vehicle parameters dictionary - is_ego (bool): ego identifier - time_step (float): physics timestep - num_beams (int): number of beams in laser - fov (float): field of view of laser - state (np.ndarray (7, )): state vector [x, y, theta, vel, steer_angle, ang_vel, slip_angle] - odom (np.ndarray(13, )): odometry vector [x, y, z, qx, qy, qz, qw, linear_x, linear_y, linear_z, angular_x, angular_y, angular_z] - accel (float): current acceleration input - steer_angle_vel (float): current steering velocity input - in_collision (bool): collision indicator + """Base level race car class, handles the physics and laser scan of a single vehicle + + Parameters + ---------- + params : _type_ + vehicle parameters dictionary + seed : _type_ + random seed + action_type : CarAction + action type for the cars + integrator : _type_, optional + integrator type, by default EulerIntegrator() + model : _type_, optional + vehicle model type, by default DynamicModel.ST + is_ego : bool, optional + ego identifier, by default False + time_step : float, optional + physics sim time step, by default 0.01 + num_beams : int, optional + number of beams in the laser scan, by default 1080 + fov : float, optional + field of view of the laser, by default 4.7 + + + Raises + ------ + ValueError + No Control Action Type Specified. """ # static objects that don't need to be stored in class instances @@ -43,25 +57,6 @@ def __init__( num_beams=1080, fov=4.7, ): - """ - TODO rewrite it - - Init function - - Args: - params (dict): vehicle parameters dictionary - seed (int): random seed - is_ego (bool, default=False): ego identifier - time_step (float, default=0.01): physics sim time step - num_beams (int, default=1080): number of beams in the laser scan - fov (float, default=4.7): field of view of the laser - integrator (Integrator, default=EulerIntegrator()): integrator type - model (Model, default=Model.ST): vehicle model type - action_type (Action, default=SpeedAction()): action type - - Returns: - None - """ # initialization self.params = params @@ -138,36 +133,32 @@ def __init__( RaceCar.side_distances[i] = min(to_side, to_fr) def update_params(self, params): - """ - Updates the physical parameters of the vehicle - Note that does not need to be called at initialization of class anymore - - Args: - params (dict): new parameters for the vehicle + """Updates the physical parameters of the vehicle - Returns: - None + Parameters + ---------- + params : dict + new parameters for the vehicle """ self.params = params def set_map(self, map: str | Track): - """ - Sets the map for scan simulator + """Sets the map for scan simulator - Args: - map (str | Track): name of the map, or Track object + Parameters + ---------- + map : str | Track + name of the map, or Track object """ RaceCar.scan_simulator.set_map(map) def reset(self, pose): - """ - Resets the vehicle to a pose - - Args: - pose (np.ndarray (3, )): pose to reset the vehicle to + """Resets the vehicle to a pose - Returns: - None + Parameters + ---------- + pose : np.ndarray + pose to reset the vehicle to """ # clear control inputs self.accel = 0.0 @@ -182,16 +173,18 @@ def reset(self, pose): self.scan_rng = np.random.default_rng(seed=self.seed) def ray_cast_agents(self, scan): - """ - Ray cast onto other agents in the env, modify original scan + """Ray cast onto other agents in the env, modify original scan - Args: - scan (np.ndarray, (n, )): original scan range array + Parameters + ---------- + scan : np.ndarray + original scan range array - Returns: - new_scan (np.ndarray, (n, )): modified scan + Returns + ------- + np.ndarray + modified scan """ - # starting from original scan new_scan = scan @@ -212,19 +205,19 @@ def ray_cast_agents(self, scan): return new_scan def check_ttc(self, current_scan): - """ - Check iTTC against the environment, sets vehicle states accordingly if collision occurs. + """Check iTTC against the environment, sets vehicle states accordingly if collision occurs. Note that this does NOT check collision with other agents. - state is [x, y, steer_angle, vel, yaw_angle, yaw_rate, slip_angle] - - Args: - current_scan + Parameters + ---------- + current_scan : np.ndarray + current laser scan - Returns: - None + Returns + ------- + bool + whether the scan given indicates the vehicle is in collision with environment """ - in_collision = check_ttc_jit( current_scan, self.state[3], @@ -246,17 +239,25 @@ def check_ttc(self, current_scan): return in_collision def update_pose(self, raw_steer, vel): + """Steps the vehicle's physical simulation + + Parameters + ---------- + raw_steer : float + desired steering angle, or desired steering velocity + vel : float + desired longitudinal velocity, or desired longitudinal acceleration + + Returns + ------- + np.ndarray + current laser scan + + Raises + ------ + ValueError + No Control Action Type Specified. """ - Steps the vehicle's physical simulation - - Args: - steer (float): desired steering angle, or desired steering velocity - vel (float): desired longitudinal velocity, or desired longitudinal acceleration - - Returns: - current_scan - """ - # steering delay steer = 0.0 if self.steer_buffer.shape[0] < self.steer_buffer_size: @@ -295,30 +296,26 @@ def update_pose(self, raw_steer, vel): return current_scan def update_opp_poses(self, opp_poses): - """ - Updates the vehicle's information on other vehicles - - Args: - opp_poses (np.ndarray(num_other_agents, 3)): updated poses of other agents + """Updates the vehicle's information on other vehicles - Returns: - None + Parameters + ---------- + opp_poses : np.ndarray + updated poses of other agents """ self.opp_poses = opp_poses def update_scan(self, agent_scans, agent_index): - """ - Steps the vehicle's laser scan simulation + """Steps the vehicle's laser scan simulation Separated from update_pose because needs to update scan based on NEW poses of agents in the environment - Args: - agent scans list (modified in-place), - agent index (int) - - Returns: - None + Parameters + ---------- + agent_scans : list[np.ndarray] + list of scans of each agent + agent_index : int + index of agent """ - current_scan = agent_scans[agent_index] # check ttc @@ -331,21 +328,33 @@ def update_scan(self, agent_scans, agent_index): class Simulator(object): - """ - Simulator class, handles the interaction and update of all vehicles in the environment - - TODO check description - - Data Members: - num_agents (int): number of agents in the environment - time_step (float): physics time step - agent_poses (np.ndarray(num_agents, 3)): all poses of all agents - agents (list[RaceCar]): container for RaceCar objects - collisions (np.ndarray(num_agents, )): array of collision indicator for each agent - collision_idx (np.ndarray(num_agents, )): which agent is each agent in collision with - integrator (Integrator): integrator to use for vehicle dynamics - model (Model): model to use for vehicle dynamics - action_type (Action): action type to use for vehicle dynamics + """Simulator class, handles the interaction and update of all vehicles in the environment + + Parameters + ---------- + params : dict + vehicle parameter dictionary + num_agents : int + number of agents in the environment + seed : int + seed of the rng in scan simulation + action_type : CarAction + action type to use for controlling the vehicles + integrator : Integrator, optional + integrator to use for vehicle dynamics, by default IntegratorType.RK4 + model : Model, optional + vehicle dynamics model to use, by default DynamicModel.ST + time_step : float, optional + physics time step, by default 0.01 + ego_idx : int, optional + ego vehicle's index in list of agents, by default 0 + + Raises + ------ + IndexError + Index given is out of bounds for list of agents. + ValueError + Number of poses for reset does not match number of agents. """ def __init__( @@ -359,21 +368,6 @@ def __init__( time_step=0.01, ego_idx=0, ): - """ - Init function - - Args: - params (dict): vehicle parameter dictionary, includes {'mu', 'C_Sf', 'C_Sr', 'lf', 'lr', 'h', 'm', 'I', 's_min', 's_max', 'sv_min', 'sv_max', 'v_switch', 'a_max', 'v_min', 'v_max', 'length', 'width'} - num_agents (int): number of agents in the environment - seed (int): seed of the rng in scan simulation - time_step (float, default=0.01): physics time step - ego_idx (int, default=0): ego vehicle's index in list of agents - integrator (Integrator, default=Integrator.RK4): integrator to use for vehicle dynamics - model (Model, default=Model.ST): vehicle dynamics model to use - action_type (Action, default=SpeedAction()): action type to use for controlling the vehicle - Returns: - None - """ self.num_agents = num_agents self.seed = seed self.time_step = time_step @@ -404,28 +398,30 @@ def __init__( self.agent_scans = np.empty((self.num_agents, num_beams)) def set_map(self, map: str | Track): - """ - Sets the map of the environment and sets the map for scan simulator of each agent - - Args: - map (str | Track): name of the map, or Track object + """Sets the map of the environment and sets the map for scan simulator of each agent - Returns: - None + Parameters + ---------- + map : str | Track + name of the map, or Track object """ for agent in self.agents: agent.set_map(map) def update_params(self, params, agent_idx=-1): - """ - Updates the params of agents, if an index of an agent is given, update only that agent's params - - Args: - params (dict): dictionary of params, see details in docstring of __init__ - agent_idx (int, default=-1): index for agent that needs param update, if negative, update all agents - - Returns: - None + """Updates the params of agents, if an index of an agent is given, update only that agent's params + + Parameters + ---------- + params : dict + dictionary of params + agent_idx : int, optional + index for agent that needs param update, if negative, update all agents, by default -1 + + Raises + ------ + IndexError + Index given is out of bounds for list of agents. """ self.params = params if agent_idx < 0: @@ -440,15 +436,7 @@ def update_params(self, params, agent_idx=-1): raise IndexError("Index given is out of bounds for list of agents.") def check_collision(self): - """ - Checks for collision between agents using GJK and agents' body vertices - - Args: - None - - Returns: - None - """ + """Checks for collision between agents using GJK and agents' body vertices""" # get vertices of all agents all_vertices = np.empty((self.num_agents, 4, 2)) for i in range(self.num_agents): @@ -460,14 +448,12 @@ def check_collision(self): self.collisions, self.collision_idx = collision_multiple(all_vertices) def step(self, control_inputs): - """ - Steps the simulation environment - - Args: - control_inputs (np.ndarray (num_agents, 2)): control inputs of all agents, first column is desired steering angle, second column is desired velocity + """Steps the simulation environment - Returns: - observations (dict): dictionary for observations: poses of agents, current laser scan of each agent, collision indicators, etc. + Parameters + ---------- + control_inputs : np.ndarray + control inputs of all agents, first column is desired steering angle, second column is desired velocity """ # looping over agents @@ -498,16 +484,18 @@ def step(self, control_inputs): self.collisions[i] = 1.0 def reset(self, poses): - """ - Resets the simulation environment by given poses + """Resets the simulation environment by given poses - Arges: - poses (np.ndarray (num_agents, 3)): poses to reset agents to + Parameters + ---------- + poses : np.ndarray + poses to reset agents to - Returns: - None + Raises + ------ + ValueError + Number of poses for reset does not match number of agents. """ - if poses.shape[0] != self.num_agents: raise ValueError( "Number of poses for reset does not match number of agents."