From 67fab9d362e070f690564a373eff27a46587a934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Wed, 13 Dec 2023 11:21:43 +0100 Subject: [PATCH 01/18] episode counter --- sample_factory/cfg/cfg.py | 6 ++++++ sample_factory/envs/create_env.py | 4 ++++ sample_factory/envs/env_wrappers.py | 20 ++++++++++++++++++++ 3 files changed, 30 insertions(+) diff --git a/sample_factory/cfg/cfg.py b/sample_factory/cfg/cfg.py index 6b1b53e12..541053662 100644 --- a/sample_factory/cfg/cfg.py +++ b/sample_factory/cfg/cfg.py @@ -636,6 +636,12 @@ def add_default_env_args(p: ArgumentParser): type=str2bool, help="Whether to use gym RecordEpisodeStatistics wrapper to keep track of reward", ) + p.add_argument( + "--episode_counter", + default=False, + type=str2bool, + help="Add wrapper to each env which will count the number of episodes for each env.", + ) def add_eval_args(parser): diff --git a/sample_factory/envs/create_env.py b/sample_factory/envs/create_env.py index 153f6684e..c38c0fe52 100644 --- a/sample_factory/envs/create_env.py +++ b/sample_factory/envs/create_env.py @@ -4,6 +4,7 @@ from sample_factory.algo.utils.context import global_env_registry from sample_factory.algo.utils.gymnasium_utils import patch_non_gymnasium_env +from sample_factory.envs.env_wrappers import EpisodeCounterWrapper from sample_factory.utils.attr_dict import AttrDict from sample_factory.utils.typing import Config from sample_factory.utils.utils import log @@ -39,4 +40,7 @@ def create_env( env = patch_non_gymnasium_env(env) + if "episode_counter" in cfg and cfg.episode_counter: + env = EpisodeCounterWrapper(env) + return env diff --git a/sample_factory/envs/env_wrappers.py b/sample_factory/envs/env_wrappers.py index e4acbb987..b16af9060 100644 --- a/sample_factory/envs/env_wrappers.py +++ b/sample_factory/envs/env_wrappers.py @@ -434,3 +434,23 @@ class NumpyObsWrapper(gym.ObservationWrapper): def observation(self, observation: Any) -> np.ndarray: return np.array(observation) + + +class EpisodeCounterWrapper(gym.Wrapper): + def __init__(self, env): + gym.Wrapper.__init__(self, env) + self.episode_count = 0 + + def reset(self, **kwargs) -> Tuple[GymObs, Dict]: + return self.env.reset(**kwargs) + + def step(self, action: int) -> GymStepReturn: + obs, reward, terminated, truncated, info = self.env.step(action) + + if terminated | truncated: + extra_stats = info.get("episode_extra_stats", {}) + extra_stats["episode_number"] = self.episode_count + info["episode_extra_stats"] = extra_stats + self.episode_count += 1 + + return obs, reward, terminated, truncated, info From f3d39228352a93f782a092efae30ff1a20e992e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Wed, 13 Dec 2023 12:53:21 +0100 Subject: [PATCH 02/18] create skeleton for evaluation with async sampling --- .../algo/sampling/evaluation_sampling_api.py | 267 ++++++++++++++++++ .../sampler/use_evaluation_sampling_api.py | 89 ++++++ 2 files changed, 356 insertions(+) create mode 100644 sample_factory/algo/sampling/evaluation_sampling_api.py create mode 100644 sf_examples/sampler/use_evaluation_sampling_api.py diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py new file mode 100644 index 000000000..8e30fc456 --- /dev/null +++ b/sample_factory/algo/sampling/evaluation_sampling_api.py @@ -0,0 +1,267 @@ +from __future__ import annotations + +import time +from collections import OrderedDict +from queue import Empty, Full, Queue +from threading import 
Thread +from typing import Callable, Deque, Dict, Iterable, List, Optional, Tuple + +import numpy as np +from signal_slot.signal_slot import EventLoop, EventLoopObject, EventLoopStatus, signal + +from sample_factory.algo.runners.runner import MsgHandler, PolicyMsgHandler, Runner +from sample_factory.algo.sampling.sampler import AbstractSampler, ParallelSampler, SerialSampler +from sample_factory.algo.utils.env_info import EnvInfo +from sample_factory.algo.utils.misc import EPISODIC, SAMPLES_COLLECTED, STATS_KEY, TIMING_STATS, ExperimentStatus +from sample_factory.algo.utils.model_sharing import ParameterServer +from sample_factory.algo.utils.shared_buffers import BufferMgr +from sample_factory.algo.utils.tensor_dict import TensorDict, clone_tensordict +from sample_factory.cfg.configurable import Configurable +from sample_factory.utils.dicts import iterate_recursively +from sample_factory.utils.gpu_utils import set_global_cuda_envvars +from sample_factory.utils.typing import Config, InitModelData, PolicyID, StatusCode +from sample_factory.utils.utils import log + + +class SamplingLoop(EventLoopObject, Configurable): + def __init__(self, cfg: Config, env_info: EnvInfo): + Configurable.__init__(self, cfg) + unique_name = SamplingLoop.__name__ + self.event_loop: EventLoop = EventLoop(unique_loop_name=f"{unique_name}_EvtLoop", serial_mode=cfg.serial_mode) + self.event_loop.owner = self + EventLoopObject.__init__(self, self.event_loop, object_id=unique_name) + # self.event_loop.verbose = True + + self.env_info = env_info + self.iteration: int = 0 + + self.buffer_mgr: Optional[BufferMgr] = None + self.param_servers: Optional[Dict[PolicyID, ParameterServer]] = None + + self.new_trajectory_callback: Optional[Callable] = None + self.status: Optional[StatusCode] = None + + self.ready: bool = False + self.stopped: bool = False + + # samples_collected counts the total number of observations processed by the algorithm + self.samples_collected = [0 for _ in range(self.cfg.num_policies)] + + self.stats = dict() # regular (non-averaged) stats + self.avg_stats = dict() + + self.policy_avg_stats: Dict[str, List[Deque]] = dict() + + # global msg handlers for messages from algo components + self.msg_handlers: Dict[str, List[MsgHandler]] = { + TIMING_STATS: [Runner._timing_msg_handler], + STATS_KEY: [Runner._stats_msg_handler], + } + + # handlers for policy-specific messages + self.policy_msg_handlers: Dict[str, List[PolicyMsgHandler]] = { + EPISODIC: [self._episodic_stats_handler], + SAMPLES_COLLECTED: [Runner._samples_stats_handler], + } + + @signal + def model_initialized(self): + ... + + @signal + def trajectory_buffers_available(self): + ... + + @signal + def stop(self): + ... 
+ + def init( + self, buffer_mgr: Optional[BufferMgr] = None, param_servers: Optional[Dict[PolicyID, ParameterServer]] = None + ): + set_global_cuda_envvars(self.cfg) + + self.buffer_mgr = buffer_mgr + if self.buffer_mgr is None: + self.buffer_mgr = BufferMgr(self.cfg, self.env_info) + + self.param_servers = param_servers + if self.param_servers is None: + self.param_servers = dict() + for policy_id in range(self.cfg.num_policies): + self.param_servers[policy_id] = ParameterServer( + policy_id, self.buffer_mgr.policy_versions, self.cfg.serial_mode + ) + + sampler_cls = SerialSampler if self.cfg.serial_mode else ParallelSampler + sampler: AbstractSampler = sampler_cls( + self.event_loop, self.buffer_mgr, self.param_servers, self.cfg, self.env_info + ) + self.event_loop.start.connect(sampler.init) + sampler.started.connect(self.on_sampler_started) + sampler.initialized.connect(self.on_sampler_initialized) + + for policy_id in range(self.cfg.num_policies): + sampler.connect_model_initialized(policy_id, self.model_initialized) + sampler.connect_on_new_trajectories(policy_id, self.on_new_trajectories) + sampler.connect_trajectory_buffers_available(self.trajectory_buffers_available) + sampler.connect_report_msg(self._process_msg) + + for stoppable in sampler.stoppable_components(): + self.stop.connect(stoppable.on_stop) + + def _process_msg(self, msgs): + if isinstance(msgs, (dict, OrderedDict)): + msgs = (msgs,) + + if not (isinstance(msgs, (List, Tuple)) and isinstance(msgs[0], (dict, OrderedDict))): + log.error("While parsing a message: expected a dictionary or list/tuple of dictionaries, found %r", msgs) + return + + for msg in msgs: + # some messages are policy-specific + policy_id = msg.get("policy_id", None) + + for key in msg: + for handler in self.msg_handlers.get(key, ()): + handler(self, msg) + if policy_id is not None: + for handler in self.policy_msg_handlers.get(key, ()): + handler(self, msg, policy_id) + + @staticmethod + def _episodic_stats_handler(runner: Runner, msg: Dict, policy_id: PolicyID) -> None: + # the only difference between this function and the one from `Runner` + # is that we store all stats in a list and not in the deque + s = msg[EPISODIC] + for _, key, value in iterate_recursively(s): + if key not in runner.policy_avg_stats: + runner.policy_avg_stats[key] = [[] for _ in range(runner.cfg.num_policies)] + + if isinstance(value, np.ndarray) and value.ndim > 0: + # if len(value) > runner.policy_avg_stats[key][policy_id].maxlen: + # # increase maxlen to make sure we never ignore any stats from the environments + # runner.policy_avg_stats[key][policy_id] = deque(maxlen=len(value)) + + runner.policy_avg_stats[key][policy_id].extend(value) + else: + runner.policy_avg_stats[key][policy_id].append(value) + + def wait_until_ready(self): + while not self.ready: + log.debug(f"{self.object_id}: waiting for sampler to be ready...") + time.sleep(0.5) + + def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None): + """Model initialization should kickstart the sampling loop.""" + for policy_id in range(self.cfg.num_policies): + if init_model_data is None: + self.model_initialized.emit(None) + else: + self.model_initialized.emit(init_model_data[policy_id]) + + def set_new_trajectory_callback(self, cb: Callable) -> None: + self.new_trajectory_callback = cb + + def on_sampler_started(self): + self.ready = True + + def on_sampler_initialized(self): + log.debug(f"{self.object_id}: sampler fully initialized!") + + def on_new_trajectories(self, trajectory_dicts: 
Iterable[Dict], device: str): + for trajectory_dict in trajectory_dicts: + traj_buffer_idx: int | slice = trajectory_dict["traj_buffer_idx"] + if isinstance(traj_buffer_idx, slice): + trajectory_slice = traj_buffer_idx + else: + trajectory_slice = slice(traj_buffer_idx, traj_buffer_idx + 1) + + # data for this trajectory is now available in the buffer + # always use a slice so that returned tensors are the same dimensionality regardless of whether we + # use batched or non-batched sampling + traj = self.buffer_mgr.traj_tensors_torch[device][trajectory_slice] + self.new_trajectory_callback(traj, [traj_buffer_idx], device) + + def yield_trajectory_buffers(self, available_buffers: Iterable[int | slice], device: str): + # make this trajectory buffer available again + self.buffer_mgr.traj_buffer_queues[device].put_many(available_buffers) + self.iteration += 1 + for policy_id in range(self.cfg.num_policies): + self.trajectory_buffers_available.emit(policy_id, self.iteration) + + def run(self) -> StatusCode: + log.debug("Before event loop...") + + # noinspection PyBroadException + try: + evt_loop_status = self.event_loop.exec() + self.status = ( + ExperimentStatus.INTERRUPTED + if evt_loop_status == EventLoopStatus.INTERRUPTED + else ExperimentStatus.SUCCESS + ) + self.stop.emit() + except Exception: + log.exception(f"Uncaught exception in {self.object_id} evt loop") + self.status = ExperimentStatus.FAILURE + + log.debug(f"{SamplingLoop.__name__} finished with {self.status=}") + return self.status + + def stop_sampling(self): + self.stop.emit() + self.event_loop.stop() + self.stopped = True + + +class EvalSamplingAPI: + def __init__( + self, + cfg: Config, + env_info: EnvInfo, + buffer_mgr: Optional[BufferMgr] = None, + param_servers: Optional[Dict[PolicyID, ParameterServer]] = None, + ): + self.sampling_loop: SamplingLoop = SamplingLoop(cfg, env_info) + self.sampling_loop.init(buffer_mgr, param_servers) + self.sampling_loop.set_new_trajectory_callback(self._on_new_trajectories) + self.sampling_thread: Thread = Thread(target=self.sampling_loop.run) + self.sampling_thread.start() + + self.sampling_loop.wait_until_ready() + self.traj_queue: Queue = Queue(maxsize=100) + + def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None): + self.sampling_loop.start(init_model_data) + + def _on_new_trajectories(self, traj: TensorDict, traj_buffer_indices: Iterable[int | slice], device: str): + traj_clone = clone_tensordict(traj) # we copied the data so we can release the buffer + + # just release buffers after every trajectory + # we could alternatively have more sophisticated logic here, see i.e. 
batcher.py + self.sampling_loop.yield_trajectory_buffers(traj_buffer_indices, device) + + while not self.sampling_loop.stopped: + try: + self.traj_queue.put(traj_clone, timeout=1.0, block=True) + break + except Full: + log.debug(f"{self._on_new_trajectories.__name__}: trajectory queue full, waiting...") + self.sampling_loop.event_loop.process_events() + + def get_trajectories_sync(self) -> Optional[TensorDict]: + while not self.sampling_loop.stopped: + try: + traj = self.traj_queue.get(timeout=5.0) + return traj + except Empty: + log.debug(f"{self.get_trajectories_sync.__name__}(): waiting for trajectories...") + continue + + return None + + def stop(self) -> StatusCode: + self.sampling_loop.stop_sampling() + self.sampling_thread.join() + return self.sampling_loop.status diff --git a/sf_examples/sampler/use_evaluation_sampling_api.py b/sf_examples/sampler/use_evaluation_sampling_api.py new file mode 100644 index 000000000..2ac22226b --- /dev/null +++ b/sf_examples/sampler/use_evaluation_sampling_api.py @@ -0,0 +1,89 @@ +import sys +import time +from collections import deque +from typing import Deque + +import numpy as np +from signal_slot.signal_slot import StatusCode + +from sample_factory.algo.sampling.evaluation_sampling_api import EvalSamplingAPI +from sample_factory.algo.utils.env_info import EnvInfo, obtain_env_info_in_a_separate_process +from sample_factory.algo.utils.rl_utils import samples_per_trajectory +from sample_factory.utils.typing import Config +from sample_factory.utils.utils import log +from sf_examples.atari.train_atari import parse_atari_args, register_atari_components + + +def _print_fps_stats(cfg: Config, fps_stats: Deque): + episodes_sampled = fps_stats[-1][1] + env_steps_sampled = fps_stats[-1][2] + delta_sampled = env_steps_sampled - fps_stats[0][2] + delta_time = fps_stats[-1][0] - fps_stats[0][0] + fps = delta_sampled / delta_time + fps_frameskip = fps * cfg.env_frameskip + fps_frameskip_str = f" ({fps_frameskip:.1f} FPS with frameskip)" if cfg.env_frameskip > 1 else "" + log.debug( + f"Episodes collected: {episodes_sampled}, Samples collected: {env_steps_sampled}, throughput: {fps:.1f}, FPS{fps_frameskip_str}" + ) + + +def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode: + sampler = EvalSamplingAPI(cfg, env_info) + sampler.start() + + batch_size = cfg.batch_size // cfg.rollout + max_episode_number = sample_env_episodes // batch_size + + print_interval_sec = 1.0 + fps_stats = deque([(time.time(), 0, 0)], maxlen=10) + episodes_sampled = 0 + env_steps_sampled = 0 + last_print = time.time() + + while episodes_sampled < sample_env_episodes: + try: + trajectory = sampler.get_trajectories_sync() + if trajectory is None: + break + + episode_numbers = sampler.sampling_loop.policy_avg_stats.get( + "episode_number", [[] for _ in range(cfg.num_policies)] + ) + # TODO: for now we only look at the first policy, but should handle all later + episode_numbers = np.array(episode_numbers[0]) + valid = episode_numbers < max_episode_number + episode_numbers = episode_numbers[valid] + + episodes_sampled = episode_numbers.sum() + env_steps_sampled += samples_per_trajectory(trajectory) + + if time.time() - last_print > print_interval_sec: + fps_stats.append((time.time(), episodes_sampled, env_steps_sampled)) + _print_fps_stats(cfg, fps_stats) + last_print = time.time() + except KeyboardInterrupt: + log.info(f"KeyboardInterrupt in {generate_trajectories.__name__}()") + break + + status = sampler.stop() + return status + + +def main() 
-> StatusCode: + register_atari_components() + # TODO: add more arguments + cfg = parse_atari_args( + [ + "--env=atari_mspacman", + "--batch_size=2048", + "--episode_counter=True", + # "--serial_mode=True", + ] + ) + env_info = obtain_env_info_in_a_separate_process(cfg) + + return generate_trajectories(cfg, env_info) + + +if __name__ == "__main__": + sys.exit(main()) From 07e6b817fee010e14e490e800480521678e90c5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Thu, 14 Dec 2023 15:43:12 +0100 Subject: [PATCH 03/18] create eval script with the global purpose for every environment --- sample_factory/cfg/arguments.py | 24 ++++ sample_factory/eval.py | 131 ++++++++++++++++++ .../sampler/use_evaluation_sampling_api.py | 89 ------------ 3 files changed, 155 insertions(+), 89 deletions(-) create mode 100644 sample_factory/eval.py delete mode 100644 sf_examples/sampler/use_evaluation_sampling_api.py diff --git a/sample_factory/cfg/arguments.py b/sample_factory/cfg/arguments.py index f736342d1..e1930db31 100644 --- a/sample_factory/cfg/arguments.py +++ b/sample_factory/cfg/arguments.py @@ -273,3 +273,27 @@ def maybe_load_from_checkpoint(cfg: Config) -> AttrDict: return AttrDict(vars(cfg)) return load_from_checkpoint(cfg) + + +def checkpoint_override_defaults(cfg: Config, parser) -> AttrDict: + cfg_filename = cfg_file(cfg) + cfg_filename_old = cfg_file_old(cfg) + + if not os.path.isfile(cfg_filename) and os.path.isfile(cfg_filename_old): + # rename old config file + log.warning(f"Loading legacy config file {cfg_filename_old} instead of {cfg_filename}") + os.rename(cfg_filename_old, cfg_filename) + + if not os.path.isfile(cfg_filename): + raise Exception( + f"Could not load saved parameters for experiment {cfg.experiment} " + f"(file {cfg_filename} not found). Check that you have the correct experiment name " + f"and --train_dir is set correctly." 
+ ) + + with open(cfg_filename, "r") as json_file: + json_params = json.load(json_file) + log.warning("Loading existing experiment configuration from %s", cfg_filename) + loaded_cfg = AttrDict(json_params) + + parser.set_defaults(**loaded_cfg) diff --git a/sample_factory/eval.py b/sample_factory/eval.py new file mode 100644 index 000000000..40a0a8278 --- /dev/null +++ b/sample_factory/eval.py @@ -0,0 +1,131 @@ +import json +import time +from collections import deque +from typing import Deque + +import numpy as np +import pandas as pd +from signal_slot.signal_slot import StatusCode +from torch import Tensor + +from sample_factory.algo.learning.learner import Learner +from sample_factory.algo.sampling.evaluation_sampling_api import EvalSamplingAPI +from sample_factory.algo.utils.env_info import EnvInfo, obtain_env_info_in_a_separate_process +from sample_factory.algo.utils.model_sharing import ParameterServer +from sample_factory.algo.utils.rl_utils import samples_per_trajectory +from sample_factory.algo.utils.shared_buffers import BufferMgr +from sample_factory.utils.gpu_utils import set_global_cuda_envvars +from sample_factory.utils.typing import Config +from sample_factory.utils.utils import log + + +def _print_fps_stats(cfg: Config, fps_stats: Deque): + episodes_sampled = fps_stats[-1][1] + env_steps_sampled = fps_stats[-1][2] + delta_sampled = env_steps_sampled - fps_stats[0][2] + delta_time = fps_stats[-1][0] - fps_stats[0][0] + fps = delta_sampled / delta_time + fps_frameskip = fps * cfg.env_frameskip + fps_frameskip_str = f" ({fps_frameskip:.1f} FPS with frameskip)" if cfg.env_frameskip > 1 else "" + log.debug( + f"Episodes collected: {episodes_sampled}, Samples collected: {env_steps_sampled}, throughput: {fps:.1f} FPS{fps_frameskip_str}" + ) + + +def _print_experiment_summaries(cfg, policy_avg_stats): + for policy_id in range(cfg.num_policies): + data = pd.DataFrame(policy_avg_stats) + data.to_csv(f"eval{policy_id}.csv") + + results = {} + for key, stat in policy_avg_stats.items(): + stat_value = np.mean(stat[policy_id]) + + if "/" in key: + # custom summaries have their own sections in tensorboard + avg_tag = key + min_tag = f"{key}_min" + max_tag = f"{key}_max" + elif key in ("reward", "len"): + # reward and length get special treatment + avg_tag = f"{key}/{key}" + min_tag = f"{key}/{key}_min" + max_tag = f"{key}/{key}_max" + else: + avg_tag = f"policy_stats/avg_{key}" + min_tag = f"policy_stats/avg_{key}_min" + max_tag = f"policy_stats/avg_{key}_max" + + results[avg_tag] = float(stat_value) + + # for key stats report min/max as well + if key in ("reward", "true_objective", "len"): + results[min_tag] = float(min(stat[policy_id])) + results[max_tag] = float(max(stat[policy_id])) + + print(json.dumps(results, indent=4)) + + +def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode: + set_global_cuda_envvars(cfg) + buffer_mgr = BufferMgr(cfg, env_info) + policy_versions_tensor: Tensor = buffer_mgr.policy_versions + + param_servers = {} + init_model_data = {} + learners = {} + for policy_id in range(cfg.num_policies): + param_servers[policy_id] = ParameterServer(policy_id, policy_versions_tensor, cfg.serial_mode) + learners[policy_id] = Learner(cfg, env_info, policy_versions_tensor, policy_id, param_servers[policy_id]) + init_model_data[policy_id] = learners[policy_id].init() + + sampler = EvalSamplingAPI(cfg, env_info, buffer_mgr=buffer_mgr, param_servers=param_servers) + sampler.start(init_model_data=init_model_data) + + batch_size = 
cfg.batch_size // cfg.rollout + max_episode_number = sample_env_episodes // batch_size + + print_interval_sec = 1.0 + fps_stats = deque([(time.time(), 0, 0)], maxlen=10) + episodes_sampled = 0 + env_steps_sampled = 0 + last_print = time.time() + + while episodes_sampled < sample_env_episodes: + try: + trajectory = sampler.get_trajectories_sync() + if trajectory is None: + break + + episode_numbers = sampler.sampling_loop.policy_avg_stats.get( + "episode_number", [[] for _ in range(cfg.num_policies)] + ) + # TODO: for now we only look at the first policy, + # maybe even in MARL we will look only at first policy? + episode_numbers = np.array(episode_numbers[0]) + valid = episode_numbers < max_episode_number + episodes_sampled = valid.sum() + env_steps_sampled += samples_per_trajectory(trajectory) + + if time.time() - last_print > print_interval_sec: + fps_stats.append((time.time(), episodes_sampled, env_steps_sampled)) + _print_fps_stats(cfg, fps_stats) + last_print = time.time() + except KeyboardInterrupt: + log.info(f"KeyboardInterrupt in {generate_trajectories.__name__}()") + break + + status = sampler.stop() + + # TODO: log results to tensorboard? + # print experiment summaries + _print_experiment_summaries(cfg, sampler.sampling_loop.policy_avg_stats) + + return status + + +def eval(cfg: Config) -> StatusCode: + # we override batch size to be exa + cfg.batch_size = cfg.num_workers * cfg.num_envs_per_worker * cfg.worker_num_splits + env_info = obtain_env_info_in_a_separate_process(cfg) + return generate_trajectories(cfg, env_info, cfg.sample_env_episodes) diff --git a/sf_examples/sampler/use_evaluation_sampling_api.py b/sf_examples/sampler/use_evaluation_sampling_api.py deleted file mode 100644 index 2ac22226b..000000000 --- a/sf_examples/sampler/use_evaluation_sampling_api.py +++ /dev/null @@ -1,89 +0,0 @@ -import sys -import time -from collections import deque -from typing import Deque - -import numpy as np -from signal_slot.signal_slot import StatusCode - -from sample_factory.algo.sampling.evaluation_sampling_api import EvalSamplingAPI -from sample_factory.algo.utils.env_info import EnvInfo, obtain_env_info_in_a_separate_process -from sample_factory.algo.utils.rl_utils import samples_per_trajectory -from sample_factory.utils.typing import Config -from sample_factory.utils.utils import log -from sf_examples.atari.train_atari import parse_atari_args, register_atari_components - - -def _print_fps_stats(cfg: Config, fps_stats: Deque): - episodes_sampled = fps_stats[-1][1] - env_steps_sampled = fps_stats[-1][2] - delta_sampled = env_steps_sampled - fps_stats[0][2] - delta_time = fps_stats[-1][0] - fps_stats[0][0] - fps = delta_sampled / delta_time - fps_frameskip = fps * cfg.env_frameskip - fps_frameskip_str = f" ({fps_frameskip:.1f} FPS with frameskip)" if cfg.env_frameskip > 1 else "" - log.debug( - f"Episodes collected: {episodes_sampled}, Samples collected: {env_steps_sampled}, throughput: {fps:.1f}, FPS{fps_frameskip_str}" - ) - - -def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode: - sampler = EvalSamplingAPI(cfg, env_info) - sampler.start() - - batch_size = cfg.batch_size // cfg.rollout - max_episode_number = sample_env_episodes // batch_size - - print_interval_sec = 1.0 - fps_stats = deque([(time.time(), 0, 0)], maxlen=10) - episodes_sampled = 0 - env_steps_sampled = 0 - last_print = time.time() - - while episodes_sampled < sample_env_episodes: - try: - trajectory = sampler.get_trajectories_sync() - if trajectory is None: - break 
- - episode_numbers = sampler.sampling_loop.policy_avg_stats.get( - "episode_number", [[] for _ in range(cfg.num_policies)] - ) - # TODO: for now we only look at the first policy, but should handle all later - episode_numbers = np.array(episode_numbers[0]) - valid = episode_numbers < max_episode_number - episode_numbers = episode_numbers[valid] - - episodes_sampled = episode_numbers.sum() - env_steps_sampled += samples_per_trajectory(trajectory) - - if time.time() - last_print > print_interval_sec: - fps_stats.append((time.time(), episodes_sampled, env_steps_sampled)) - _print_fps_stats(cfg, fps_stats) - last_print = time.time() - except KeyboardInterrupt: - log.info(f"KeyboardInterrupt in {generate_trajectories.__name__}()") - break - - status = sampler.stop() - return status - - -def main() -> StatusCode: - register_atari_components() - # TODO: add more arguments - cfg = parse_atari_args( - [ - "--env=atari_mspacman", - "--batch_size=2048", - "--episode_counter=True", - # "--serial_mode=True", - ] - ) - env_info = obtain_env_info_in_a_separate_process(cfg) - - return generate_trajectories(cfg, env_info) - - -if __name__ == "__main__": - sys.exit(main()) From 023590969d5b0ed299cb017fb708d7a98c988105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Wed, 13 Dec 2023 17:52:16 +0100 Subject: [PATCH 04/18] final cleanup for eval --- sample_factory/eval.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/sample_factory/eval.py b/sample_factory/eval.py index 40a0a8278..699d39105 100644 --- a/sample_factory/eval.py +++ b/sample_factory/eval.py @@ -32,11 +32,8 @@ def _print_fps_stats(cfg: Config, fps_stats: Deque): ) -def _print_experiment_summaries(cfg, policy_avg_stats): +def _print_eval_summaries(cfg, policy_avg_stats): for policy_id in range(cfg.num_policies): - data = pd.DataFrame(policy_avg_stats) - data.to_csv(f"eval{policy_id}.csv") - results = {} for key, stat in policy_avg_stats.items(): stat_value = np.mean(stat[policy_id]) @@ -66,6 +63,16 @@ def _print_experiment_summaries(cfg, policy_avg_stats): print(json.dumps(results, indent=4)) +def _save_eval_results(cfg, policy_avg_stats): + for policy_id in range(cfg.num_policies): + data = {} + for key, stat in policy_avg_stats.items(): + data[key] = stat[policy_id] + + data = pd.DataFrame(data) + data.to_csv(f"eval{policy_id}.csv") + + def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode: set_global_cuda_envvars(cfg) buffer_mgr = BufferMgr(cfg, env_info) @@ -82,8 +89,9 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i sampler = EvalSamplingAPI(cfg, env_info, buffer_mgr=buffer_mgr, param_servers=param_servers) sampler.start(init_model_data=init_model_data) - batch_size = cfg.batch_size // cfg.rollout - max_episode_number = sample_env_episodes // batch_size + # we override batch size to be the same as total number of environments + total_envs = cfg.num_workers * cfg.num_envs_per_worker + max_episode_number = sample_env_episodes / total_envs print_interval_sec = 1.0 fps_stats = deque([(time.time(), 0, 0)], maxlen=10) @@ -102,7 +110,10 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i ) # TODO: for now we only look at the first policy, # maybe even in MARL we will look only at first policy? 
- episode_numbers = np.array(episode_numbers[0]) + policy_id = 0 + episode_numbers = np.array(episode_numbers[policy_id]) + # we ignore some of the episodes because we only want to look at first N episodes + # to enforce it we use wrapper with counts number of episodes for each env valid = episode_numbers < max_episode_number episodes_sampled = valid.sum() env_steps_sampled += samples_per_trajectory(trajectory) @@ -118,14 +129,14 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i status = sampler.stop() # TODO: log results to tensorboard? - # print experiment summaries - _print_experiment_summaries(cfg, sampler.sampling_loop.policy_avg_stats) + _print_eval_summaries(cfg, sampler.sampling_loop.policy_avg_stats) + _save_eval_results(cfg, sampler.sampling_loop.policy_avg_stats) return status def eval(cfg: Config) -> StatusCode: - # we override batch size to be exa - cfg.batch_size = cfg.num_workers * cfg.num_envs_per_worker * cfg.worker_num_splits + # should always be set to True for this script + cfg.episode_counter = True env_info = obtain_env_info_in_a_separate_process(cfg) return generate_trajectories(cfg, env_info, cfg.sample_env_episodes) From 52e68e1880602a30fc3f19bc7f9e35d8d3f9be5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Wed, 13 Dec 2023 18:06:56 +0100 Subject: [PATCH 05/18] move some code to EvalSamplingApi.init --- .../algo/sampling/evaluation_sampling_api.py | 47 +++++++++++++++++-- sample_factory/eval.py | 38 ++++----------- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py index 8e30fc456..fbc52399d 100644 --- a/sample_factory/algo/sampling/evaluation_sampling_api.py +++ b/sample_factory/algo/sampling/evaluation_sampling_api.py @@ -8,7 +8,9 @@ import numpy as np from signal_slot.signal_slot import EventLoop, EventLoopObject, EventLoopStatus, signal +from torch import Tensor +from sample_factory.algo.learning.learner import Learner from sample_factory.algo.runners.runner import MsgHandler, PolicyMsgHandler, Runner from sample_factory.algo.sampling.sampler import AbstractSampler, ParallelSampler, SerialSampler from sample_factory.algo.utils.env_info import EnvInfo @@ -220,19 +222,54 @@ def __init__( self, cfg: Config, env_info: EnvInfo, - buffer_mgr: Optional[BufferMgr] = None, - param_servers: Optional[Dict[PolicyID, ParameterServer]] = None, ): - self.sampling_loop: SamplingLoop = SamplingLoop(cfg, env_info) - self.sampling_loop.init(buffer_mgr, param_servers) + self.cfg = cfg + self.env_info = env_info + + self.buffer_mgr = None + self.policy_versions_tensor = None + self.param_servers: Dict[PolicyID, ParameterServer] = None + self.init_model_data: Dict[PolicyID, InitModelData] = None + self.learners: Dict[PolicyID, Learner] = None + + self.sampling_loop: SamplingLoop = None + self.traj_queue: Queue = Queue(maxsize=100) + + def init(self): + set_global_cuda_envvars(self.cfg) + + self.buffer_mgr = BufferMgr(self.cfg, self.env_info) + self.policy_versions_tensor: Tensor = self.buffer_mgr.policy_versions + + self.param_servers = {} + self.init_model_data = {} + self.learners = {} + for policy_id in range(self.cfg.num_policies): + self.param_servers[policy_id] = ParameterServer( + policy_id, self.policy_versions_tensor, self.cfg.serial_mode + ) + self.learners[policy_id] = Learner( + self.cfg, self.env_info, self.policy_versions_tensor, policy_id, self.param_servers[policy_id] + ) + 
self.init_model_data[policy_id] = self.learners[policy_id].init() + + self.sampling_loop: SamplingLoop = SamplingLoop(self.cfg, self.env_info) + self.sampling_loop.init(self.buffer_mgr, self.param_servers) self.sampling_loop.set_new_trajectory_callback(self._on_new_trajectories) self.sampling_thread: Thread = Thread(target=self.sampling_loop.run) self.sampling_thread.start() self.sampling_loop.wait_until_ready() - self.traj_queue: Queue = Queue(maxsize=100) + + @property + def eval_stats(self): + # it's possible that we would like to return additional stats, like fps or sth + # those could be added here + return self.sampling_loop.policy_avg_stats def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None): + if init_model_data is None: + init_model_data = self.init_model_data self.sampling_loop.start(init_model_data) def _on_new_trajectories(self, traj: TensorDict, traj_buffer_indices: Iterable[int | slice], device: str): diff --git a/sample_factory/eval.py b/sample_factory/eval.py index 699d39105..81b68b102 100644 --- a/sample_factory/eval.py +++ b/sample_factory/eval.py @@ -6,15 +6,10 @@ import numpy as np import pandas as pd from signal_slot.signal_slot import StatusCode -from torch import Tensor -from sample_factory.algo.learning.learner import Learner from sample_factory.algo.sampling.evaluation_sampling_api import EvalSamplingAPI from sample_factory.algo.utils.env_info import EnvInfo, obtain_env_info_in_a_separate_process -from sample_factory.algo.utils.model_sharing import ParameterServer from sample_factory.algo.utils.rl_utils import samples_per_trajectory -from sample_factory.algo.utils.shared_buffers import BufferMgr -from sample_factory.utils.gpu_utils import set_global_cuda_envvars from sample_factory.utils.typing import Config from sample_factory.utils.utils import log @@ -32,10 +27,10 @@ def _print_fps_stats(cfg: Config, fps_stats: Deque): ) -def _print_eval_summaries(cfg, policy_avg_stats): +def _print_eval_summaries(cfg, eval_stats): for policy_id in range(cfg.num_policies): results = {} - for key, stat in policy_avg_stats.items(): + for key, stat in eval_stats.items(): stat_value = np.mean(stat[policy_id]) if "/" in key: @@ -63,10 +58,10 @@ def _print_eval_summaries(cfg, policy_avg_stats): print(json.dumps(results, indent=4)) -def _save_eval_results(cfg, policy_avg_stats): +def _save_eval_results(cfg, eval_stats): for policy_id in range(cfg.num_policies): data = {} - for key, stat in policy_avg_stats.items(): + for key, stat in eval_stats.items(): data[key] = stat[policy_id] data = pd.DataFrame(data) @@ -74,20 +69,9 @@ def _save_eval_results(cfg, policy_avg_stats): def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode: - set_global_cuda_envvars(cfg) - buffer_mgr = BufferMgr(cfg, env_info) - policy_versions_tensor: Tensor = buffer_mgr.policy_versions - - param_servers = {} - init_model_data = {} - learners = {} - for policy_id in range(cfg.num_policies): - param_servers[policy_id] = ParameterServer(policy_id, policy_versions_tensor, cfg.serial_mode) - learners[policy_id] = Learner(cfg, env_info, policy_versions_tensor, policy_id, param_servers[policy_id]) - init_model_data[policy_id] = learners[policy_id].init() - - sampler = EvalSamplingAPI(cfg, env_info, buffer_mgr=buffer_mgr, param_servers=param_servers) - sampler.start(init_model_data=init_model_data) + sampler = EvalSamplingAPI(cfg, env_info) + sampler.init() + sampler.start() # we override batch size to be the same as total number of environments 
total_envs = cfg.num_workers * cfg.num_envs_per_worker @@ -105,9 +89,7 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i if trajectory is None: break - episode_numbers = sampler.sampling_loop.policy_avg_stats.get( - "episode_number", [[] for _ in range(cfg.num_policies)] - ) + episode_numbers = sampler.eval_stats.get("episode_number", [[] for _ in range(cfg.num_policies)]) # TODO: for now we only look at the first policy, # maybe even in MARL we will look only at first policy? policy_id = 0 @@ -129,8 +111,8 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i status = sampler.stop() # TODO: log results to tensorboard? - _print_eval_summaries(cfg, sampler.sampling_loop.policy_avg_stats) - _save_eval_results(cfg, sampler.sampling_loop.policy_avg_stats) + _print_eval_summaries(cfg, sampler.eval_stats) + _save_eval_results(cfg, sampler.eval_stats) return status From 2c07e06c7e7a0f20244ca6dbef3ee934c7ebba0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Wed, 13 Dec 2023 18:16:25 +0100 Subject: [PATCH 06/18] save csv in train dir --- sample_factory/eval.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sample_factory/eval.py b/sample_factory/eval.py index 81b68b102..cb12696a7 100644 --- a/sample_factory/eval.py +++ b/sample_factory/eval.py @@ -1,6 +1,7 @@ import json import time from collections import deque +from os.path import join from typing import Deque import numpy as np @@ -11,7 +12,7 @@ from sample_factory.algo.utils.env_info import EnvInfo, obtain_env_info_in_a_separate_process from sample_factory.algo.utils.rl_utils import samples_per_trajectory from sample_factory.utils.typing import Config -from sample_factory.utils.utils import log +from sample_factory.utils.utils import experiment_dir, log def _print_fps_stats(cfg: Config, fps_stats: Deque): @@ -65,7 +66,7 @@ def _save_eval_results(cfg, eval_stats): data[key] = stat[policy_id] data = pd.DataFrame(data) - data.to_csv(f"eval{policy_id}.csv") + data.to_csv(join(experiment_dir(cfg=cfg), f"eval_p{policy_id}_e{cfg.sample_env_episodes}.csv")) def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode: From abb859f30c6a807e0d51e4bb1978bdc7d8132433 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Wed, 13 Dec 2023 21:22:48 +0100 Subject: [PATCH 07/18] simplify eval even more, skip adding the invalid data into the policy_stats --- .../algo/sampling/evaluation_sampling_api.py | 38 +++++++++++++------ sample_factory/eval.py | 14 +------ 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py index fbc52399d..86220f7b9 100644 --- a/sample_factory/algo/sampling/evaluation_sampling_api.py +++ b/sample_factory/algo/sampling/evaluation_sampling_api.py @@ -34,6 +34,11 @@ def __init__(self, cfg: Config, env_info: EnvInfo): EventLoopObject.__init__(self, self.event_loop, object_id=unique_name) # self.event_loop.verbose = True + # calculate how many episodes for each environment should be taken into account + # we only want to use first N episodes (we don't want to bias ourselves with short episodes) + total_envs = self.cfg.num_workers * self.cfg.num_envs_per_worker + self.max_episode_number = self.cfg.sample_env_episodes / total_envs + self.env_info = env_info self.iteration: int = 0 @@ -52,7 +57,7 @@ def __init__(self, cfg: Config, 
env_info: EnvInfo): self.stats = dict() # regular (non-averaged) stats self.avg_stats = dict() - self.policy_avg_stats: Dict[str, List[Deque]] = dict() + self.policy_avg_stats: Dict[str, List[List]] = dict() # global msg handlers for messages from algo components self.msg_handlers: Dict[str, List[MsgHandler]] = { @@ -136,18 +141,22 @@ def _episodic_stats_handler(runner: Runner, msg: Dict, policy_id: PolicyID) -> N # the only difference between this function and the one from `Runner` # is that we store all stats in a list and not in the deque s = msg[EPISODIC] - for _, key, value in iterate_recursively(s): - if key not in runner.policy_avg_stats: - runner.policy_avg_stats[key] = [[] for _ in range(runner.cfg.num_policies)] - if isinstance(value, np.ndarray) and value.ndim > 0: - # if len(value) > runner.policy_avg_stats[key][policy_id].maxlen: - # # increase maxlen to make sure we never ignore any stats from the environments - # runner.policy_avg_stats[key][policy_id] = deque(maxlen=len(value)) + # skip invalid stats, potentially be not setting episode_number one could always add stats + episode_number = s["episode_extra_stats"].get("episode_number", 0) + if episode_number < runner.max_episode_number: + for _, key, value in iterate_recursively(s): + if key not in runner.policy_avg_stats: + runner.policy_avg_stats[key] = [[] for _ in range(runner.cfg.num_policies)] - runner.policy_avg_stats[key][policy_id].extend(value) - else: - runner.policy_avg_stats[key][policy_id].append(value) + if isinstance(value, np.ndarray) and value.ndim > 0: + # if len(value) > runner.policy_avg_stats[key][policy_id].maxlen: + # # increase maxlen to make sure we never ignore any stats from the environments + # runner.policy_avg_stats[key][policy_id] = deque(maxlen=len(value)) + + runner.policy_avg_stats[key][policy_id].extend(value) + else: + runner.policy_avg_stats[key][policy_id].append(value) def wait_until_ready(self): while not self.ready: @@ -267,6 +276,13 @@ def eval_stats(self): # those could be added here return self.sampling_loop.policy_avg_stats + @property + def eval_episodes_sampled(self): + # TODO: for now we only look at the first policy, + # maybe even in MARL we will look only at first policy? + policy_id = 0 + return len(self.eval_stats.get("reward", [[] for _ in range(self.cfg.num_policies)])[policy_id]) + def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None): if init_model_data is None: init_model_data = self.init_model_data diff --git a/sample_factory/eval.py b/sample_factory/eval.py index cb12696a7..b1ad19c14 100644 --- a/sample_factory/eval.py +++ b/sample_factory/eval.py @@ -74,10 +74,6 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i sampler.init() sampler.start() - # we override batch size to be the same as total number of environments - total_envs = cfg.num_workers * cfg.num_envs_per_worker - max_episode_number = sample_env_episodes / total_envs - print_interval_sec = 1.0 fps_stats = deque([(time.time(), 0, 0)], maxlen=10) episodes_sampled = 0 @@ -90,15 +86,7 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i if trajectory is None: break - episode_numbers = sampler.eval_stats.get("episode_number", [[] for _ in range(cfg.num_policies)]) - # TODO: for now we only look at the first policy, - # maybe even in MARL we will look only at first policy? 
- policy_id = 0 - episode_numbers = np.array(episode_numbers[policy_id]) - # we ignore some of the episodes because we only want to look at first N episodes - # to enforce it we use wrapper with counts number of episodes for each env - valid = episode_numbers < max_episode_number - episodes_sampled = valid.sum() + episodes_sampled = sampler.eval_episodes_sampled env_steps_sampled += samples_per_trajectory(trajectory) if time.time() - last_print > print_interval_sec: From fba1af21d4de88bf2314d0dd322c18a451a305bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Thu, 14 Dec 2023 07:03:41 +0100 Subject: [PATCH 08/18] remove returning the trajectories in eval sampling API since we don't need those, makes the code even faster --- .../algo/sampling/evaluation_sampling_api.py | 41 ++++++------------- sample_factory/eval.py | 13 +++--- 2 files changed, 18 insertions(+), 36 deletions(-) diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py index 86220f7b9..19bf1c19b 100644 --- a/sample_factory/algo/sampling/evaluation_sampling_api.py +++ b/sample_factory/algo/sampling/evaluation_sampling_api.py @@ -16,8 +16,9 @@ from sample_factory.algo.utils.env_info import EnvInfo from sample_factory.algo.utils.misc import EPISODIC, SAMPLES_COLLECTED, STATS_KEY, TIMING_STATS, ExperimentStatus from sample_factory.algo.utils.model_sharing import ParameterServer +from sample_factory.algo.utils.rl_utils import samples_per_trajectory from sample_factory.algo.utils.shared_buffers import BufferMgr -from sample_factory.algo.utils.tensor_dict import TensorDict, clone_tensordict +from sample_factory.algo.utils.tensor_dict import TensorDict from sample_factory.cfg.configurable import Configurable from sample_factory.utils.dicts import iterate_recursively from sample_factory.utils.gpu_utils import set_global_cuda_envvars @@ -242,7 +243,7 @@ def __init__( self.learners: Dict[PolicyID, Learner] = None self.sampling_loop: SamplingLoop = None - self.traj_queue: Queue = Queue(maxsize=100) + self.total_samples = 0 def init(self): set_global_cuda_envvars(self.cfg) @@ -277,11 +278,14 @@ def eval_stats(self): return self.sampling_loop.policy_avg_stats @property - def eval_episodes_sampled(self): - # TODO: for now we only look at the first policy, - # maybe even in MARL we will look only at first policy? - policy_id = 0 - return len(self.eval_stats.get("reward", [[] for _ in range(self.cfg.num_policies)])[policy_id]) + def eval_episodes(self): + return self.eval_stats.get("episode_number", [[] for _ in range(self.cfg.num_policies)]) + + @property + def eval_env_steps(self): + # return number of env steps for each policy + episode_lens = self.eval_stats.get("len", [[] for _ in range(self.cfg.num_policies)]) + return [sum(episode_lens[policy_id]) for policy_id in range(self.cfg.num_policies)] def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None): if init_model_data is None: @@ -289,31 +293,12 @@ def start(self, init_model_data: Optional[Dict[PolicyID, InitModelData]] = None) self.sampling_loop.start(init_model_data) def _on_new_trajectories(self, traj: TensorDict, traj_buffer_indices: Iterable[int | slice], device: str): - traj_clone = clone_tensordict(traj) # we copied the data so we can release the buffer + self.total_samples += samples_per_trajectory(traj) # just release buffers after every trajectory - # we could alternatively have more sophisticated logic here, see i.e. 
batcher.py + # we could alternatively have more sophisticated logic here, see i.e. batcher.py or simplified_sampling_api.py self.sampling_loop.yield_trajectory_buffers(traj_buffer_indices, device) - while not self.sampling_loop.stopped: - try: - self.traj_queue.put(traj_clone, timeout=1.0, block=True) - break - except Full: - log.debug(f"{self._on_new_trajectories.__name__}: trajectory queue full, waiting...") - self.sampling_loop.event_loop.process_events() - - def get_trajectories_sync(self) -> Optional[TensorDict]: - while not self.sampling_loop.stopped: - try: - traj = self.traj_queue.get(timeout=5.0) - return traj - except Empty: - log.debug(f"{self.get_trajectories_sync.__name__}(): waiting for trajectories...") - continue - - return None - def stop(self) -> StatusCode: self.sampling_loop.stop_sampling() self.sampling_thread.join() diff --git a/sample_factory/eval.py b/sample_factory/eval.py index b1ad19c14..5c06a6421 100644 --- a/sample_factory/eval.py +++ b/sample_factory/eval.py @@ -10,7 +10,6 @@ from sample_factory.algo.sampling.evaluation_sampling_api import EvalSamplingAPI from sample_factory.algo.utils.env_info import EnvInfo, obtain_env_info_in_a_separate_process -from sample_factory.algo.utils.rl_utils import samples_per_trajectory from sample_factory.utils.typing import Config from sample_factory.utils.utils import experiment_dir, log @@ -82,14 +81,12 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i while episodes_sampled < sample_env_episodes: try: - trajectory = sampler.get_trajectories_sync() - if trajectory is None: - break - - episodes_sampled = sampler.eval_episodes_sampled - env_steps_sampled += samples_per_trajectory(trajectory) - if time.time() - last_print > print_interval_sec: + # TODO: for now we only look at the first policy, + policy_id = 0 + episodes_sampled = len(sampler.eval_episodes[policy_id]) + env_steps_sampled = sampler.total_samples + fps_stats.append((time.time(), episodes_sampled, env_steps_sampled)) _print_fps_stats(cfg, fps_stats) last_print = time.time() From f66c1d6e5d3d01bcd957e66aedf63fac1887209b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Thu, 14 Dec 2023 13:37:13 +0100 Subject: [PATCH 09/18] add csv_folder_name --- sample_factory/cfg/cfg.py | 6 ++++++ sample_factory/eval.py | 10 ++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/sample_factory/cfg/cfg.py b/sample_factory/cfg/cfg.py index 541053662..725ab0a42 100644 --- a/sample_factory/cfg/cfg.py +++ b/sample_factory/cfg/cfg.py @@ -706,6 +706,12 @@ def add_eval_args(parser): type=str, help="Module name used to run training script. 
Used to generate HF model card", ) + parser.add_argument( + "--csv_folder_name", + default=None, + type=str, + help="Path where the evaluation csv will be stored.", + ) def add_wandb_args(p: ArgumentParser): diff --git a/sample_factory/eval.py b/sample_factory/eval.py index 5c06a6421..8001dddf5 100644 --- a/sample_factory/eval.py +++ b/sample_factory/eval.py @@ -1,7 +1,7 @@ import json import time from collections import deque -from os.path import join +from pathlib import Path from typing import Deque import numpy as np @@ -64,8 +64,14 @@ def _save_eval_results(cfg, eval_stats): for key, stat in eval_stats.items(): data[key] = stat[policy_id] + csv_output_dir = Path(experiment_dir(cfg=cfg)) + if cfg.csv_folder_name is not None: + csv_output_dir = csv_output_dir / cfg.csv_folder_name + csv_output_dir.mkdir(exist_ok=True, parents=True) + csv_output_path = csv_output_dir / f"eval_p{policy_id}.csv" + data = pd.DataFrame(data) - data.to_csv(join(experiment_dir(cfg=cfg), f"eval_p{policy_id}_e{cfg.sample_env_episodes}.csv")) + data.to_csv(csv_output_path) def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: int = 1024) -> StatusCode: From 257270d07696be9b7090dee151e756b181b2ae5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Thu, 14 Dec 2023 13:44:42 +0100 Subject: [PATCH 10/18] pre-commit --- sample_factory/algo/sampling/evaluation_sampling_api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py index 19bf1c19b..22cb2ecbe 100644 --- a/sample_factory/algo/sampling/evaluation_sampling_api.py +++ b/sample_factory/algo/sampling/evaluation_sampling_api.py @@ -2,9 +2,8 @@ import time from collections import OrderedDict -from queue import Empty, Full, Queue from threading import Thread -from typing import Callable, Deque, Dict, Iterable, List, Optional, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Tuple import numpy as np from signal_slot.signal_slot import EventLoop, EventLoopObject, EventLoopStatus, signal From a31b4d1cc5c6817af3fc522eeac6dd3a895e7f0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?= Date: Thu, 14 Dec 2023 14:10:58 +0100 Subject: [PATCH 11/18] move sample_eval_episodes to global cfg --- sample_factory/cfg/cfg.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sample_factory/cfg/cfg.py b/sample_factory/cfg/cfg.py index 725ab0a42..ae8e7a30b 100644 --- a/sample_factory/cfg/cfg.py +++ b/sample_factory/cfg/cfg.py @@ -706,6 +706,12 @@ def add_eval_args(parser): type=str, help="Module name used to run training script. 
Used to generate HF model card",
     )
+    parser.add_argument(
+        "--sample_env_episodes",
+        default=256,
+        type=int,
+        help="Determines how many episodes will be sampled in eval.",
+    )
     parser.add_argument(
         "--csv_folder_name",
         default=None,
         type=str,

From 5b88c5459926f44ae9c5cead8d23a452e1ea58a5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?=
Date: Thu, 14 Dec 2023 14:51:16 +0100
Subject: [PATCH 12/18] fix pickling issue

---
 sample_factory/algo/sampling/evaluation_sampling_api.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py
index 22cb2ecbe..cf8c0b11d 100644
--- a/sample_factory/algo/sampling/evaluation_sampling_api.py
+++ b/sample_factory/algo/sampling/evaluation_sampling_api.py
@@ -263,7 +263,9 @@ def init(self):
             self.init_model_data[policy_id] = self.learners[policy_id].init()
 
         self.sampling_loop: SamplingLoop = SamplingLoop(self.cfg, self.env_info)
-        self.sampling_loop.init(self.buffer_mgr, self.param_servers)
+        # don't pass self.param_servers here, learners are normally initialized later
+        # TODO: fix above issue
+        self.sampling_loop.init(self.buffer_mgr)
         self.sampling_loop.set_new_trajectory_callback(self._on_new_trajectories)
         self.sampling_thread: Thread = Thread(target=self.sampling_loop.run)
         self.sampling_thread.start()

From f3e3c1d943c5a3a594d2e83c8cad0a4816abd4a3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?=
Date: Thu, 14 Dec 2023 14:51:40 +0100
Subject: [PATCH 13/18] turn off decorrelation by default in eval

---
 sample_factory/eval.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sample_factory/eval.py b/sample_factory/eval.py
index 8001dddf5..6cf7f86c5 100644
--- a/sample_factory/eval.py
+++ b/sample_factory/eval.py
@@ -112,5 +112,7 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i
 def eval(cfg: Config) -> StatusCode:
     # should always be set to True for this script
     cfg.episode_counter = True
+    # decorrelation isn't needed in eval, it only slows us down
+    cfg.decorrelate_envs_on_one_worker = False
     env_info = obtain_env_info_in_a_separate_process(cfg)
     return generate_trajectories(cfg, env_info, cfg.sample_env_episodes)

From 599da2e555eafeeff55027400eb61e3cd5265389 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?=
Date: Thu, 14 Dec 2023 15:00:23 +0100
Subject: [PATCH 14/18] improve logging

---
 .../algo/sampling/evaluation_sampling_api.py | 11 +++++------
 sample_factory/eval.py                       |  4 ++--
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/sample_factory/algo/sampling/evaluation_sampling_api.py b/sample_factory/algo/sampling/evaluation_sampling_api.py
index cf8c0b11d..77ec2a7e5 100644
--- a/sample_factory/algo/sampling/evaluation_sampling_api.py
+++ b/sample_factory/algo/sampling/evaluation_sampling_api.py
@@ -138,22 +138,21 @@ def _process_msg(self, msgs):
     @staticmethod
     def _episodic_stats_handler(runner: Runner, msg: Dict, policy_id: PolicyID) -> None:
-        # the only difference between this function and the one from `Runner`
-        # is that we store all stats in a list and not in the deque
+        # heavily based on the `_episodic_stats_handler` from `Runner`
         s = msg[EPISODIC]
 
         # skip invalid stats, potentially be not setting episode_number one could always add stats
         episode_number = s["episode_extra_stats"].get("episode_number", 0)
         if episode_number < runner.max_episode_number:
+            log.debug(
+                f"Episode ended after {s['len']:.1f} steps. Return: {s['reward']:.1f}. True objective {s['true_objective']:.1f}"
+            )
+
             for _, key, value in iterate_recursively(s):
                 if key not in runner.policy_avg_stats:
                     runner.policy_avg_stats[key] = [[] for _ in range(runner.cfg.num_policies)]
 
                 if isinstance(value, np.ndarray) and value.ndim > 0:
-                    # if len(value) > runner.policy_avg_stats[key][policy_id].maxlen:
-                    #     # increase maxlen to make sure we never ignore any stats from the environments
-                    #     runner.policy_avg_stats[key][policy_id] = deque(maxlen=len(value))
-
                     runner.policy_avg_stats[key][policy_id].extend(value)
                 else:
                     runner.policy_avg_stats[key][policy_id].append(value)

diff --git a/sample_factory/eval.py b/sample_factory/eval.py
index 6cf7f86c5..44e162d7f 100644
--- a/sample_factory/eval.py
+++ b/sample_factory/eval.py
@@ -22,7 +22,7 @@ def _print_fps_stats(cfg: Config, fps_stats: Deque):
     fps = delta_sampled / delta_time
     fps_frameskip = fps * cfg.env_frameskip
     fps_frameskip_str = f" ({fps_frameskip:.1f} FPS with frameskip)" if cfg.env_frameskip > 1 else ""
-    log.debug(
+    log.info(
         f"Episodes collected: {episodes_sampled}, Samples collected: {env_steps_sampled}, throughput: {fps:.1f} FPS{fps_frameskip_str}"
     )
 
@@ -55,7 +55,7 @@ def _print_eval_summaries(cfg, eval_stats):
         results[min_tag] = float(min(stat[policy_id]))
         results[max_tag] = float(max(stat[policy_id]))
 
-    print(json.dumps(results, indent=4))
+    log.info(json.dumps(results, indent=4))
 
 
 def _save_eval_results(cfg, eval_stats):

From 12aa03c9618205f48f84fd1277feac9ffcc291a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?=
Date: Thu, 14 Dec 2023 15:11:03 +0100
Subject: [PATCH 15/18] add eval mujoco for testing

---
 sf_examples/mujoco/eval_mujoco.py | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)
 create mode 100644 sf_examples/mujoco/eval_mujoco.py

diff --git a/sf_examples/mujoco/eval_mujoco.py b/sf_examples/mujoco/eval_mujoco.py
new file mode 100644
index 000000000..2325e3d37
--- /dev/null
+++ b/sf_examples/mujoco/eval_mujoco.py
@@ -0,0 +1,27 @@
+import sys
+
+from sample_factory.cfg.arguments import checkpoint_override_defaults, parse_full_cfg, parse_sf_args
+from sample_factory.eval import eval
+from sf_examples.mujoco.mujoco_params import add_mujoco_env_args, mujoco_override_defaults
+from sf_examples.mujoco.train_mujoco import register_mujoco_components
+
+
+def main():
+    """Script entry point."""
+    register_mujoco_components()
+    parser, cfg = parse_sf_args(evaluation=True)
+    add_mujoco_env_args(cfg.env, parser)
+    mujoco_override_defaults(cfg.env, parser)
+
+    # important: unlike enjoy, we don't use `load_from_checkpoint` here; we override
+    # the parser defaults instead so that argv arguments can still take precedence
+    checkpoint_override_defaults(cfg, parser)
+
+    cfg = parse_full_cfg(parser)
+
+    status = eval(cfg)
+    return status
+
+
+if __name__ == "__main__":
+    sys.exit(main())

From 07b5a33f07d41883a086ed9478f675df22fa1beb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bart=C5=82omiej=20Cupia=C5=82?=
Date: Thu, 14 Dec 2023 15:45:03 +0100
Subject: [PATCH 16/18] add example usage of perf_eval to readme

---
 README.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/README.md b/README.md
index 889b70d0f..cf102c901 100644
--- a/README.md
+++ b/README.md
@@ -91,6 +91,12 @@ Stop the experiment (Ctrl+C) when the desired performance is reached and then ev
 python -m sf_examples.mujoco.enjoy_mujoco --env=mujoco_ant --experiment=Ant --train_dir=./train_dir
 ```
+
+```bash
+python -m sf_examples.mujoco.eval_mujoco --env=mujoco_ant --experiment=Ant --train_dir=./train_dir --sample_env_episodes=128 --num_workers=16 --num_envs_per_worker=2
+```
+
 Do the same in a pixel-based VizDoom environment (might need to run `pip install sample-factory[vizdoom]`, please also see docs for VizDoom-specific instructions):

 ```bash

From 4ef740161d7187799a7bbd2b03768bd9114efa81 Mon Sep 17 00:00:00 2001
From: A K
Date: Wed, 27 Dec 2023 03:23:32 -0800
Subject: [PATCH 17/18] Mujoco troubles

---
 docs/09-environment-integrations/mujoco.md | 12 ++++++++++--
 setup.py                                   |  2 +-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/docs/09-environment-integrations/mujoco.md b/docs/09-environment-integrations/mujoco.md
index 6801c2205..5290ce05b 100644
--- a/docs/09-environment-integrations/mujoco.md
+++ b/docs/09-environment-integrations/mujoco.md
@@ -17,13 +17,21 @@ that we can achieve even faster training on a multi-core machine with more optim
 To train a model in the `Ant-v4` environment:

 ```
-python -m sf_examples.mujoco.train_mujoco --algo=APPO --env=mujoco_ant --experiment=
+python -m sf_examples.mujoco.train_mujoco --env=mujoco_ant --experiment=
 ```

 To visualize the training results, use the `enjoy_mujoco` script:

 ```
-python -m sf_examples.mujoco.enjoy_mujoco --algo=APPO --env=mujoco_ant --experiment=
+python -m sf_examples.mujoco.enjoy_mujoco --env=mujoco_ant --experiment=
+```
+
+If you're having issues with the Mujoco viewer in a Unix/Linux environment with Conda, try running the following
+before executing the `enjoy_mujoco` script:
+
+```
+export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libstdc++.so.6
+python -m sf_examples.mujoco.enjoy_mujoco ...
 ```

 Multiple experiments can be run in parallel with the launcher module. `mujoco_all_envs` is an example launcher script that runs all mujoco envs with 10 seeds.

diff --git a/setup.py b/setup.py
index 77e3ab12c..575a395f3 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@

 _atari_deps = ["gymnasium[atari, accept-rom-license]"]
-_mujoco_deps = ["gymnasium[mujoco]", "mujoco<=2.3.3"]
+_mujoco_deps = ["gymnasium[mujoco]", "mujoco<2.5"]
 _envpool_deps = ["envpool"]

 _docs_deps = [

From a5fde394f28035efd33f8f9d8e2a2fe6ae7fe473 Mon Sep 17 00:00:00 2001
From: A K
Date: Wed, 27 Dec 2023 03:37:40 -0800
Subject: [PATCH 18/18] Minor fixes, added missing dependencies, fixed some PEP complaints

---
 README.md                                                  | 7 ++-----
 sample_factory/cfg/arguments.py                            | 2 ++
 sample_factory/eval.py                                     | 5 +++--
 setup.py                                                   | 1 +
 sf_examples/mujoco/{eval_mujoco.py => fast_eval_mujoco.py} | 4 ++--
 5 files changed, 10 insertions(+), 9 deletions(-)
 rename sf_examples/mujoco/{eval_mujoco.py => fast_eval_mujoco.py} (92%)

diff --git a/README.md b/README.md
index c0859b339..786bedbc8 100644
--- a/README.md
+++ b/README.md
@@ -88,12 +88,9 @@ Stop the experiment (Ctrl+C) when the desired performance is reached and then ev
 ```bash
 python -m sf_examples.mujoco.enjoy_mujoco --env=mujoco_ant --experiment=Ant --train_dir=./train_dir
-```
-
-Or alternatively use the faster evaluation script, which doesn't support rendering but computes some stats. It's best that `sample_env_episodes` <= `num_workers` * `num_envs_per_worker`.

-```bash
-python -m sf_examples.mujoco.eval_mujoco --env=mujoco_ant --experiment=Ant --train_dir=./train_dir --sample_env_episodes=128 --num_workers=16 --num_envs_per_worker=2
+# Or use an alternative eval script, no rendering but much faster! (use `sample_env_episodes` >= `num_workers` * `num_envs_per_worker`).
+python -m sf_examples.mujoco.fast_eval_mujoco --env=mujoco_ant --experiment=Ant --train_dir=./train_dir --sample_env_episodes=128 --num_workers=16 --num_envs_per_worker=2
 ```

 Do the same in a pixel-based VizDoom environment (might need to run `pip install sample-factory[vizdoom]`, please also see docs for VizDoom-specific instructions):

 ```bash

diff --git a/sample_factory/cfg/arguments.py b/sample_factory/cfg/arguments.py
index e1930db31..a15f0444e 100644
--- a/sample_factory/cfg/arguments.py
+++ b/sample_factory/cfg/arguments.py
@@ -297,3 +297,5 @@ def checkpoint_override_defaults(cfg: Config, parser) -> AttrDict:
     loaded_cfg = AttrDict(json_params)

     parser.set_defaults(**loaded_cfg)
+
+    return loaded_cfg

diff --git a/sample_factory/eval.py b/sample_factory/eval.py
index 44e162d7f..3d9a7268b 100644
--- a/sample_factory/eval.py
+++ b/sample_factory/eval.py
@@ -82,7 +82,6 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i
     print_interval_sec = 1.0
     fps_stats = deque([(time.time(), 0, 0)], maxlen=10)
     episodes_sampled = 0
-    env_steps_sampled = 0
     last_print = time.time()

     while episodes_sampled < sample_env_episodes:
@@ -96,6 +95,8 @@
                 fps_stats.append((time.time(), episodes_sampled, env_steps_sampled))
                 _print_fps_stats(cfg, fps_stats)
                 last_print = time.time()
+
+                log.info(f"Progress: {episodes_sampled}/{sample_env_episodes} episodes sampled")
         except KeyboardInterrupt:
             log.info(f"KeyboardInterrupt in {generate_trajectories.__name__}()")
             break
@@ -109,7 +110,7 @@ def generate_trajectories(cfg: Config, env_info: EnvInfo, sample_env_episodes: i
     return status


-def eval(cfg: Config) -> StatusCode:
+def do_eval(cfg: Config) -> StatusCode:
     # should always be set to True for this script
     cfg.episode_counter = True
     # decorrelation isn't needed in eval, it only slows us down

diff --git a/setup.py b/setup.py
index 575a395f3..46254b904 100644
--- a/setup.py
+++ b/setup.py
@@ -56,6 +56,7 @@
         "opencv-python",
         "wandb>=0.12.9",
         "huggingface-hub>=0.10.0,<1.0",
+        "pandas",
     ],
     extras_require={
         # some tests require Atari and Mujoco so let's make sure dev environment has that

diff --git a/sf_examples/mujoco/eval_mujoco.py b/sf_examples/mujoco/fast_eval_mujoco.py
similarity index 92%
rename from sf_examples/mujoco/eval_mujoco.py
rename to sf_examples/mujoco/fast_eval_mujoco.py
index 2325e3d37..d0cc8abfd 100644
--- a/sf_examples/mujoco/eval_mujoco.py
+++ b/sf_examples/mujoco/fast_eval_mujoco.py
@@ -1,7 +1,7 @@
 import sys

 from sample_factory.cfg.arguments import checkpoint_override_defaults, parse_full_cfg, parse_sf_args
-from sample_factory.eval import eval
+from sample_factory.eval import do_eval
 from sf_examples.mujoco.mujoco_params import add_mujoco_env_args, mujoco_override_defaults
 from sf_examples.mujoco.train_mujoco import register_mujoco_components

@@ -19,7 +19,7 @@ def main():

     cfg = parse_full_cfg(parser)

-    status = eval(cfg)
+    status = do_eval(cfg)
     return status