diff --git a/README.md b/README.md
index cc4e5bf33..786bedbc8 100644
--- a/README.md
+++ b/README.md
@@ -88,6 +88,9 @@ Stop the experiment (Ctrl+C) when the desired performance is reached and then ev
 ```bash
 python -m sf_examples.mujoco.enjoy_mujoco --env=mujoco_ant --experiment=Ant --train_dir=./train_dir
+
+# Or use an alternative eval script: no rendering, but much faster! (Use `sample_env_episodes` >= `num_workers` * `num_envs_per_worker`.)
+python -m sf_examples.mujoco.fast_eval_mujoco --env=mujoco_ant --experiment=Ant --train_dir=./train_dir --sample_env_episodes=128 --num_workers=16 --num_envs_per_worker=2
 ```
 
 Do the same in a pixel-based VizDoom environment (might need to run `pip install sample-factory[vizdoom]`, please also see docs for VizDoom-specific instructions):
diff --git a/docs/09-environment-integrations/mujoco.md b/docs/09-environment-integrations/mujoco.md
index 6801c2205..5290ce05b 100644
--- a/docs/09-environment-integrations/mujoco.md
+++ b/docs/09-environment-integrations/mujoco.md
@@ -17,13 +17,21 @@ that we can achieve even faster training on a multi-core machine with more optim
 To train a model in the `Ant-v4` environment:
 
 ```
-python -m sf_examples.mujoco.train_mujoco --algo=APPO --env=mujoco_ant --experiment=<experiment_name>
+python -m sf_examples.mujoco.train_mujoco --env=mujoco_ant --experiment=<experiment_name>
 ```
 
 To visualize the training results, use the `enjoy_mujoco` script:
 
 ```
-python -m sf_examples.mujoco.enjoy_mujoco --algo=APPO --env=mujoco_ant --experiment=<experiment_name>
+python -m sf_examples.mujoco.enjoy_mujoco --env=mujoco_ant --experiment=<experiment_name>
+```
+
+If you're having issues with the MuJoCo viewer in a Unix/Linux environment with Conda, try running the following
+before executing the `enjoy_mujoco` script:
+
+```
+export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libstdc++.so.6
+python -m sf_examples.mujoco.enjoy_mujoco ...
 ```
 
 Multiple experiments can be run in parallel with the launcher module. `mujoco_all_envs` is an example launcher script that runs all mujoco envs with 10 seeds.
diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py
new file mode 100644
index 000000000..77ec2a7e5
--- /dev/null
+++ b/sample_factory/algo/sampling/evaluation_sampling_api.py
@@ -0,0 +1,305 @@
+from __future__ import annotations
+
+import time
+from collections import OrderedDict
+from threading import Thread
+from typing import Callable, Dict, Iterable, List, Optional, Tuple
+
+import numpy as np
+from signal_slot.signal_slot import EventLoop, EventLoopObject, EventLoopStatus, signal
+from torch import Tensor
+
+from sample_factory.algo.learning.learner import Learner
+from sample_factory.algo.runners.runner import MsgHandler, PolicyMsgHandler, Runner
+from sample_factory.algo.sampling.sampler import AbstractSampler, ParallelSampler, SerialSampler
+from sample_factory.algo.utils.env_info import EnvInfo
+from sample_factory.algo.utils.misc import EPISODIC, SAMPLES_COLLECTED, STATS_KEY, TIMING_STATS, ExperimentStatus
+from sample_factory.algo.utils.model_sharing import ParameterServer
+from sample_factory.algo.utils.rl_utils import samples_per_trajectory
+from sample_factory.algo.utils.shared_buffers import BufferMgr
+from sample_factory.algo.utils.tensor_dict import TensorDict
+from sample_factory.cfg.configurable import Configurable
+from sample_factory.utils.dicts import iterate_recursively
+from sample_factory.utils.gpu_utils import set_global_cuda_envvars
+from sample_factory.utils.typing import Config, InitModelData, PolicyID, StatusCode
+from sample_factory.utils.utils import log
+
+
+class SamplingLoop(EventLoopObject, Configurable):
+    def __init__(self, cfg: Config, env_info: EnvInfo):
+        Configurable.__init__(self, cfg)
+        unique_name = SamplingLoop.__name__
+        self.event_loop: EventLoop = EventLoop(unique_loop_name=f"{unique_name}_EvtLoop", serial_mode=cfg.serial_mode)
+        self.event_loop.owner = self
+        EventLoopObject.__init__(self, self.event_loop, object_id=unique_name)
+        # self.event_loop.verbose = True
+
+        # calculate how many episodes for each environment should be taken into account
+        # we only want to use first N episodes (we don't want to bias ourselves with short episodes)
+        total_envs = self.cfg.num_workers * self.cfg.num_envs_per_worker
+        self.max_episode_number = self.cfg.sample_env_episodes / total_envs
+
+        self.env_info = env_info
+        self.iteration: int = 0
+
+        self.buffer_mgr: Optional[BufferMgr] = None
+        self.param_servers: Optional[Dict[PolicyID, ParameterServer]] = None
+
+        self.new_trajectory_callback: Optional[Callable] = None
+        self.status: Optional[StatusCode] = None
+
+        self.ready: bool = False
+        self.stopped: bool = False
+
+        # samples_collected counts the total number of observations processed by the algorithm
+        self.samples_collected = [0 for _ in range(self.cfg.num_policies)]
+
+        self.stats = dict()  # regular (non-averaged) stats
+        self.avg_stats = dict()
+
+        self.policy_avg_stats: Dict[str, List[List]] = dict()
+
+        # global msg handlers for messages from algo components
+        self.msg_handlers: Dict[str, List[MsgHandler]] = {
+            TIMING_STATS: [Runner._timing_msg_handler],
+            STATS_KEY: [Runner._stats_msg_handler],
+        }
+
+        # handlers for policy-specific messages
+        self.policy_msg_handlers: Dict[str, List[PolicyMsgHandler]] = {
+            EPISODIC: [self._episodic_stats_handler],
+            SAMPLES_COLLECTED: [Runner._samples_stats_handler],
+        }
+
+    @signal
+    def model_initialized(self):
+        ...
+
+    @signal
+    def trajectory_buffers_available(self):
+        ...
+
+    @signal
+    def stop(self):
+        ...
+
+    def init(
+        self, buffer_mgr: Optional[BufferMgr] = None, param_servers: Optional[Dict[PolicyID, ParameterServer]] = None
+    ):
+        set_global_cuda_envvars(self.cfg)
+
+        self.buffer_mgr = buffer_mgr
+        if self.buffer_mgr is None:
+            self.buffer_mgr = BufferMgr(self.cfg, self.env_info)
+
+        self.param_servers = param_servers
+        if self.param_servers is None:
+            self.param_servers = dict()
+            for policy_id in range(self.cfg.num_policies):
+                self.param_servers[policy_id] = ParameterServer(
+                    policy_id, self.buffer_mgr.policy_versions, self.cfg.serial_mode
+                )
+
+        sampler_cls = SerialSampler if self.cfg.serial_mode else ParallelSampler
+        sampler: AbstractSampler = sampler_cls(
+            self.event_loop, self.buffer_mgr, self.param_servers, self.cfg, self.env_info
+        )
+        self.event_loop.start.connect(sampler.init)
+        sampler.started.connect(self.on_sampler_started)
+        sampler.initialized.connect(self.on_sampler_initialized)
+
+        for policy_id in range(self.cfg.num_policies):
+            sampler.connect_model_initialized(policy_id, self.model_initialized)
+            sampler.connect_on_new_trajectories(policy_id, self.on_new_trajectories)
+        sampler.connect_trajectory_buffers_available(self.trajectory_buffers_available)
+        sampler.connect_report_msg(self._process_msg)
+
+        for stoppable in sampler.stoppable_components():
+            self.stop.connect(stoppable.on_stop)
+
+    def _process_msg(self, msgs):
+        if isinstance(msgs, (dict, OrderedDict)):
+            msgs = (msgs,)
+
+        if not (isinstance(msgs, (List, Tuple)) and isinstance(msgs[0], (dict, OrderedDict))):
+            log.error("While parsing a message: expected a dictionary or list/tuple of dictionaries, found %r", msgs)
+            return
+
+        for msg in msgs:
+            # some messages are policy-specific
+            policy_id = msg.get("policy_id", None)
+
+            for key in msg:
+                for handler in self.msg_handlers.get(key, ()):
+                    handler(self, msg)
+                if policy_id is not None:
+                    for handler in self.policy_msg_handlers.get(key, ()):
+                        handler(self, msg, policy_id)
+
+    @staticmethod
+    def _episodic_stats_handler(runner: Runner, msg: Dict, policy_id: PolicyID) -> None:
+        # heavily based on the `_episodic_stats_handler` from `Runner`
+        s = msg[EPISODIC]
+
+        # only keep stats from the first `max_episode_number` episodes of each env;
+        # an env that doesn't set episode_number (defaults to 0) always has its stats added
+        episode_number = s["episode_extra_stats"].get("episode_number", 0)
+        if episode_number < runner.max_episode_number:
+            log.debug(
+                f"Episode ended after {s['len']:.1f} steps. Return: {s['reward']:.1f}. True objective {s['true_objective']:.1f}"
+            )
+
+            for _, key, value in iterate_recursively(s):
+                if key not in runner.policy_avg_stats:
+                    runner.policy_avg_stats[key] = [[] for _ in range(runner.cfg.num_policies)]
+
+                if isinstance(value, np.ndarray) and value.ndim > 0:
+                    runner.policy_avg_stats[key][policy_id].extend(value)
+                else:
+                    runner.policy_avg_stats[key][policy_id].append(value)
+
+    def wait_until_ready(self):
+        while not self.ready:
+            log.debug(f"{self.object_id}: waiting for sampler to be ready...")
+            time.sleep(0.5)
+
+    def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None):
+        """Model initialization should kickstart the sampling loop."""
+        for policy_id in range(self.cfg.num_policies):
+            if init_model_data is None:
+                self.model_initialized.emit(None)
+            else:
+                self.model_initialized.emit(init_model_data[policy_id])
+
+    def set_new_trajectory_callback(self, cb: Callable) -> None:
+        self.new_trajectory_callback = cb
+
+    def on_sampler_started(self):
+        self.ready = True
+
+    def on_sampler_initialized(self):
+        log.debug(f"{self.object_id}: sampler fully initialized!")
+
+    def on_new_trajectories(self, trajectory_dicts: Iterable[Dict], device: str):
+        for trajectory_dict in trajectory_dicts:
+            traj_buffer_idx: int | slice = trajectory_dict["traj_buffer_idx"]
+            if isinstance(traj_buffer_idx, slice):
+                trajectory_slice = traj_buffer_idx
+            else:
+                trajectory_slice = slice(traj_buffer_idx, traj_buffer_idx + 1)
+
+            # data for this trajectory is now available in the buffer
+            # always use a slice so that returned tensors are the same dimensionality regardless of whether we
+            # use batched or non-batched sampling
+            traj = self.buffer_mgr.traj_tensors_torch[device][trajectory_slice]
+            self.new_trajectory_callback(traj, [traj_buffer_idx], device)
+
+    def yield_trajectory_buffers(self, available_buffers: Iterable[int | slice], device: str):
+        # make this trajectory buffer available again
+        self.buffer_mgr.traj_buffer_queues[device].put_many(available_buffers)
+        self.iteration += 1
+        for policy_id in range(self.cfg.num_policies):
+            self.trajectory_buffers_available.emit(policy_id, self.iteration)
+
+    def run(self) -> StatusCode:
+        log.debug("Before event loop...")
+
+        # noinspection PyBroadException
+        try:
+            evt_loop_status = self.event_loop.exec()
+            self.status = (
+                ExperimentStatus.INTERRUPTED
+                if evt_loop_status == EventLoopStatus.INTERRUPTED
+                else ExperimentStatus.SUCCESS
+            )
+            self.stop.emit()
+        except Exception:
+            log.exception(f"Uncaught exception in {self.object_id} evt loop")
+            self.status = ExperimentStatus.FAILURE
+
+        log.debug(f"{SamplingLoop.__name__} finished with {self.status=}")
+        return self.status
+
+    def stop_sampling(self):
+        self.stop.emit()
+        self.event_loop.stop()
+        self.stopped = True
+
+
+class EvalSamplingAPI:
+    def __init__(
+        self,
+        cfg: Config,
+        env_info: EnvInfo,
+    ):
+        self.cfg = cfg
+        self.env_info = env_info
+
+        self.buffer_mgr = None
+        self.policy_versions_tensor = None
+        self.param_servers: Dict[PolicyID, ParameterServer] = None
+        self.init_model_data: Dict[PolicyID, InitModelData] = None
+        self.learners: Dict[PolicyID, Learner] = None
+
+        self.sampling_loop: SamplingLoop = None
+        self.total_samples = 0
+
+    def init(self):
+        set_global_cuda_envvars(self.cfg)
+
+        self.buffer_mgr = BufferMgr(self.cfg, self.env_info)
+        self.policy_versions_tensor: Tensor = self.buffer_mgr.policy_versions
+
+        self.param_servers = {}
+        self.init_model_data = {}
+        self.learners = {}
+        for policy_id in range(self.cfg.num_policies):
+            self.param_servers[policy_id] = ParameterServer(
+                policy_id, self.policy_versions_tensor, self.cfg.serial_mode
+            )
+            self.learners[policy_id] = Learner(
+                self.cfg, self.env_info, self.policy_versions_tensor, policy_id, self.param_servers[policy_id]
+            )
+            self.init_model_data[policy_id] = self.learners[policy_id].init()
+
+        self.sampling_loop: SamplingLoop = SamplingLoop(self.cfg, self.env_info)
+        # don't pass self.param_servers here; the learners are normally initialized later
+        # TODO: fix the above issue
+        self.sampling_loop.init(self.buffer_mgr)
+        self.sampling_loop.set_new_trajectory_callback(self._on_new_trajectories)
+        self.sampling_thread: Thread = Thread(target=self.sampling_loop.run)
+        self.sampling_thread.start()
+
+        self.sampling_loop.wait_until_ready()
+
+    @property
+    def eval_stats(self):
+        # it's possible that we would like to return additional stats, e.g. fps; those could be added here
+        return self.sampling_loop.policy_avg_stats
+
+    @property
+    def eval_episodes(self):
+        return self.eval_stats.get("episode_number", [[] for _ in range(self.cfg.num_policies)])
+
+    @property
+    def eval_env_steps(self):
+        # return the number of env steps for each policy
+        episode_lens = self.eval_stats.get("len", [[] for _ in range(self.cfg.num_policies)])
+        return [sum(episode_lens[policy_id]) for policy_id in range(self.cfg.num_policies)]
+
+    def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None):
+        if init_model_data is None:
+            init_model_data = self.init_model_data
+        self.sampling_loop.start(init_model_data)
+
+    def _on_new_trajectories(self, traj: TensorDict, traj_buffer_indices: Iterable[int | slice], device: str):
+        self.total_samples += samples_per_trajectory(traj)
+
+        # just release buffers after every trajectory
+        # we could alternatively have more sophisticated logic here, see e.g. batcher.py or simplified_sampling_api.py
+        self.sampling_loop.yield_trajectory_buffers(traj_buffer_indices, device)
+
+    def stop(self) -> StatusCode:
+        self.sampling_loop.stop_sampling()
+        self.sampling_thread.join()
+        return self.sampling_loop.status
diff --git a/sample_factory/cfg/arguments.py b/sample_factory/cfg/arguments.py
index f736342d1..a15f0444e 100644
--- a/sample_factory/cfg/arguments.py
+++ b/sample_factory/cfg/arguments.py
@@ -273,3 +273,29 @@ def maybe_load_from_checkpoint(cfg: Config) -> AttrDict:
         return AttrDict(vars(cfg))
 
     return load_from_checkpoint(cfg)
+
+
+def checkpoint_override_defaults(cfg: Config, parser) -> AttrDict:
+    cfg_filename = cfg_file(cfg)
+    cfg_filename_old = cfg_file_old(cfg)
+
+    if not os.path.isfile(cfg_filename) and os.path.isfile(cfg_filename_old):
+        # rename old config file
+        log.warning(f"Loading legacy config file {cfg_filename_old} instead of {cfg_filename}")
+        os.rename(cfg_filename_old, cfg_filename)
+
+    if not os.path.isfile(cfg_filename):
+        raise Exception(
+            f"Could not load saved parameters for experiment {cfg.experiment} "
+            f"(file {cfg_filename} not found). Check that you have the correct experiment name "
+            f"and --train_dir is set correctly."
+        )
+
+    with open(cfg_filename, "r") as json_file:
+        json_params = json.load(json_file)
+        log.warning("Loading existing experiment configuration from %s", cfg_filename)
+        loaded_cfg = AttrDict(json_params)
+
+    parser.set_defaults(**loaded_cfg)
+
+    return loaded_cfg
diff --git a/sample_factory/cfg/cfg.py b/sample_factory/cfg/cfg.py
index 6b1b53e12..ae8e7a30b 100644
--- a/sample_factory/cfg/cfg.py
+++ b/sample_factory/cfg/cfg.py
@@ -636,6 +636,12 @@ def add_default_env_args(p: ArgumentParser):
         type=str2bool,
         help="Whether to use gym RecordEpisodeStatistics wrapper to keep track of reward",
     )
+    p.add_argument(
+        "--episode_counter",
+        default=False,
+        type=str2bool,
+        help="Add a wrapper to each env that counts the number of episodes the env has completed.",
+    )
 
 
 def add_eval_args(parser):
@@ -700,6 +706,18 @@ def add_eval_args(parser):
         type=str,
         help="Module name used to run training script. Used to generate HF model card",
     )
+    parser.add_argument(
+        "--sample_env_episodes",
+        default=256,
+        type=int,
+        help="Determines how many episodes will be sampled during evaluation.",
+    )
+    parser.add_argument(
+        "--csv_folder_name",
+        default=None,
+        type=str,
+        help="Path where the evaluation CSV will be stored.",
+    )
 
 
 def add_wandb_args(p: ArgumentParser):
diff --git a/sample_factory/envs/create_env.py b/sample_factory/envs/create_env.py
index 153f6684e..c38c0fe52 100644
--- a/sample_factory/envs/create_env.py
+++ b/sample_factory/envs/create_env.py
@@ -4,6 +4,7 @@
 
 from sample_factory.algo.utils.context import global_env_registry
 from sample_factory.algo.utils.gymnasium_utils import patch_non_gymnasium_env
+from sample_factory.envs.env_wrappers import EpisodeCounterWrapper
 from sample_factory.utils.attr_dict import AttrDict
 from sample_factory.utils.typing import Config
 from sample_factory.utils.utils import log
@@ -39,4 +40,7 @@ def create_env(
 
     env = patch_non_gymnasium_env(env)
 
+    if "episode_counter" in cfg and cfg.episode_counter:
+        env = EpisodeCounterWrapper(env)
+
     return env
diff --git a/sample_factory/envs/env_wrappers.py b/sample_factory/envs/env_wrappers.py
index e4acbb987..b16af9060 100644
--- a/sample_factory/envs/env_wrappers.py
+++ b/sample_factory/envs/env_wrappers.py
@@ -434,3 +434,23 @@ class NumpyObsWrapper(gym.ObservationWrapper):
 
     def observation(self, observation: Any) -> np.ndarray:
         return np.array(observation)
+
+
+class EpisodeCounterWrapper(gym.Wrapper):
+    def __init__(self, env):
+        gym.Wrapper.__init__(self, env)
+        self.episode_count = 0
+
+    def reset(self, **kwargs) -> Tuple[GymObs, Dict]:
+        return self.env.reset(**kwargs)
+
+    def step(self, action: int) -> GymStepReturn:
+        obs, reward, terminated, truncated, info = self.env.step(action)
+
+        if terminated | truncated:
+            extra_stats = info.get("episode_extra_stats", {})
+            extra_stats["episode_number"] = self.episode_count
+            info["episode_extra_stats"] = extra_stats
+            self.episode_count += 1
+
+        return obs, reward, terminated, truncated, info
diff --git a/sample_factory/eval.py b/sample_factory/eval.py
new file mode 100644
index 000000000..3d9a7268b
--- /dev/null
+++ b/sample_factory/eval.py
@@ -0,0 +1,119 @@
+import json
+import time
+from collections import deque
+from pathlib import Path
+from typing import Deque
+
+import numpy as np
+import pandas as pd
+from signal_slot.signal_slot import StatusCode
+
+from sample_factory.algo.sampling.evaluation_sampling_api import EvalSamplingAPI
+from sample_factory.algo.utils.env_info import EnvInfo, obtain_env_info_in_a_separate_process
+from sample_factory.utils.typing import Config
+from sample_factory.utils.utils import experiment_dir, log
+
+
+def _print_fps_stats(cfg: Config, fps_stats: Deque):
+    episodes_sampled = fps_stats[-1][1]
+    env_steps_sampled = fps_stats[-1][2]
+    delta_sampled = env_steps_sampled - fps_stats[0][2]
+    delta_time = fps_stats[-1][0] - fps_stats[0][0]
+    fps = delta_sampled / delta_time
+    fps_frameskip = fps * cfg.env_frameskip
+    fps_frameskip_str = f" ({fps_frameskip:.1f} FPS with frameskip)" if cfg.env_frameskip > 1 else ""
+    log.info(
+        f"Episodes collected: {episodes_sampled}, Samples collected: {env_steps_sampled}, throughput: {fps:.1f} FPS{fps_frameskip_str}"
+    )
+
+
+def _print_eval_summaries(cfg, eval_stats):
+    for policy_id in range(cfg.num_policies):
+        results = {}
+        for key, stat in eval_stats.items():
+            stat_value = np.mean(stat[policy_id])
+
+            if "/" in key:
+                # custom summaries have their own sections in tensorboard
+                avg_tag = key
+                min_tag = f"{key}_min"
+                max_tag = f"{key}_max"
+            elif key in ("reward", "len"):
+                # reward and length get special treatment
+                avg_tag = f"{key}/{key}"
+                min_tag = f"{key}/{key}_min"
+                max_tag = f"{key}/{key}_max"
+            else:
+                avg_tag = f"policy_stats/avg_{key}"
+                min_tag = f"policy_stats/avg_{key}_min"
+                max_tag = f"policy_stats/avg_{key}_max"
+
+            results[avg_tag] = float(stat_value)
+
+            # for key stats report min/max as well
+            if key in ("reward", "true_objective", "len"):
+                results[min_tag] = float(min(stat[policy_id]))
+                results[max_tag] = float(max(stat[policy_id]))
+
+        log.info(json.dumps(results, indent=4))
+
+
+def _save_eval_results(cfg, eval_stats):
+    for policy_id in range(cfg.num_policies):
+        data = {}
+        for key, stat in eval_stats.items():
+            data[key] = stat[policy_id]
+
+        csv_output_dir = Path(experiment_dir(cfg=cfg))
+        if cfg.csv_folder_name is not None:
+            csv_output_dir = csv_output_dir / cfg.csv_folder_name
+        csv_output_dir.mkdir(exist_ok=True, parents=True)
+        csv_output_path = csv_output_dir / f"eval_p{policy_id}.csv"
+
+        data = pd.DataFrame(data)
+        data.to_csv(csv_output_path)
+
+
+def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode:
+    sampler = EvalSamplingAPI(cfg, env_info)
+    sampler.init()
+    sampler.start()
+
+    print_interval_sec = 1.0
+    fps_stats = deque([(time.time(), 0, 0)], maxlen=10)
+    episodes_sampled = 0
+    last_print = time.time()
+
+    while episodes_sampled < sample_env_episodes:
+        try:
+            if time.time() - last_print > print_interval_sec:
+                # TODO: for now we only look at the first policy
+                policy_id = 0
+                episodes_sampled = len(sampler.eval_episodes[policy_id])
+                env_steps_sampled = sampler.total_samples
+
+                fps_stats.append((time.time(), episodes_sampled, env_steps_sampled))
+                _print_fps_stats(cfg, fps_stats)
+                last_print = time.time()
+
+                log.info(f"Progress: {episodes_sampled}/{sample_env_episodes} episodes sampled")
+        except KeyboardInterrupt:
+            log.info(f"KeyboardInterrupt in {generate_trajectories.__name__}()")
+            break
+
+    status = sampler.stop()
+
+    # TODO: log results to tensorboard?
+    _print_eval_summaries(cfg, sampler.eval_stats)
+    _save_eval_results(cfg, sampler.eval_stats)
+
+    return status
+
+
+def do_eval(cfg: Config) -> StatusCode:
+    # episode_counter should always be set to True for this script
+    cfg.episode_counter = True
+    # decorrelation isn't needed in eval, it only slows us down
+    cfg.decorrelate_envs_on_one_worker = False
+    env_info = obtain_env_info_in_a_separate_process(cfg)
+    return generate_trajectories(cfg, env_info, cfg.sample_env_episodes)
diff --git a/setup.py b/setup.py
index 77e3ab12c..46254b904 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@
 
 _atari_deps = ["gymnasium[atari, accept-rom-license]"]
-_mujoco_deps = ["gymnasium[mujoco]", "mujoco<=2.3.3"]
+_mujoco_deps = ["gymnasium[mujoco]", "mujoco<2.5"]
 _envpool_deps = ["envpool"]
 
 _docs_deps = [
@@ -56,6 +56,7 @@
         "opencv-python",
         "wandb>=0.12.9",
         "huggingface-hub>=0.10.0,<1.0",
+        "pandas",
     ],
     extras_require={
         # some tests require Atari and Mujoco so let's make sure dev environment has that
diff --git a/sf_examples/mujoco/fast_eval_mujoco.py b/sf_examples/mujoco/fast_eval_mujoco.py
new file mode 100644
index 000000000..d0cc8abfd
--- /dev/null
+++ b/sf_examples/mujoco/fast_eval_mujoco.py
@@ -0,0 +1,27 @@
+import sys
+
+from sample_factory.cfg.arguments import checkpoint_override_defaults, parse_full_cfg, parse_sf_args
+from sample_factory.eval import do_eval
+from sf_examples.mujoco.mujoco_params import add_mujoco_env_args, mujoco_override_defaults
+from sf_examples.mujoco.train_mujoco import register_mujoco_components
+
+
+def main():
+    """Script entry point."""
+    register_mujoco_components()
+    parser, cfg = parse_sf_args(evaluation=True)
+    add_mujoco_env_args(cfg.env, parser)
+    mujoco_override_defaults(cfg.env, parser)
+
+    # important: unlike the enjoy script, which calls `load_from_checkpoint`, we override the parser
+    # defaults here so that explicit argv arguments still take precedence over the saved config
+    checkpoint_override_defaults(cfg, parser)
+
+    cfg = parse_full_cfg(parser)
+
+    status = do_eval(cfg)
+    return status
+
+
+if __name__ == "__main__":
+    sys.exit(main())
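For reference, here is a minimal sketch (not part of the diff) of how the new evaluation sampling API could be driven directly from Python rather than through `sf_examples.mujoco.fast_eval_mujoco`. The `sample_eval_episodes` helper and its polling loop are illustrative assumptions; it only uses calls introduced above (`EvalSamplingAPI.init/start/stop`, `eval_episodes`, `eval_stats`) and assumes `cfg` was produced by `parse_full_cfg` with evaluation args for an already-registered env.

```python
# Illustrative sketch only: drive EvalSamplingAPI programmatically.
import time

from sample_factory.algo.sampling.evaluation_sampling_api import EvalSamplingAPI
from sample_factory.algo.utils.env_info import obtain_env_info_in_a_separate_process


def sample_eval_episodes(cfg, target_episodes: int = 64):
    # needed so "episode_number" appears in episode_extra_stats (see EpisodeCounterWrapper)
    cfg.episode_counter = True
    env_info = obtain_env_info_in_a_separate_process(cfg)

    sampler = EvalSamplingAPI(cfg, env_info)
    sampler.init()
    sampler.start()
    try:
        # poll until enough episodes were collected for policy 0,
        # mirroring the loop in generate_trajectories() above
        while len(sampler.eval_episodes[0]) < target_episodes:
            time.sleep(1.0)
    finally:
        sampler.stop()

    # dict of per-policy lists, e.g. eval_stats["reward"][0]
    return sampler.eval_stats
```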