diff --git a/.gitignore b/.gitignore
index b63ec90f2..d03a1f8db 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,8 +16,7 @@ cov_html/*
dev/*
results/*
temp/*
-client_data/*
-remote_data/*
+rlberry_data/*
# Byte-compiled / optimized / DLL files
__pycache__/
diff --git a/CITATION.cff b/CITATION.cff
index 527a9b76e..f05179896 100644
--- a/CITATION.cff
+++ b/CITATION.cff
@@ -16,7 +16,7 @@ authors:
title: "rlberry - A Reinforcement Learning Library for Research and Education"
abbreviation: rlberry
-version: 0.2
-doi: 10.5281/zenodo.5544540
+version: 0.2.1
+doi: 10.5281/zenodo.5223307
date-released: 2021-10-01
url: "https://github.com/rlberry-py/rlberry"
\ No newline at end of file
diff --git a/README.md b/README.md
index eb21e1a9f..ba66faf50 100644
--- a/README.md
+++ b/README.md
@@ -25,9 +25,9 @@
-
+
diff --git a/examples/demo_adaptiveql.py b/examples/demo_adaptiveql.py
index e761e2a43..145102d23 100644
--- a/examples/demo_adaptiveql.py
+++ b/examples/demo_adaptiveql.py
@@ -57,7 +57,7 @@
preprocess_func=np.cumsum, title='Cumulative Rewards')
for stats in multimanagers.managers:
- agent = stats.agent_handlers[0]
+ agent = stats.get_agent_instances()[0]
try:
agent.Qtree.plot(0, 25)
except AttributeError:
diff --git a/examples/demo_agent_manager.py b/examples/demo_agent_manager.py
index 526b7fda7..88fb46855 100644
--- a/examples/demo_agent_manager.py
+++ b/examples/demo_agent_manager.py
@@ -57,7 +57,12 @@
init_kwargs=params,
eval_kwargs=eval_kwargs,
n_fit=4,
- seed=123)
+ seed=123,
+ enable_tensorboard=True,
+ default_writer_kwargs=dict(
+ maxlen=N_EPISODES - 10,
+ log_interval=5.0,
+ ))
rskernel_stats = AgentManager(
RSKernelUCBVIAgent,
train_env,
@@ -65,7 +70,8 @@
init_kwargs=params_kernel,
eval_kwargs=eval_kwargs,
n_fit=4,
- seed=123)
+ seed=123,
+ enable_tensorboard=True)
a2c_stats = AgentManager(
A2CAgent,
train_env,
@@ -81,16 +87,26 @@
for st in agent_manager_list:
st.fit()
+ # Fit RSUCBVI for 50 more episodes
+ rsucbvi_stats.fit(budget=50)
+
# learning curves
plot_writer_data(agent_manager_list,
tag='episode_rewards',
preprocess_func=np.cumsum,
title='cumulative rewards',
show=False)
+
+ plot_writer_data(agent_manager_list,
+ tag='episode_rewards',
+ title='episode rewards',
+ show=False)
# compare final policies
output = evaluate_agents(agent_manager_list)
+
print(output)
- for st in agent_manager_list:
- st.clear_output_dir()
+ # uncomment to delete output directories
+ # for st in agent_manager_list:
+ # st.clear_output_dir()
diff --git a/examples/demo_dqn.py b/examples/demo_dqn.py
index 771d75ac7..caed348b8 100644
--- a/examples/demo_dqn.py
+++ b/examples/demo_dqn.py
@@ -12,8 +12,8 @@
agent.set_writer(SummaryWriter())
print(f"Running DQN on {env}")
-print(f"Visualize with tensorboard by \
-running:\n$tensorboard --logdir {Path(agent.writer.log_dir).parent}")
+print("Visualize with tensorboard by "
+ f"running:\n$tensorboard --logdir {Path(agent.writer.log_dir).parent}")
agent.fit(budget=50)
diff --git a/examples/demo_experiment/run.py b/examples/demo_experiment/run.py
index 5cc99156e..1ee513d0a 100644
--- a/examples/demo_experiment/run.py
+++ b/examples/demo_experiment/run.py
@@ -26,4 +26,9 @@
del multimanagers
data = load_experiment_results('results', 'params_experiment')
+
print(data)
+
+ # Fit one of the managers for a few more episodes
+ # If tensorboard is enabled, you should see more episodes run for 'rsucbvi_alternative'
+ data['manager']['rsucbvi_alternative'].fit(50)
diff --git a/examples/demo_hyperparam_optim.py b/examples/demo_hyperparam_optim.py
index 58e8a4082..2d22bfa51 100644
--- a/examples/demo_hyperparam_optim.py
+++ b/examples/demo_hyperparam_optim.py
@@ -1,5 +1,5 @@
from rlberry.envs.benchmarks.ball_exploration import PBall2D
-from rlberry.agents.torch.ppo import PPOAgent
+from rlberry.agents.torch import REINFORCEAgent
from rlberry.manager import AgentManager
if __name__ == '__main__':
@@ -11,54 +11,54 @@
# -----------------------------
# Parameters
# -----------------------------
- N_EPISODES = 100
+ N_EPISODES = 10
GAMMA = 0.99
HORIZON = 50
BONUS_SCALE_FACTOR = 0.1
MIN_DIST = 0.1
- params_ppo = {"gamma": GAMMA,
- "horizon": HORIZON,
- "learning_rate": 0.0003}
+ params = {"gamma": GAMMA,
+ "horizon": HORIZON,
+ "learning_rate": 0.0003}
eval_kwargs = dict(eval_horizon=HORIZON, n_simulations=20)
# -------------------------------
# Run AgentManager and save results
# --------------------------------
- ppo_stats = AgentManager(
- PPOAgent, train_env, fit_budget=N_EPISODES,
- init_kwargs=params_ppo,
+ manager = AgentManager(
+ REINFORCEAgent, train_env, fit_budget=N_EPISODES,
+ init_kwargs=params,
eval_kwargs=eval_kwargs,
- n_fit=4,
- output_dir='dev/')
+ n_fit=4)
# hyperparam optim with multiple threads
- ppo_stats.optimize_hyperparams(
+ manager.optimize_hyperparams(
n_trials=5, timeout=None,
n_fit=2,
sampler_method='optuna_default',
optuna_parallelization='thread')
- initial_n_trials = len(ppo_stats.optuna_study.trials)
+ initial_n_trials = len(manager.optuna_study.trials)
# save
- ppo_stats_fname = ppo_stats.save()
- del ppo_stats
+ manager_fname = manager.save()
+ del manager
# load
- ppo_stats = AgentManager.load(ppo_stats_fname)
+ manager = AgentManager.load(manager_fname)
# continue previous optimization, now with 120s of timeout and multiprocessing
- ppo_stats.optimize_hyperparams(
+ manager.optimize_hyperparams(
n_trials=512, timeout=120,
- n_fit=2,
+ n_fit=8,
continue_previous=True,
- optuna_parallelization='process')
+ optuna_parallelization='process',
+ n_optuna_workers=4)
print("number of initial trials = ", initial_n_trials)
- print("number of trials after continuing= ", len(ppo_stats.optuna_study.trials))
+ print("number of trials after continuing= ", len(manager.optuna_study.trials))
print("----")
print("fitting agents after choosing hyperparams...")
- ppo_stats.fit() # fit the 4 agents
+ manager.fit() # fit the 4 agents
diff --git a/examples/demo_network/run_remote_manager.py b/examples/demo_network/run_remote_manager.py
index 530766f21..71a395945 100644
--- a/examples/demo_network/run_remote_manager.py
+++ b/examples/demo_network/run_remote_manager.py
@@ -14,7 +14,7 @@
port = int(input("Select server port: "))
client = BerryClient(port=port)
- FIT_BUDGET = 1000
+ FIT_BUDGET = 500
local_manager = AgentManager(
agent_class=REINFORCEAgent,
@@ -35,10 +35,11 @@
fit_budget=FIT_BUDGET,
init_kwargs=dict(gamma=0.99),
eval_kwargs=dict(eval_horizon=200, n_simulations=20),
- n_fit=2,
+ n_fit=3,
seed=10,
agent_name='REINFORCE(remote)',
- parallelization='process'
+ parallelization='process',
+ enable_tensorboard=True,
)
remote_manager.set_writer(
@@ -48,7 +49,7 @@
)
# Optimize hyperparams of remote agent
- best_params = remote_manager.optimize_hyperparams(timeout=120, optuna_parallelization='process')
+ best_params = remote_manager.optimize_hyperparams(timeout=60, optuna_parallelization='process')
print(f'best params = {best_params}')
# Test save/load
@@ -62,6 +63,9 @@
mmanagers.append(remote_manager)
mmanagers.run()
+ # Fit remotely for a few more episodes
+ remote_manager.fit(budget=100)
+
# plot
plot_writer_data(mmanagers.managers, tag='episode_rewards', show=False)
evaluate_agents(mmanagers.managers, n_simulations=10, show=True)
@@ -69,6 +73,7 @@
# Test some methods
print([manager.eval_agents() for manager in mmanagers.managers])
- for manager in mmanagers.managers:
- manager.clear_handlers()
- manager.clear_output_dir()
+ # # uncomment to clear output files
+ # for manager in mmanagers.managers:
+ # manager.clear_handlers()
+ # manager.clear_output_dir()
diff --git a/rlberry/agents/adaptiveql/adaptiveql.py b/rlberry/agents/adaptiveql/adaptiveql.py
index ce4b92159..83b97f824 100644
--- a/rlberry/agents/adaptiveql/adaptiveql.py
+++ b/rlberry/agents/adaptiveql/adaptiveql.py
@@ -2,7 +2,6 @@
import gym.spaces as spaces
import numpy as np
from rlberry.agents import AgentWithSimplePolicy
-from rlberry.utils.writers import DefaultWriter
from rlberry.agents.adaptiveql.tree import MDPTreePartition
logger = logging.getLogger(__name__)
@@ -85,9 +84,6 @@ def reset(self):
# info
self.episode = 0
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
action, _ = self.Qtree.get_argmax_and_node(observation, 0)
return action
diff --git a/rlberry/agents/agent.py b/rlberry/agents/agent.py
index d542c8a0b..b26699223 100644
--- a/rlberry/agents/agent.py
+++ b/rlberry/agents/agent.py
@@ -5,11 +5,14 @@
import numpy as np
from inspect import signature
from pathlib import Path
+from rlberry import metadata_utils
from rlberry import types
from rlberry.seeding.seeder import Seeder
from rlberry.seeding import safe_reseed
from rlberry.envs.utils import process_env
-from typing import Any, Optional, Mapping
+from rlberry.utils.writers import DefaultWriter
+from typing import Optional
+
logger = logging.getLogger(__name__)
@@ -27,8 +30,10 @@ class Agent(ABC):
If true, makes a deep copy of the environment.
seeder : rlberry.seeding.Seeder, int, or None
Object for random number generation.
- _metadata : dict
- Extra information (e.g. about which is the process id where the agent is running).
+ _execution_metadata : ExecutionMetadata (optional)
+ Extra information about agent execution (e.g. the id of the process where the agent is running).
+ _default_writer_kwargs : dict (optional)
+ Parameters to initialize DefaultWriter (attribute self.writer).
.. note::
Classes that implement this interface should send ``**kwargs`` to :code:`Agent.__init__()`
@@ -45,6 +50,11 @@ class Agent(ABC):
Writer object (e.g. tensorboard SummaryWriter).
seeder : rlberry.seeding.Seeder, int, or None
Object for random number generation.
+ output_dir : str or Path
+ Directory that the agent can use to store data.
+ unique_id : str
+ Unique identifier for the agent instance. Can be used, for example,
+ to create files/directories for the agent to log data safely.
"""
name = ""
@@ -54,7 +64,9 @@ def __init__(self,
eval_env: Optional[types.Env] = None,
copy_env: bool = True,
seeder: Optional[types.Seed] = None,
- _metadata: Optional[Mapping[str, Any]] = None,
+ output_dir: Optional[str] = None,
+ _execution_metadata: Optional[metadata_utils.ExecutionMetadata] = None,
+ _default_writer_kwargs: Optional[dict] = None,
**kwargs):
# Check if wrong parameters have been sent to an agent.
assert kwargs == {}, \
@@ -62,14 +74,37 @@ def __init__(self,
self.seeder = Seeder(seeder)
self.env = process_env(env, self.seeder, copy_env=copy_env)
- self.writer = None
# evaluation environment
eval_env = eval_env or env
self.eval_env = process_env(eval_env, self.seeder, copy_env=True)
# metadata
- self._metadata = _metadata or dict()
+ self._execution_metadata = _execution_metadata or metadata_utils.ExecutionMetadata()
+ self._unique_id = metadata_utils.get_unique_id(self)
+ if self.name:
+ self._unique_id = self.name + '_' + self._unique_id
+
+ # create writer
+ _default_writer_kwargs = _default_writer_kwargs or dict(
+ name=self.name, execution_metadata=self._execution_metadata)
+ self._writer = DefaultWriter(**_default_writer_kwargs)
+
+ # output directory for the agent instance
+ self._output_dir = output_dir or f"output_{self._unique_id}"
+ self._output_dir = Path(self._output_dir)
+
+ @property
+ def writer(self):
+ return self._writer
+
+ @property
+ def unique_id(self):
+ return self._unique_id
+
+ @property
+ def output_dir(self):
+ return self._output_dir
@abstractmethod
def fit(self, budget: int, **kwargs):
@@ -87,6 +122,8 @@ def fit(self, budget: int, **kwargs):
optimization (by allowing early stopping), but it is not strictly required
elsewhere in the library.
+ If the agent does not require a budget, set it to -1.
+
Parameters
----------
budget: int
@@ -110,9 +147,9 @@ def eval(self, **kwargs):
pass
def set_writer(self, writer):
- self.writer = writer
+ self._writer = writer
- if self.writer:
+ if self._writer:
init_args = signature(self.__init__).parameters
kwargs = [f"| {key} | {getattr(self, key, None)} |" for key in init_args]
writer.add_text(
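Since Agent.__init__() now creates a DefaultWriter and an output directory for every instance, subclasses no longer need to build their own writer. A minimal sketch of how a subclass can rely on the new attributes; MyAgent and its trivial policy are illustrative only, and it assumes DefaultWriter exposes a tensorboard-like add_scalar():

from rlberry.agents import AgentWithSimplePolicy
from rlberry.envs import GridWorld


class MyAgent(AgentWithSimplePolicy):
    """Toy agent relying on the DefaultWriter created by Agent.__init__()."""
    name = 'MyAgent'

    def fit(self, budget, **kwargs):
        del kwargs
        for episode in range(budget):
            # log a scalar through the writer set up by the base class
            self.writer.add_scalar('episode_rewards', 0.0, episode)

    def policy(self, observation):
        return 0  # GridWorld has a discrete action space; 0 is always valid


agent = MyAgent(GridWorld())
agent.fit(budget=5)
print(agent.unique_id)   # '<name>_<hash>', built via rlberry.metadata_utils.get_unique_id
print(agent.output_dir)  # defaults to Path('output_<unique_id>') when output_dir is not given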
diff --git a/rlberry/agents/jax/dqn/dqn.py b/rlberry/agents/jax/dqn/dqn.py
index e92d641d2..7f889f260 100644
--- a/rlberry/agents/jax/dqn/dqn.py
+++ b/rlberry/agents/jax/dqn/dqn.py
@@ -41,7 +41,6 @@
from rlberry import types
from rlberry.agents import AgentWithSimplePolicy
from rlberry.agents.jax.utils.replay_buffer import ReplayBuffer
-from rlberry.utils.writers import DefaultWriter
from typing import Any, Callable, Mapping, Optional
logger = logging.getLogger(__name__)
@@ -135,7 +134,6 @@ def __init__(
AgentWithSimplePolicy.__init__(self, env, **kwargs)
env = self.env
self.rng_key = jax.random.PRNGKey(self.rng.integers(2 ** 32).item())
- self.writer = DefaultWriter(name=self.name, metadata=self._metadata)
# checks
if not isinstance(self.env.observation_space, spaces.Box):
@@ -433,7 +431,7 @@ def load(cls, filename, **kwargs):
agent._all_states = agent_data['states']
writer = agent_data['writer']
if writer:
- agent.writer = writer
+ agent._writer = writer
return agent
#
diff --git a/rlberry/agents/kernel_based/rs_kernel_ucbvi.py b/rlberry/agents/kernel_based/rs_kernel_ucbvi.py
index 5e676ae22..e376fdbab 100644
--- a/rlberry/agents/kernel_based/rs_kernel_ucbvi.py
+++ b/rlberry/agents/kernel_based/rs_kernel_ucbvi.py
@@ -10,7 +10,6 @@
from rlberry.utils.metrics import metric_lp
from rlberry.agents.kernel_based.kernels import kernel_func
from rlberry.agents.kernel_based.common import map_to_representative
-from rlberry.utils.writers import DefaultWriter
logger = logging.getLogger(__name__)
@@ -250,9 +249,6 @@ def reset(self, **kwargs):
self.episode = 0
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
state = observation
assert self.Q_policy is not None
diff --git a/rlberry/agents/kernel_based/rs_ucbvi.py b/rlberry/agents/kernel_based/rs_ucbvi.py
index 9f15fff08..a9908c0e1 100644
--- a/rlberry/agents/kernel_based/rs_ucbvi.py
+++ b/rlberry/agents/kernel_based/rs_ucbvi.py
@@ -6,7 +6,6 @@
from rlberry.agents.dynprog.utils import backward_induction
from rlberry.agents.dynprog.utils import backward_induction_in_place
from rlberry.agents.kernel_based.common import map_to_representative
-from rlberry.utils.writers import DefaultWriter
logger = logging.getLogger(__name__)
@@ -191,9 +190,6 @@ def reset(self, **kwargs):
self.episode = 0
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
state = observation
assert self.Q_policy is not None
diff --git a/rlberry/agents/linear/lsvi_ucb.py b/rlberry/agents/linear/lsvi_ucb.py
index e8890dfac..438b39e3c 100644
--- a/rlberry/agents/linear/lsvi_ucb.py
+++ b/rlberry/agents/linear/lsvi_ucb.py
@@ -2,7 +2,6 @@
import numpy as np
from rlberry.agents import AgentWithSimplePolicy
from gym.spaces import Discrete
-from rlberry.utils.writers import DefaultWriter
from rlberry.utils.jit_setup import numba_jit
logger = logging.getLogger(__name__)
@@ -203,9 +202,6 @@ def reset(self):
#
self.w_policy = None
- # default writer
- self.writer = DefaultWriter(name=self.name, metadata=self._metadata)
-
def fit(self, budget, **kwargs):
del kwargs
diff --git a/rlberry/agents/optql/optql.py b/rlberry/agents/optql/optql.py
index 8e71e5999..f0fc66750 100644
--- a/rlberry/agents/optql/optql.py
+++ b/rlberry/agents/optql/optql.py
@@ -4,7 +4,6 @@
import gym.spaces as spaces
from rlberry.agents import AgentWithSimplePolicy
from rlberry.exploration_tools.discrete_counter import DiscreteCounter
-from rlberry.utils.writers import DefaultWriter
logger = logging.getLogger(__name__)
@@ -103,9 +102,6 @@ def reset(self, **kwargs):
self.counter = DiscreteCounter(self.env.observation_space,
self.env.action_space)
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
""" Recommended policy. """
state = observation
diff --git a/rlberry/agents/torch/a2c/a2c.py b/rlberry/agents/torch/a2c/a2c.py
index d444140c5..6fe9b475e 100644
--- a/rlberry/agents/torch/a2c/a2c.py
+++ b/rlberry/agents/torch/a2c/a2c.py
@@ -9,7 +9,6 @@
from rlberry.agents.torch.utils.models import default_policy_net_fn
from rlberry.agents.torch.utils.models import default_value_net_fn
from rlberry.utils.torch import choose_device
-from rlberry.utils.writers import DefaultWriter
from rlberry.wrappers.uncertainty_estimator_wrapper import UncertaintyEstimatorWrapper
logger = logging.getLogger(__name__)
@@ -144,9 +143,6 @@ def reset(self, **kwargs):
self.episode = 0
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
state = observation
assert self.cat_policy is not None
diff --git a/rlberry/agents/torch/avec/avec_ppo.py b/rlberry/agents/torch/avec/avec_ppo.py
index a5351aa0f..a3176f34a 100644
--- a/rlberry/agents/torch/avec/avec_ppo.py
+++ b/rlberry/agents/torch/avec/avec_ppo.py
@@ -9,7 +9,6 @@
from rlberry.agents.torch.utils.models import default_policy_net_fn
from rlberry.agents.torch.utils.models import default_value_net_fn
from rlberry.utils.torch import choose_device
-from rlberry.utils.writers import DefaultWriter
from rlberry.wrappers.uncertainty_estimator_wrapper import UncertaintyEstimatorWrapper
logger = logging.getLogger(__name__)
@@ -183,9 +182,6 @@ def reset(self, **kwargs):
self.episode = 0
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
state = observation
assert self.cat_policy is not None
diff --git a/rlberry/agents/torch/dqn/dqn.py b/rlberry/agents/torch/dqn/dqn.py
index da4958485..df8ada635 100644
--- a/rlberry/agents/torch/dqn/dqn.py
+++ b/rlberry/agents/torch/dqn/dqn.py
@@ -135,7 +135,6 @@ def __init__(self,
self.training = True
self.steps = 0
self.episode = 0
- self.writer = None
self.optimizer_kwargs = {'optimizer_type': optimizer_type,
'lr': learning_rate}
@@ -465,7 +464,7 @@ def initialize_model(self):
self.value_net.reset()
def set_writer(self, writer):
- self.writer = writer
+ self._writer = writer
try:
self.exploration_policy.set_writer(writer)
except AttributeError:
diff --git a/rlberry/agents/torch/ppo/ppo.py b/rlberry/agents/torch/ppo/ppo.py
index 3a73e2e45..5b32c5b97 100644
--- a/rlberry/agents/torch/ppo/ppo.py
+++ b/rlberry/agents/torch/ppo/ppo.py
@@ -10,7 +10,6 @@
from rlberry.agents.torch.utils.models import default_policy_net_fn
from rlberry.agents.torch.utils.models import default_value_net_fn
from rlberry.utils.torch import choose_device
-from rlberry.utils.writers import DefaultWriter
from rlberry.wrappers.uncertainty_estimator_wrapper import UncertaintyEstimatorWrapper
@@ -174,9 +173,6 @@ def reset(self, **kwargs):
self.episode = 0
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
state = observation
assert self.cat_policy is not None
diff --git a/rlberry/agents/torch/reinforce/reinforce.py b/rlberry/agents/torch/reinforce/reinforce.py
index 6d2843536..80255877c 100644
--- a/rlberry/agents/torch/reinforce/reinforce.py
+++ b/rlberry/agents/torch/reinforce/reinforce.py
@@ -7,7 +7,6 @@
from rlberry.agents.torch.utils.training import optimizer_factory
from rlberry.agents.torch.utils.models import default_policy_net_fn
from rlberry.utils.torch import choose_device
-from rlberry.utils.writers import DefaultWriter
logger = logging.getLogger(__name__)
@@ -113,9 +112,6 @@ def reset(self, **kwargs):
self.episode = 0
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
state = observation
assert self.policy_net is not None
diff --git a/rlberry/agents/torch/tests/test_actor_critic_algos.py b/rlberry/agents/torch/tests/test_actor_critic_algos.py
index 3d8dc1db7..989cf63dd 100644
--- a/rlberry/agents/torch/tests/test_actor_critic_algos.py
+++ b/rlberry/agents/torch/tests/test_actor_critic_algos.py
@@ -88,7 +88,6 @@ def test_ppo_agent_partial_fit():
eps_clip=0.2,
k_epochs=4,
use_bonus=False)
- agent._log_interval = 0
agent.fit(budget=n_episodes // 2)
agent.policy(env.observation_space.sample())
diff --git a/rlberry/agents/ucbvi/ucbvi.py b/rlberry/agents/ucbvi/ucbvi.py
index cd69b8d60..99476b5f4 100644
--- a/rlberry/agents/ucbvi/ucbvi.py
+++ b/rlberry/agents/ucbvi/ucbvi.py
@@ -7,7 +7,6 @@
from rlberry.exploration_tools.discrete_counter import DiscreteCounter
from rlberry.agents.dynprog.utils import backward_induction_sd
from rlberry.agents.dynprog.utils import backward_induction_in_place
-from rlberry.utils.writers import DefaultWriter
logger = logging.getLogger(__name__)
@@ -154,9 +153,6 @@ def reset(self, **kwargs):
if self.real_time_dp:
self.name = 'UCBVI-RTDP'
- # default writer
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
-
def policy(self, observation):
state = observation
assert self.Q_policy is not None
diff --git a/rlberry/check_packages.py b/rlberry/check_packages.py
index df8d2bd98..8bac791e1 100644
--- a/rlberry/check_packages.py
+++ b/rlberry/check_packages.py
@@ -5,6 +5,12 @@
except ModuleNotFoundError:
TORCH_INSTALLED = False
+TENSORBOARD_INSTALLED = True
+try:
+ import torch.utils.tensorboard
+except ModuleNotFoundError:
+ TENSORBOARD_INSTALLED = False
+
NUMBA_INSTALLED = True
try:
import numba
diff --git a/rlberry/envs/basewrapper.py b/rlberry/envs/basewrapper.py
index 6f6c90a03..f0bb3c6e4 100644
--- a/rlberry/envs/basewrapper.py
+++ b/rlberry/envs/basewrapper.py
@@ -1,6 +1,7 @@
from rlberry.seeding import Seeder, safe_reseed
import numpy as np
from rlberry.envs.interface import Model
+from rlberry.spaces.from_gym import convert_space_from_gym
class Wrapper(Model):
@@ -12,23 +13,34 @@ class Wrapper(Model):
The input environment is not copied (Wrapper.env points
to the input env).
+ Parameters
+ ----------
+ env: gym.Env
+ Environment to be wrapped.
+ wrap_spaces: bool, default = False
+ If True, gym.spaces are converted to rlberry.spaces, which define a reseed() method.
+
See also:
https://stackoverflow.com/questions/1443129/completely-wrap-an-object-in-python
[1] https://github.com/openai/gym/blob/master/gym/core.py
"""
- def __init__(self, env):
+ def __init__(self, env, wrap_spaces=False):
# Init base class
Model.__init__(self)
# Save reference to env
self.env = env
-
- self.observation_space = self.env.observation_space
- self.action_space = self.env.action_space
self.metadata = self.env.metadata
+ if wrap_spaces:
+ self.observation_space = convert_space_from_gym(self.env.observation_space)
+ self.action_space = convert_space_from_gym(self.env.action_space)
+ else:
+ self.observation_space = self.env.observation_space
+ self.action_space = self.env.action_space
+
try:
self.reward_range = self.env.reward_range
except AttributeError:
@@ -65,8 +77,8 @@ def reseed(self, seed_seq=None):
self.seeder = Seeder(seed_seq)
# seed gym.Env that is not a rlberry Model
if not isinstance(self.env, Model):
- # get a seed for gym environment
- safe_reseed(self.env, self.seeder)
+ # get a seed for gym environment; spaces are reseeded below.
+ safe_reseed(self.env, self.seeder, reseed_spaces=False)
# seed rlberry Model
else:
self.env.reseed(self.seeder)
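A brief sketch of the new wrap_spaces flag on Wrapper, assuming a standard gym environment such as CartPole-v0 is registered locally:

import gym
from rlberry.envs.basewrapper import Wrapper

env = Wrapper(gym.make('CartPole-v0'), wrap_spaces=True)
# observation_space and action_space are now rlberry.spaces objects,
# so the environment and its spaces reseed consistently:
env.reseed(123)
obs = env.reset()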
diff --git a/rlberry/envs/finite/gridworld.py b/rlberry/envs/finite/gridworld.py
index 1f965d6d1..082ed41ef 100644
--- a/rlberry/envs/finite/gridworld.py
+++ b/rlberry/envs/finite/gridworld.py
@@ -67,7 +67,7 @@ def __init__(self,
if terminal_states is not None:
self.terminal_states = terminal_states
else:
- self.terminal_states = ((nrows - 1, ncols - 1),)
+ self.terminal_states = ()
# Probability of going left/right/up/down when choosing the
# correspondent action
@@ -354,7 +354,8 @@ def get_layout_img(
# map data to [0.0, 1.0]
if state_data is not None:
state_data = state_data - state_data.min()
- state_data = state_data / state_data.max()
+ if state_data.max() > 0.0:
+ state_data = state_data / state_data.max()
colormap_fn = plt.get_cmap(colormap_name)
layout = self.get_layout_array(state_data, fill_walls_with=np.nan)
@@ -364,9 +365,9 @@ def get_layout_img(
for rr in range(layout.shape[0]):
for cc in range(layout.shape[1]):
if np.isnan(layout[rr, cc]):
- img[rr, cc, :] = wall_color
+ img[self.nrows - 1 - rr, cc, :] = wall_color
else:
- img[rr, cc, :3] = scalar_map.to_rgba(layout[rr, cc])[:3]
+ img[self.nrows - 1 - rr, cc, :3] = scalar_map.to_rgba(layout[rr, cc])[:3]
return img
def get_background(self):
diff --git a/rlberry/envs/gym_make.py b/rlberry/envs/gym_make.py
index dd57c7bba..93766e049 100644
--- a/rlberry/envs/gym_make.py
+++ b/rlberry/envs/gym_make.py
@@ -2,10 +2,20 @@
from rlberry.envs.basewrapper import Wrapper
-def gym_make(id, **kwargs):
+def gym_make(id, wrap_spaces=False, **kwargs):
"""
Same as gym.make, but wraps the environment
to ensure unified seeding with rlberry.
+
+ Parameters
+ ----------
+ id : str
+ Environment id.
+ wrap_spaces : bool, default = False
+ If True, also wraps observation_space and action_space using classes in rlberry.spaces,
+ which define a reseed() method.
+ **kwargs
+ Optional arguments to configure the environment.
"""
if "module_import" in kwargs:
__import__(kwargs.pop("module_import"))
@@ -14,7 +24,7 @@ def gym_make(id, **kwargs):
env.configure(kwargs)
except AttributeError:
pass
- return Wrapper(env)
+ return Wrapper(env, wrap_spaces=wrap_spaces)
def atari_make(id, scalarize=True, **kwargs):
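The same option through the gym_make factory; the environment id below is just an example:

from rlberry.envs.gym_make import gym_make

env = gym_make('MountainCar-v0', wrap_spaces=True)
# the converted spaces define reseed(), so they follow rlberry's seeding
env.observation_space.reseed(456)
env.action_space.reseed(456)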
diff --git a/rlberry/envs/utils.py b/rlberry/envs/utils.py
index 3d9cd3678..e29a4cdde 100644
--- a/rlberry/envs/utils.py
+++ b/rlberry/envs/utils.py
@@ -20,5 +20,6 @@ def process_env(env, seeder, copy_env=True):
else:
processed_env = env
reseeded = safe_reseed(processed_env, seeder)
- assert reseeded
+ if not reseeded:
+ logger.warning("[Agent] Not possible to reseed environment.")
return processed_env
diff --git a/rlberry/experiment/generator.py b/rlberry/experiment/generator.py
index a93625423..816016a7e 100644
--- a/rlberry/experiment/generator.py
+++ b/rlberry/experiment/generator.py
@@ -1,27 +1,22 @@
"""Run experiments.
Usage:
- run.py [--writer] [--n_fit=] [--output_dir=] [--parallelization=]
+ run.py [--enable_tensorboard] [--n_fit=] [--output_dir=] [--parallelization=]
run.py (-h | --help)
Options:
- -h --help Show this screen.
- --writer Use a tensorboard writer.
- --n_fit= Number of times each agent is fit [default: 4].
- --output_dir= Directory to save the results [default: results].
+ -h --help Show this screen.
+ --enable_tensorboard Enable tensorboard writer in AgentManager.
+ --n_fit= Number of times each agent is fit [default: 4].
+ --output_dir= Directory to save the results [default: results].
--parallelization= Either 'thread' or 'process' [default: process].
"""
import logging
from docopt import docopt
from pathlib import Path
-from datetime import datetime
from rlberry.experiment.yaml_utils import parse_experiment_config
-
-_TENSORBOARD_INSTALLED = True
-try:
- from torch.utils.tensorboard import SummaryWriter
-except ImportError:
- _TENSORBOARD_INSTALLED = False
+from rlberry.manager import AgentManager
+from rlberry import check_packages
logger = logging.getLogger(__name__)
@@ -31,17 +26,15 @@ def experiment_generator():
Parse command line arguments and yields AgentManager instances.
"""
args = docopt(__doc__)
- for (_, agent_manager) in parse_experiment_config(
+ for (_, agent_manager_kwargs) in parse_experiment_config(
Path(args[""]),
n_fit=int(args["--n_fit"]),
output_base_dir=args["--output_dir"],
parallelization=args["--parallelization"]):
- if args["--writer"]:
- if _TENSORBOARD_INSTALLED:
- for idx in range(agent_manager.n_fit):
- logdir = agent_manager.output_dir / f"run_{idx + 1}_{datetime.now().strftime('%b%d_%H-%M-%S')}"
- agent_manager.set_writer(idx=idx, writer_fn=SummaryWriter, writer_kwargs={'log_dir': logdir})
+ if args["--enable_tensorboard"]:
+ if check_packages.TENSORBOARD_INSTALLED:
+ agent_manager_kwargs.update(dict(enable_tensorboard=True))
else:
- logger.warning('Option --writer is not available: tensorboard is not installed.')
+ logger.warning('Option --enable_tensorboard is not available: tensorboard is not installed.')
- yield agent_manager
+ yield AgentManager(**agent_manager_kwargs)
diff --git a/rlberry/experiment/tests/test_experiment_generator.py b/rlberry/experiment/tests/test_experiment_generator.py
index bf4340bdc..0d6e6c8ab 100644
--- a/rlberry/experiment/tests/test_experiment_generator.py
+++ b/rlberry/experiment/tests/test_experiment_generator.py
@@ -16,25 +16,25 @@ def test_mock_args(monkeypatch):
random_numbers.append(rng.uniform(size=10))
assert agent_manager.agent_class is RSUCBVIAgent
- assert agent_manager.init_kwargs['horizon'] == 51
+ assert agent_manager._base_init_kwargs['horizon'] == 51
assert agent_manager.fit_budget == 10
assert agent_manager.eval_kwargs['eval_horizon'] == 51
- assert agent_manager.init_kwargs['lp_metric'] == 2
- assert agent_manager.init_kwargs['min_dist'] == 0.0
- assert agent_manager.init_kwargs['max_repr'] == 800
- assert agent_manager.init_kwargs['bonus_scale_factor'] == 1.0
- assert agent_manager.init_kwargs['reward_free'] is True
+ assert agent_manager._base_init_kwargs['lp_metric'] == 2
+ assert agent_manager._base_init_kwargs['min_dist'] == 0.0
+ assert agent_manager._base_init_kwargs['max_repr'] == 800
+ assert agent_manager._base_init_kwargs['bonus_scale_factor'] == 1.0
+ assert agent_manager._base_init_kwargs['reward_free'] is True
train_env = agent_manager.train_env[0](**agent_manager.train_env[1])
assert train_env.reward_free is False
assert train_env.array_observation is True
if agent_manager.agent_name == 'rsucbvi':
- assert agent_manager.init_kwargs['gamma'] == 1.0
+ assert agent_manager._base_init_kwargs['gamma'] == 1.0
elif agent_manager.agent_name == 'rsucbvi_alternative':
- assert agent_manager.init_kwargs['gamma'] == 0.9
+ assert agent_manager._base_init_kwargs['gamma'] == 0.9
else:
raise ValueError()
diff --git a/rlberry/experiment/yaml_utils.py b/rlberry/experiment/yaml_utils.py
index c9250af18..e9852512e 100644
--- a/rlberry/experiment/yaml_utils.py
+++ b/rlberry/experiment/yaml_utils.py
@@ -1,8 +1,6 @@
from pathlib import Path
from typing import Generator, Tuple
import yaml
-
-from rlberry.manager import AgentManager
from rlberry.utils.factory import load
_AGENT_KEYS = ('init_kwargs', 'eval_kwargs', 'fit_kwargs')
@@ -102,7 +100,7 @@ def read_env_config(config_path):
def parse_experiment_config(path: Path,
n_fit: int = 4,
output_base_dir: str = 'results',
- parallelization: str = 'process') -> Generator[Tuple[int, AgentManager], None, None]:
+ parallelization: str = 'process') -> Generator[Tuple[int, dict], None, None]:
"""
Read .yaml files. set global seed and convert to AgentManager instances.
@@ -133,8 +131,8 @@ def parse_experiment_config(path: Path,
-------
seed: int
global seed
- agent_manager: AgentManager
- the Agent Stats to fit
+ agent_manager_kwargs:
+ parameters to create an AgentManager instance.
"""
with path.open() as file:
config = yaml.safe_load(file)
@@ -185,7 +183,7 @@ def parse_experiment_config(path: Path,
# append run index to dir
output_dir = output_dir / str(last + 1)
- yield seed, AgentManager(
+ yield seed, dict(
agent_class=agent_class,
init_kwargs=init_kwargs,
eval_kwargs=eval_kwargs,
diff --git a/rlberry/manager/agent_manager.py b/rlberry/manager/agent_manager.py
index c1c63ec80..9c4ec685c 100644
--- a/rlberry/manager/agent_manager.py
+++ b/rlberry/manager/agent_manager.py
@@ -1,10 +1,10 @@
import concurrent.futures
from copy import deepcopy
-from datetime import datetime
from pathlib import Path
from rlberry.seeding import safe_reseed, set_external_seed
from rlberry.seeding import Seeder
+from rlberry import metadata_utils
import functools
import json
@@ -17,7 +17,6 @@
import threading
import multiprocessing
import numpy as np
-import uuid
from rlberry.envs.utils import process_env
from rlberry.utils.logging import configure_logging
from rlberry.utils.writers import DefaultWriter
@@ -55,7 +54,7 @@ class AgentHandler:
Class of the agent to be wrapped
agent_instance:
An instance of agent_class, or None (if not loaded).
- **agent_kwargs:
+ agent_kwargs:
Arguments required by __init__ method of agent_class.
"""
@@ -65,13 +64,13 @@ def __init__(self,
seeder,
agent_class,
agent_instance=None,
- **agent_kwargs) -> None:
+ agent_kwargs=None) -> None:
self._id = id
self._fname = Path(filename)
self._seeder = seeder
self._agent_class = agent_class
self._agent_instance = agent_instance
- self._agent_kwargs = agent_kwargs
+ self._agent_kwargs = agent_kwargs or {}
@property
def id(self):
@@ -80,6 +79,11 @@ def id(self):
def set_instance(self, agent_instance):
self._agent_instance = agent_instance
+ def get_instance(self):
+ if not self.is_loaded():
+ self.load()
+ return self._agent_instance
+
def is_empty(self):
return self._agent_instance is None and (not self._fname.exists())
@@ -156,7 +160,7 @@ class AgentManager:
eval_env : Tuple (constructor, kwargs)
Environment used to evaluate the agent. If None, set train_env.
init_kwargs : dict
- Arguments required by the agent's constructor.
+ Arguments required by the agent's constructor. Shared across all n_fit instances.
fit_kwargs : dict
Extra required to call agent.fit(bugdet, **fit_kwargs).
eval_kwargs : dict
@@ -169,16 +173,26 @@ class AgentManager:
Directory where to store data.
parallelization: {'thread', 'process'}, default: 'process'
Whether to parallelize agent training using threads or processes.
- thread_logging_level : str, default: 'INFO'
- Logging level in each of the threads used to fit agents.
+ worker_logging_level : str, default: 'INFO'
+ Logging level in each of the threads/processes used to fit agents.
seed : np.random.SeedSequence, rlberry.seeding.Seeder or int, default : None
Seed sequence from which to spawn the random number generator.
If None, generate random seed.
If int, use as entropy for SeedSequence.
If seeder, use seeder.seed_seq
+ enable_tensorboard : bool, default = False
+ If True, enable tensorboard logging in Agent's DefaultWriter.
create_unique_out_dir : bool, default = True
If true, data is saved to output_dir/manager_data/
Otherwise, data is saved to output_dir/manager_data
+ default_writer_kwargs : dict
+ Optional arguments for DefaultWriter.
+ init_kwargs_per_instance : List[dict] (optional)
+ List of length n_fit containing the kwargs passed to each of the
+ n_fit agent instances. Useful when different instances require
+ different parameters. If a parameter is defined in both init_kwargs
+ and init_kwargs_per_instance, the value from init_kwargs_per_instance
+ takes precedence.
"""
def __init__(self,
@@ -193,9 +207,12 @@ def __init__(self,
n_fit=4,
output_dir=None,
parallelization='thread',
- thread_logging_level='INFO',
+ worker_logging_level='INFO',
seed=None,
- create_unique_out_dir=True):
+ enable_tensorboard=False,
+ create_unique_out_dir=True,
+ default_writer_kwargs=None,
+ init_kwargs_per_instance=None):
# agent_class should only be None when the constructor is called
# by the class method AgentManager.load(), since the agent class
# will be loaded.
@@ -218,9 +235,7 @@ def __init__(self,
eval_env, Tuple), "[AgentManager]train_env must be Tuple (constructor, kwargs)"
# create oject identifier
- timestamp = datetime.timestamp(datetime.now())
- self.timestamp = str(timestamp).replace('.', '')
- self.unique_id = str(id(self)) + self.timestamp
+ self.unique_id = metadata_utils.get_unique_id(self)
# Agent class
self.agent_class = agent_class
@@ -235,17 +250,17 @@ def __init__(self,
self._eval_env = eval_env
# check kwargs
- init_kwargs = init_kwargs or {}
fit_kwargs = fit_kwargs or {}
eval_kwargs = eval_kwargs or {}
# params
- self.init_kwargs = deepcopy(init_kwargs)
+ base_init_kwargs = init_kwargs or {}
+ self._base_init_kwargs = deepcopy(base_init_kwargs)
self.fit_kwargs = deepcopy(fit_kwargs)
self.eval_kwargs = deepcopy(eval_kwargs)
self.n_fit = n_fit
self.parallelization = parallelization
- self.thread_logging_level = thread_logging_level
+ self.worker_logging_level = worker_logging_level
if fit_budget is not None:
self.fit_budget = fit_budget
else:
@@ -253,18 +268,50 @@ def __init__(self,
self.fit_budget = self.fit_kwargs.pop('fit_budget')
except KeyError:
raise ValueError('[AgentManager] fit_budget missing in __init__().')
+ # extra params per instance
+ if init_kwargs_per_instance is not None:
+ assert len(init_kwargs_per_instance) == n_fit
+ init_kwargs_per_instance = deepcopy(init_kwargs_per_instance)
+ self.init_kwargs_per_instance = init_kwargs_per_instance or [dict() for _ in range(n_fit)]
# output dir
if output_dir is None:
- output_dir = 'temp/'
+ output_dir = metadata_utils.RLBERRY_TEMP_DATA_DIR
self.output_dir = Path(output_dir) / 'manager_data'
if create_unique_out_dir:
- self.output_dir = self.output_dir / (self.agent_name + '_id' + self.unique_id)
+ self.output_dir = self.output_dir / (self.agent_name + '_' + self.unique_id)
# Create list of writers for each agent that will be trained
+ # 'default' corresponds to the DefaultWriter created by Agent.__init__().
self.writers = [('default', None) for _ in range(n_fit)]
- #
+ # Parameters to setup Agent's DefaultWriter
+ self.agent_default_writer_kwargs = [
+ dict(
+ name=self.agent_name,
+ log_interval=3,
+ tensorboard_kwargs=None,
+ execution_metadata=metadata_utils.ExecutionMetadata(obj_worker_id=idx)
+ )
+ for idx in range(n_fit)
+ ]
+ self.tensorboard_dir = None
+ if enable_tensorboard:
+ self.tensorboard_dir = self.output_dir / 'tensorboard'
+ for idx, params in enumerate(self.agent_default_writer_kwargs):
+ params['tensorboard_kwargs'] = dict(
+ log_dir=self.tensorboard_dir / str(idx)
+ )
+ # Update DefaultWriter according to user's settings.
+ default_writer_kwargs = default_writer_kwargs or {}
+ if default_writer_kwargs:
+ logger.warning('(Re)defining the following DefaultWriter'
+ f' parameters in AgentManager: {list(default_writer_kwargs.keys())}')
+ for ii in range(n_fit):
+ self.agent_default_writer_kwargs[ii].update(default_writer_kwargs)
+
+ # agent handlers and init kwargs
+ self._set_init_kwargs() # init_kwargs for each agent
self.agent_handlers = None
self._reset_agent_handlers()
self.default_writer_data = None
@@ -285,6 +332,26 @@ def _init_optuna_storage_url(self):
self.optuna_storage_url = "sqlite:///:memory:"
logger.warning(f'Unable to create databate {self.db_filename}. Using sqlite:///:memory:')
+ def _set_init_kwargs(self):
+ init_seeders = self.seeder.spawn(self.n_fit, squeeze=False)
+ self.init_kwargs = []
+ for ii in range(self.n_fit):
+ kwargs_ii = deepcopy(self._base_init_kwargs)
+ kwargs_ii.update(
+ dict(
+ env=self.train_env,
+ eval_env=self._eval_env,
+ copy_env=False,
+ seeder=init_seeders[ii],
+ output_dir=Path(self.output_dir) / f"output_{ii}",
+ _execution_metadata=self.agent_default_writer_kwargs[ii]['execution_metadata'],
+ _default_writer_kwargs=self.agent_default_writer_kwargs[ii],
+ )
+ )
+ per_instance_kwargs = self.init_kwargs_per_instance[ii]
+ kwargs_ii.update(per_instance_kwargs)
+ self.init_kwargs.append(kwargs_ii)
+
def _reset_agent_handlers(self):
handlers_seeders = self.seeder.spawn(self.n_fit, squeeze=False)
self.agent_handlers = [
@@ -295,8 +362,7 @@ def _reset_agent_handlers(self):
agent_class=self.agent_class,
agent_instance=None,
# kwargs
- env=self.train_env,
- **self.init_kwargs,
+ agent_kwargs=self.init_kwargs[ii],
)
for ii in range(self.n_fit)
]
@@ -311,6 +377,11 @@ def build_eval_env(self):
def get_writer_data(self):
return self.default_writer_data
+ def get_agent_instances(self):
+ if self.agent_handlers:
+ return [agent_handler.get_instance() for agent_handler in self.agent_handlers]
+ return []
+
def eval_agents(self, n_simulations: Optional[int] = None) -> list:
"""
Call .eval() method in fitted agents and returns a list with the results.
@@ -405,20 +476,17 @@ def fit(self, budget=None, **kwargs):
raise ValueError(f'Invalid backend for parallelization: {self.parallelization}')
args = [(
- idx,
lock,
handler,
self.agent_class,
- self.train_env,
- self._eval_env,
budget,
- deepcopy(self.init_kwargs),
+ init_kwargs,
deepcopy(self.fit_kwargs),
writer,
- self.thread_logging_level,
+ self.worker_logging_level,
seeder)
- for idx, (handler, seeder, writer)
- in enumerate(zip(self.agent_handlers, seeders, self.writers))]
+ for init_kwargs, handler, seeder, writer
+ in zip(self.init_kwargs, self.agent_handlers, seeders, self.writers)]
if len(args) == 1:
workers_output = [_fit_worker(args[0])]
@@ -667,7 +735,7 @@ def optimize_hyperparams(self,
#
objective = functools.partial(
_optuna_objective,
- init_kwargs=self.init_kwargs, # self.init_kwargs
+ base_init_kwargs=self._base_init_kwargs, # self._base_init_kwargs
agent_class=self.agent_class, # self.agent_class
train_env=self.train_env, # self.train_env
eval_env=self._eval_env,
@@ -731,9 +799,10 @@ def optimize_hyperparams(self,
self.best_hyperparams = best_trial.params
# update using best parameters
- self.init_kwargs.update(best_trial.params)
+ self._base_init_kwargs.update(best_trial.params)
- # reset agent handlers, so that they take the new parameters
+ # reset init_kwargs and agent handlers, so that they take the new parameters
+ self._set_init_kwargs()
self._reset_agent_handlers()
return deepcopy(best_trial.params)
@@ -748,37 +817,29 @@ def _fit_worker(args):
"""
Create and fit an agent instance
"""
- idx, lock, agent_handler, agent_class, train_env, eval_env, fit_budget, init_kwargs, \
- fit_kwargs, writer, thread_logging_level, seeder = args
+ (lock, agent_handler, agent_class, fit_budget, init_kwargs,
+ fit_kwargs, writer, worker_logging_level, seeder) = args
# reseed external libraries
set_external_seed(seeder)
# logging level in thread
- configure_logging(thread_logging_level)
+ configure_logging(worker_logging_level)
# Using a lock when creating envs and agents, to avoid problems
# as here: https://github.com/openai/gym/issues/281
with lock:
if agent_handler.is_empty():
# create agent
- agent = agent_class(
- env=train_env,
- eval_env=eval_env,
- copy_env=False,
- seeder=seeder,
- _metadata=dict(
- worker=idx,
- ),
- **init_kwargs)
+ agent = agent_class(**init_kwargs)
# seed agent
- agent.reseed(seeder)
+ agent.reseed(seeder) # TODO: check if extra reseeding here is necessary
agent_handler.set_instance(agent)
# set writer
if writer[0] is None:
agent_handler.set_writer(None)
- elif writer[0] != 'default':
+ elif writer[0] != 'default': # 'default' corresponds to DefaultWriter created by Agent.__init__()
writer_fn = writer[0]
writer_kwargs = writer[1]
agent_handler.set_writer(writer_fn(**writer_kwargs))
@@ -813,7 +874,7 @@ def default(obj):
def _optuna_objective(
trial,
- init_kwargs, # self.init_kwargs
+ base_init_kwargs, # self._base_init_kwargs
agent_class, # self.agent_class
train_env, # self.train_env
eval_env,
@@ -824,7 +885,7 @@ def _optuna_objective(
disable_evaluation_writers,
fit_fraction
):
- kwargs = deepcopy(init_kwargs)
+ kwargs = deepcopy(base_init_kwargs)
# will raise exception if sample_parameters() is not
# implemented by the agent class
@@ -841,11 +902,12 @@ def _optuna_objective(
eval_env=eval_env,
init_kwargs=kwargs, # kwargs are being optimized
eval_kwargs=deepcopy(eval_kwargs),
- agent_name='optim_' + uuid.uuid4().hex,
+ agent_name='optim',
n_fit=n_fit,
- thread_logging_level='INFO',
+ worker_logging_level='INFO',
parallelization='thread',
output_dir=temp_dir,
+ enable_tensorboard=False,
create_unique_out_dir=True)
if disable_evaluation_writers:
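A hedged sketch tying together the new AgentManager constructor options (enable_tensorboard, default_writer_kwargs, init_kwargs_per_instance); the agent, environment and hyperparameter values are placeholders:

from rlberry.envs import GridWorld
from rlberry.agents.kernel_based.rs_ucbvi import RSUCBVIAgent
from rlberry.manager import AgentManager

manager = AgentManager(
    RSUCBVIAgent,
    (GridWorld, {}),                          # train_env as (constructor, kwargs)
    fit_budget=10,
    init_kwargs=dict(gamma=0.99, horizon=20),  # shared by all instances
    init_kwargs_per_instance=[dict(horizon=h) for h in (10, 20, 30, 40)],  # per-instance overrides
    n_fit=4,
    enable_tensorboard=True,                   # writers also log to <output_dir>/tensorboard/<idx>
    default_writer_kwargs=dict(log_interval=10),  # forwarded to each agent's DefaultWriter
    seed=42,
)
manager.fit()
agents = manager.get_agent_instances()  # new accessor added in this diff
print(len(agents))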
diff --git a/rlberry/manager/evaluation.py b/rlberry/manager/evaluation.py
index 785cb31ae..dd3a64b47 100644
--- a/rlberry/manager/evaluation.py
+++ b/rlberry/manager/evaluation.py
@@ -87,6 +87,7 @@ def evaluate_agents(agent_manager_list,
def plot_writer_data(agent_manager,
tag,
+ xtag=None,
fignum=None,
show=True,
preprocess_func=None,
@@ -100,7 +101,9 @@ def plot_writer_data(agent_manager,
----------
agent_manager : AgentManager, or list of AgentManager
tag : str
- Tag of data to plot.
+ Tag of data to plot on y-axis.
+ xtag : str
+ Tag of data to plot on x-axis. If None, use 'global_step'.
fignum: string or int
Identifier of plot figure.
show: bool
@@ -112,6 +115,10 @@ def plot_writer_data(agent_manager,
Optional title to plot. If None, set to tag.
sns_kwargs: dict
Optional extra params for seaborn lineplot.
+
+ Returns
+ -------
+ Pandas DataFrame with processed data used by seaborn's lineplot.
"""
sns_kwargs = sns_kwargs or {'ci': 'sd'}
@@ -136,11 +143,15 @@ def plot_writer_data(agent_manager,
if writer_data is not None:
for idx in writer_data:
df = writer_data[idx]
- df = pd.DataFrame(df[df['tag'] == tag])
- df['value'] = preprocess_func(df['value'].values)
+ processed_df = pd.DataFrame(df[df['tag'] == tag])
+ processed_df['value'] = preprocess_func(processed_df['value'].values)
# update name according to AgentManager name
- df['name'] = agent_name
- data_list.append(df)
+ processed_df['name'] = agent_name
+ # add column with xtag, if given
+ if xtag is not None:
+ df_xtag = pd.DataFrame(df[df['tag'] == xtag])
+ processed_df[xtag] = df_xtag['value'].values
+ data_list.append(processed_df)
if len(data_list) == 0:
logger.error('[plot_writer_data]: No data to be plotted.')
return
@@ -148,17 +159,24 @@ def plot_writer_data(agent_manager,
all_writer_data = pd.concat(data_list, ignore_index=True)
data = all_writer_data[all_writer_data['tag'] == tag]
- if data['global_step'].notnull().sum() > 0:
- xx = 'global_step'
+ if xtag is None:
+ xtag = 'global_step'
+
+ if data[xtag].notnull().sum() > 0:
+ xx = xtag
if data['global_step'].isna().sum() > 0:
- logger.warning(f'Plotting {tag} vs global_step, but global_step might be missing for some agents.')
+ logger.warning(f'Plotting {tag} vs {xtag}, but {xtag} might be missing for some agents.')
else:
xx = data.index
plt.figure(fignum)
- sns.lineplot(x=xx, y='value', hue='name', style='name', data=data, **sns_kwargs)
+ lineplot_kwargs = dict(x=xx, y='value', hue='name', style='name', data=data)
+ lineplot_kwargs.update(sns_kwargs)
+ sns.lineplot(**lineplot_kwargs)
plt.title(title)
plt.ylabel(ylabel)
if show:
plt.show()
+
+ return data
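A short sketch of the updated plot_writer_data: it now returns the processed DataFrame and accepts an xtag; tag names other than 'episode_rewards' are placeholders for whatever the writer actually logs:

from rlberry.envs import GridWorld
from rlberry.agents.kernel_based.rs_ucbvi import RSUCBVIAgent
from rlberry.manager import AgentManager, plot_writer_data

manager = AgentManager(RSUCBVIAgent, (GridWorld, {}), fit_budget=10,
                       init_kwargs=dict(gamma=0.99, horizon=20), n_fit=2, seed=7)
manager.fit()

# default: x-axis is 'global_step'; the processed data is returned
df = plot_writer_data(manager, tag='episode_rewards', show=False)
print(df.head())

# any other logged tag can be placed on the x-axis via xtag, e.g.
# plot_writer_data(manager, tag='episode_rewards', xtag='elapsed_time', show=False)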
diff --git a/rlberry/manager/remote_agent_manager.py b/rlberry/manager/remote_agent_manager.py
index fffaaa8fb..5d296bf34 100644
--- a/rlberry/manager/remote_agent_manager.py
+++ b/rlberry/manager/remote_agent_manager.py
@@ -1,9 +1,11 @@
+import base64
import dill
import io
import logging
import pandas as pd
import pathlib
import pickle
+import zipfile
from typing import Any, Mapping, Optional
from rlberry.network import interface
from rlberry.network.client import BerryClient
@@ -41,6 +43,9 @@ def __init__(
data=None,
)
)
+ if msg.command == interface.Command.RAISE_EXCEPTION:
+ raise Exception(msg.message)
+
self._remote_agent_manager_filename = pathlib.Path(
msg.info['filename']
)
@@ -57,6 +62,11 @@ def remote_file(self):
return str(self._remote_agent_manager_filename)
def get_writer_data(self):
+ """
+ * Calls get_writer_data() in the remote AgentManager and returns the result locally.
+ * If tensorboard data is available in the remote AgentManager, the data is zipped,
+ received locally and unzipped.
+ """
msg = self._client.send(
interface.Message.create(
command=interface.Command.AGENT_MANAGER_GET_WRITER_DATA,
@@ -65,18 +75,32 @@ def get_writer_data(self):
)
if msg.command == interface.Command.RAISE_EXCEPTION:
raise Exception(msg.message)
- raw_data = msg.data
+ raw_data = msg.data['writer_data']
writer_data = dict()
for idx in raw_data:
csv_content = raw_data[idx]
writer_data[idx] = pd.read_csv(io.StringIO(csv_content), sep=',')
+
+ # check if tensorboard data was received
+ # If so, read file and unzip it.
+ tensorboard_bin_data = msg.data['tensorboard_bin_data']
+ if tensorboard_bin_data is not None:
+ tensorboard_bin_data = base64.b64decode(tensorboard_bin_data.encode('ascii'))
+ zip_file = open(self.output_dir / 'tensorboard_data.zip', "wb")
+ zip_file.write(tensorboard_bin_data)
+ zip_file.close()
+ with zipfile.ZipFile(self.output_dir / 'tensorboard_data.zip', 'r') as zip_ref:
+ zip_ref.extractall(self.output_dir)
return writer_data
- def fit(self):
+ def fit(self, budget=None, **kwargs):
msg = self._client.send(
interface.Message.create(
command=interface.Command.AGENT_MANAGER_FIT,
- params=dict(filename=self.remote_file),
+ params=dict(
+ filename=self.remote_file,
+ budget=budget,
+ extra_params=kwargs),
data=None,
)
)
diff --git a/rlberry/manager/tests/test_agent_manager.py b/rlberry/manager/tests/test_agent_manager.py
index f92ec369a..22500a92c 100644
--- a/rlberry/manager/tests/test_agent_manager.py
+++ b/rlberry/manager/tests/test_agent_manager.py
@@ -2,7 +2,6 @@
from rlberry.envs import GridWorld
from rlberry.agents import AgentWithSimplePolicy
from rlberry.manager import AgentManager, plot_writer_data, evaluate_agents
-from rlberry.utils.writers import DefaultWriter
class DummyAgent(AgentWithSimplePolicy):
@@ -18,7 +17,6 @@ def __init__(self,
self.hyperparameter2 = hyperparameter2
self.total_budget = 0.0
- self.writer = DefaultWriter(self.name, metadata=self._metadata)
def fit(self, budget, **kwargs):
del kwargs
@@ -48,7 +46,7 @@ def test_agent_manager_1():
train_env = (GridWorld, {})
# Parameters
- params = {}
+ params = dict(hyperparameter1=-1, hyperparameter2=100)
eval_kwargs = dict(eval_horizon=10)
# Check DummyAgent
@@ -57,9 +55,10 @@ def test_agent_manager_1():
agent.policy(None)
# Run AgentManager
+ params_per_instance = [dict(hyperparameter2=ii) for ii in range(4)]
stats_agent1 = AgentManager(
DummyAgent, train_env, fit_budget=5, eval_kwargs=eval_kwargs,
- init_kwargs=params, n_fit=4, seed=123)
+ init_kwargs=params, n_fit=4, seed=123, init_kwargs_per_instance=params_per_instance)
stats_agent2 = AgentManager(
DummyAgent, train_env, fit_budget=5, eval_kwargs=eval_kwargs,
init_kwargs=params, n_fit=4, seed=123)
@@ -67,6 +66,14 @@ def test_agent_manager_1():
for st in agent_manager_list:
st.fit()
+ for ii, instance in enumerate(stats_agent1.agent_handlers):
+ assert instance.hyperparameter1 == -1
+ assert instance.hyperparameter2 == ii
+
+ for ii, instance in enumerate(stats_agent2.agent_handlers):
+ assert instance.hyperparameter1 == -1
+ assert instance.hyperparameter2 == 100
+
# learning curves
plot_writer_data(agent_manager_list, tag='episode_rewards', show=False)
diff --git a/rlberry/metadata_utils.py b/rlberry/metadata_utils.py
new file mode 100644
index 000000000..7a4302451
--- /dev/null
+++ b/rlberry/metadata_utils.py
@@ -0,0 +1,42 @@
+from datetime import datetime
+import uuid
+import hashlib
+from typing import Optional, NamedTuple
+
+
+# Default output directory used by the library.
+RLBERRY_DEFAULT_DATA_DIR = 'rlberry_data/'
+
+# Temporary directory used by the library
+RLBERRY_TEMP_DATA_DIR = 'rlberry_data/temp/'
+
+
+def get_unique_id(obj):
+ """
+ Get a unique id for an obj. Use it in __init__ methods when necessary.
+ """
+ # id() is guaranteed to be unique among simultaneously existing objects (uses memory address).
+ # uuid4() is a universally unique id, but there might be issues if called simultaneously in different processes.
+ # This function combines id(), uuid4(), and a timestamp into a single ID and hashes it.
+ timestamp = datetime.timestamp(datetime.now())
+ timestamp = str(timestamp).replace('.', '')
+ str_id = timestamp + str(id(obj)) + uuid.uuid4().hex
+ str_id = hashlib.md5(str_id.encode()).hexdigest()
+ return str_id
+
+
+class ExecutionMetadata(NamedTuple):
+ """
+ Metadata for objects handled by rlberry.
+
+ Attributes
+ ----------
+ obj_worker_id : int, default: -1
+ If given, must be >= 0, and informs the id of the worker (thread or process) where
+ the object was created. It is not necessarily unique across all the workers launched
+ by rlberry; it is mainly for debugging purposes.
+ obj_info : dict, default: None
+ Extra info about the object.
+ """
+ obj_worker_id: int = -1
+ obj_info: Optional[dict] = None
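A quick sketch of how these helpers are used (AgentManager and Agent now call them internally); the Dummy class is only for illustration:

from rlberry import metadata_utils


class Dummy:
    pass


uid = metadata_utils.get_unique_id(Dummy())
print(uid)  # 32-character md5 hex digest combining id(), uuid4() and a timestamp

meta = metadata_utils.ExecutionMetadata(obj_worker_id=0, obj_info=dict(note='fit worker 0'))
print(meta.obj_worker_id, meta.obj_info)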
diff --git a/rlberry/network/interface.py b/rlberry/network/interface.py
index 815722887..7ee4b5cc8 100644
--- a/rlberry/network/interface.py
+++ b/rlberry/network/interface.py
@@ -81,6 +81,7 @@ def send_data(socket, data):
"""
adapted from: https://stackoverflow.com/a/63532988
"""
+ print(f'[rlberry.network] sending {len(data)} bytes...')
socket.sendall(struct.pack('>I', len(data)) + data)
@@ -97,4 +98,5 @@ def receive_data(socket):
while remaining_size > 0:
received_data += socket.recv(remaining_size)
remaining_size = data_size - len(received_data)
+ print(f'[rlberry.network] ... received {len(received_data)}/{data_size} bytes.')
return received_data
diff --git a/rlberry/network/server_utils.py b/rlberry/network/server_utils.py
index 29dcc8030..e5beed83c 100644
--- a/rlberry/network/server_utils.py
+++ b/rlberry/network/server_utils.py
@@ -1,5 +1,9 @@
+import pathlib
from rlberry.network import interface
from rlberry.manager import AgentManager
+from rlberry import metadata_utils
+import rlberry.utils.io
+import base64
def execute_message(
@@ -15,25 +19,28 @@ def execute_message(
# AGENT_MANAGER_CREATE_INSTANCE
elif message.command == interface.Command.AGENT_MANAGER_CREATE_INSTANCE:
params = message.params
+ base_dir = pathlib.Path(metadata_utils.RLBERRY_DEFAULT_DATA_DIR)
if 'output_dir' in params:
- params['output_dir'] = 'client_data' / params['output_dir']
+ params['output_dir'] = base_dir / 'server_data' / params['output_dir']
else:
- params['output_dir'] = 'client_data/'
+ params['output_dir'] = base_dir / 'server_data/'
agent_manager = AgentManager(**params)
filename = str(agent_manager.save())
response = interface.Message.create(
info=dict(
filename=filename,
agent_name=agent_manager.agent_name,
- output_dir=str(agent_manager.output_dir).replace('client_data/', 'remote_data/')
+ output_dir=str(agent_manager.output_dir).replace('server_data/', 'client_data/')
)
)
del agent_manager
# AGENT_MANAGER_FIT
elif message.command == interface.Command.AGENT_MANAGER_FIT:
filename = message.params['filename']
+ budget = message.params['budget']
+ extra_params = message.params['extra_params']
agent_manager = AgentManager.load(filename)
- agent_manager.fit()
+ agent_manager.fit(budget, **extra_params)
agent_manager.save()
response = interface.Message.create(command=interface.Command.ECHO)
del agent_manager
@@ -76,13 +83,27 @@ def execute_message(
response = interface.Message.create(data=best_params_dict)
# AGENT_MANAGER_GET_WRITER_DATA
elif message.command == interface.Command.AGENT_MANAGER_GET_WRITER_DATA:
+ # writer scalar data
filename = message.params['filename']
agent_manager = AgentManager.load(filename)
writer_data = agent_manager.get_writer_data()
writer_data = writer_data or dict()
for idx in writer_data:
writer_data[idx] = writer_data[idx].to_csv(index=False)
+ # tensorboard data
+ tensorboard_bin_data = None
+ if agent_manager.tensorboard_dir is not None:
+ tensorboard_zip_file = rlberry.utils.io.zipdir(
+ agent_manager.tensorboard_dir,
+ agent_manager.output_dir / 'tensorboard_data.zip')
+ if tensorboard_zip_file is not None:
+ tensorboard_bin_data = open(tensorboard_zip_file, "rb").read()
+ tensorboard_bin_data = base64.b64encode(tensorboard_bin_data).decode('ascii')
+ response = interface.Message.create(
+ data=dict(
+ writer_data=writer_data,
+ tensorboard_bin_data=tensorboard_bin_data)
+ )
del agent_manager
- response = interface.Message.create(data=writer_data)
# end
return response
diff --git a/rlberry/seeding/seeding.py b/rlberry/seeding/seeding.py
index 0f82f07bc..0edce4be2 100644
--- a/rlberry/seeding/seeding.py
+++ b/rlberry/seeding/seeding.py
@@ -27,7 +27,7 @@ def set_external_seed(seeder):
torch.manual_seed(seeder.seed_seq.generate_state(1, dtype=np.uint32)[0])
-def safe_reseed(obj, seeder):
+def safe_reseed(obj, seeder, reseed_spaces=True):
"""
Calls obj.reseed(seed_seq) method if available;
If a obj.seed() method is available, call obj.seed(seed_val),
@@ -40,6 +40,9 @@ def safe_reseed(obj, seeder):
Object to be reseeded.
seeder: seeding.Seeder
Seeder object from which to generate random seeds.
+ reseed_spaces: bool, default = True.
+ If False, do not try to reseed observation_space and action_space (if
+ they exist as attributes of `obj`).
Returns
-------
@@ -58,10 +61,11 @@ def safe_reseed(obj, seeder):
reseeded = False
# check if the object has observation and action spaces to be reseeded.
- try:
- safe_reseed(obj.observation_space, seeder)
- safe_reseed(obj.action_space, seeder)
- except AttributeError:
- pass
+ if reseed_spaces:
+ try:
+ safe_reseed(obj.observation_space, seeder)
+ safe_reseed(obj.action_space, seeder)
+ except AttributeError:
+ pass
return reseeded
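Sketch of the new reseed_spaces flag, which Wrapper.reseed() above uses to avoid reseeding the spaces twice; CartPole-v0 is just a stand-in for any gym environment:

import gym
from rlberry.seeding import Seeder, safe_reseed

seeder = Seeder(123)
env = gym.make('CartPole-v0')

# reseed the environment itself, but skip observation_space/action_space
reseeded = safe_reseed(env, seeder, reseed_spaces=False)
print(reseeded)  # True if env.reseed() or env.seed() was found and called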
diff --git a/rlberry/spaces/from_gym.py b/rlberry/spaces/from_gym.py
new file mode 100644
index 000000000..939a3740c
--- /dev/null
+++ b/rlberry/spaces/from_gym.py
@@ -0,0 +1,36 @@
+import rlberry.spaces
+import gym.spaces
+
+
+def convert_space_from_gym(space):
+ if isinstance(space, gym.spaces.Box) and (not isinstance(space, rlberry.spaces.Box)):
+ return rlberry.spaces.Box(
+ space.low,
+ space.high,
+ shape=space.shape,
+ dtype=space.dtype
+ )
+ if isinstance(space, gym.spaces.Discrete) and (not isinstance(space, rlberry.spaces.Discrete)):
+ return rlberry.spaces.Discrete(
+ n=space.n
+ )
+ if isinstance(space, gym.spaces.MultiBinary) and (not isinstance(space, rlberry.spaces.MultiBinary)):
+ return rlberry.spaces.MultiBinary(
+ n=space.n
+ )
+ if isinstance(space, gym.spaces.MultiDiscrete) and (not isinstance(space, rlberry.spaces.MultiDiscrete)):
+ return rlberry.spaces.MultiDiscrete(
+ nvec=space.nvec,
+ dtype=space.dtype,
+ )
+ if isinstance(space, gym.spaces.Tuple) and (not isinstance(space, rlberry.spaces.Tuple)):
+ return rlberry.spaces.Tuple(
+ spaces=[convert_space_from_gym(sp) for sp in space.spaces]
+ )
+ if isinstance(space, gym.spaces.Dict) and (not isinstance(space, rlberry.spaces.Dict)):
+ converted_spaces = dict()
+ for key in space.spaces:
+ converted_spaces[key] = convert_space_from_gym(space.spaces[key])
+ return rlberry.spaces.Dict(spaces=converted_spaces)
+
+ return space
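
A short usage sketch of the converter, mirroring the tests added below; the space definition itself is only illustrative.

import gym.spaces
from rlberry.spaces.from_gym import convert_space_from_gym

gym_space = gym.spaces.Dict({
    'position': gym.spaces.Box(low=-1.0, high=1.0, shape=(3,)),
    'mode': gym.spaces.Discrete(4),
})
space = convert_space_from_gym(gym_space)   # rlberry.spaces.Dict with converted subspaces
space.reseed(123)                           # rlberry spaces expose reseed()
assert space.contains(space.sample())
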
diff --git a/rlberry/spaces/multi_discrete.py b/rlberry/spaces/multi_discrete.py
index fbe78aaba..36bd836e0 100644
--- a/rlberry/spaces/multi_discrete.py
+++ b/rlberry/spaces/multi_discrete.py
@@ -1,4 +1,5 @@
import gym
+import numpy as np
from rlberry.seeding import Seeder
@@ -21,8 +22,8 @@ class MultiDiscrete(gym.spaces.MultiDiscrete):
get new random number generator
"""
- def __init__(self, nvec):
- gym.spaces.MultiDiscrete.__init__(self, nvec)
+ def __init__(self, nvec, dtype=np.int64):
+ gym.spaces.MultiDiscrete.__init__(self, nvec, dtype=dtype)
self.seeder = Seeder()
@property
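
A minimal sketch of the new dtype argument; np.int32 is just an illustrative choice.

import numpy as np
from rlberry.spaces import MultiDiscrete

sp = MultiDiscrete([5, 2, 2], dtype=np.int32)
sp.reseed(123)
assert sp.contains(sp.sample())
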
diff --git a/rlberry/spaces/tests/test_from_gym.py b/rlberry/spaces/tests/test_from_gym.py
new file mode 100644
index 000000000..779c55597
--- /dev/null
+++ b/rlberry/spaces/tests/test_from_gym.py
@@ -0,0 +1,136 @@
+import numpy as np
+import pytest
+import gym.spaces
+import rlberry.spaces
+from rlberry.spaces.from_gym import convert_space_from_gym
+
+
+@pytest.mark.parametrize("n", list(range(1, 10)))
+def test_discrete_space(n):
+ gym_sp = gym.spaces.Discrete(n)
+ sp = convert_space_from_gym(gym_sp)
+ assert isinstance(sp, rlberry.spaces.Discrete)
+ sp.reseed(123)
+ for ii in range(n):
+ assert sp.contains(ii)
+
+ for ii in range(2 * n):
+ assert sp.contains(sp.sample())
+
+
+@pytest.mark.parametrize("low, high, dim",
+ [
+ (1.0, 10.0, 1),
+ (1.0, 10.0, 2),
+ (1.0, 10.0, 4),
+ (-10.0, 1.0, 1),
+ (-10.0, 1.0, 2),
+ (-10.0, 1.0, 4),
+ (-np.inf, 1.0, 1),
+ (-np.inf, 1.0, 2),
+ (-np.inf, 1.0, 4),
+ (1.0, np.inf, 1),
+ (1.0, np.inf, 2),
+ (1.0, np.inf, 4),
+ (-np.inf, np.inf, 1),
+ (-np.inf, np.inf, 2),
+ (-np.inf, np.inf, 4),
+ ])
+def test_box_space_case_1(low, high, dim):
+ shape = (dim, 1)
+ gym_sp = gym.spaces.Box(low, high, shape=shape)
+ sp = convert_space_from_gym(gym_sp)
+ assert isinstance(sp, rlberry.spaces.Box)
+ sp.reseed(123)
+ for _ in range(2 ** dim):
+ assert (sp.contains(sp.sample()))
+
+
+@pytest.mark.parametrize(
+ "low, high",
+ [
+ (np.array([0.0, 0.0, 0.0]), np.array([1.0, 1.0, 1.0])),
+ (np.array([-10.0, -10.0, -10.0]), np.array([10.0, 10.0, 10.0])),
+ (np.array([-10.0, -10.0, -10.0]), np.array([10.0, 10.0, np.inf])),
+ (np.array([-np.inf, -10.0, -10.0]), np.array([10.0, 10.0, np.inf])),
+ (np.array([-np.inf, -10.0, -10.0]), np.array([np.inf, 10.0, np.inf]))
+ ])
+def test_box_space_case_2(low, high):
+ gym_sp = gym.spaces.Box(low, high, dtype=np.float64)
+ sp = convert_space_from_gym(gym_sp)
+ assert isinstance(sp, rlberry.spaces.Box)
+ sp.reseed(123)
+ if (-np.inf in low) or (np.inf in high):
+ assert not sp.is_bounded()
+ else:
+ assert sp.is_bounded()
+ for ii in range(2 ** sp.shape[0]):
+ assert (sp.contains(sp.sample()))
+
+
+def test_tuple():
+ sp1 = gym.spaces.Box(0.0, 1.0, shape=(3, 2))
+ sp2 = gym.spaces.Discrete(2)
+ gym_sp = gym.spaces.Tuple([sp1, sp2])
+ sp = convert_space_from_gym(gym_sp)
+ assert isinstance(sp, rlberry.spaces.Tuple)
+ assert isinstance(sp.spaces[0], rlberry.spaces.Box)
+ assert isinstance(sp.spaces[1], rlberry.spaces.Discrete)
+ sp.reseed(123)
+ for _ in range(10):
+ assert sp.contains(sp.sample())
+
+
+def test_multidiscrete():
+ gym_sp = gym.spaces.MultiDiscrete([5, 2, 2])
+ sp = convert_space_from_gym(gym_sp)
+ assert isinstance(sp, rlberry.spaces.MultiDiscrete)
+ sp.reseed(123)
+ for _ in range(10):
+ assert sp.contains(sp.sample())
+
+
+def test_multibinary():
+ for n in [1, 5, [3, 4]]:
+ gym_sp = gym.spaces.MultiBinary(n)
+ sp = convert_space_from_gym(gym_sp)
+ assert isinstance(sp, rlberry.spaces.MultiBinary)
+ for _ in range(10):
+ assert sp.contains(sp.sample())
+ sp.reseed(123)
+
+
+def test_dict():
+ nested_observation_space = gym.spaces.Dict({
+ 'sensors': gym.spaces.Dict({
+ 'position': gym.spaces.Box(low=-100, high=100, shape=(3,)),
+ 'velocity': gym.spaces.Box(low=-1, high=1, shape=(3,)),
+ 'front_cam': gym.spaces.Tuple((
+ gym.spaces.Box(low=0, high=1, shape=(10, 10, 3)),
+ gym.spaces.Box(low=0, high=1, shape=(10, 10, 3))
+ )),
+ 'rear_cam': gym.spaces.Box(low=0, high=1, shape=(10, 10, 3)),
+ }),
+ 'ext_controller': gym.spaces.MultiDiscrete((5, 2, 2)),
+ 'inner_state': gym.spaces.Dict({
+ 'charge': gym.spaces.Discrete(100),
+ 'system_checks': gym.spaces.MultiBinary(10),
+ 'job_status': gym.spaces.Dict({
+ 'task': gym.spaces.Discrete(5),
+ 'progress': gym.spaces.Box(low=0, high=100, shape=()),
+ })
+ })
+ })
+ gym_sp = nested_observation_space
+ sp = convert_space_from_gym(gym_sp)
+ assert isinstance(sp, rlberry.spaces.Dict)
+ for _ in range(10):
+ assert sp.contains(sp.sample())
+ sp.reseed(123)
+
+ gym_sp2 = gym.spaces.Dict(sp.spaces)
+ sp2 = convert_space_from_gym(gym_sp2)
+ assert isinstance(sp2, rlberry.spaces.Dict)
+ for _ in range(10):
+ assert sp.contains(sp2.sample())
+ sp2.reseed(123)
diff --git a/rlberry/types.py b/rlberry/types.py
index 739b1f396..42f41d1d5 100644
--- a/rlberry/types.py
+++ b/rlberry/types.py
@@ -6,4 +6,4 @@
Env = Union[gym.Env, Tuple[Callable[..., gym.Env], Mapping[str, Any]]]
#
-Seed = Union[Seeder, int]
\ No newline at end of file
+Seed = Union[Seeder, int]
diff --git a/rlberry/utils/io.py b/rlberry/utils/io.py
new file mode 100644
index 000000000..94f7092ce
--- /dev/null
+++ b/rlberry/utils/io.py
@@ -0,0 +1,33 @@
+
+import os
+import zipfile
+import pathlib
+
+
+def zipdir(dir_path, output_fname):
+ """
+ Zip a directory.
+
+ Parameters
+ ----------
+ dir_path : Path or str
+ Directory to be compressed.
+ output_fname : str
+ Name of output zip file.
+
+ Returns
+ -------
+ path to zip file, or None if dir_path does not exist.
+ """
+ dir_path = pathlib.Path(dir_path)
+ if not dir_path.exists():
+ return None
+ output_fname = pathlib.Path(output_fname).with_suffix('.zip')
+ zipf = zipfile.ZipFile(output_fname, 'w', zipfile.ZIP_DEFLATED)
+ for root, _, files in os.walk(dir_path):
+ for file in files:
+ zipf.write(os.path.join(root, file),
+ os.path.relpath(os.path.join(root, file),
+ os.path.join(dir_path, '..')))
+ zipf.close()
+ return output_fname
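
Usage sketch for the helper above; the paths are placeholders.

from rlberry.utils.io import zipdir

zip_path = zipdir('temp/tensorboard_run', 'temp/tensorboard_data.zip')
if zip_path is None:
    print('nothing to zip: temp/tensorboard_run does not exist')
else:
    print(f'archive written to {zip_path}')
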
diff --git a/rlberry/utils/writers.py b/rlberry/utils/writers.py
index eea991ab4..7e714117b 100644
--- a/rlberry/utils/writers.py
+++ b/rlberry/utils/writers.py
@@ -1,15 +1,21 @@
import logging
import numpy as np
import pandas as pd
+from collections import deque
from typing import Optional
from timeit import default_timer as timer
+from rlberry import check_packages
+from rlberry import metadata_utils
+
+if check_packages.TENSORBOARD_INSTALLED:
+ from torch.utils.tensorboard import SummaryWriter
logger = logging.getLogger(__name__)
class DefaultWriter:
"""
- Default writer to be used by the agents.
+ Default writer to be used by the agents; it optionally wraps an instance of tensorboard.SummaryWriter.
Can be used in the fit() method of the agents, so
that training data can be handled by AgentManager and RemoteAgentManager.
@@ -19,26 +25,53 @@ class DefaultWriter:
name : str
Name of the writer.
log_interval : int
- Minimum number of seconds between consecutive logs.
- metadata : dict
- Extra information to be logged.
+ Minimum number of seconds between consecutive logs (with the logging module).
+ tensorboard_kwargs : Optional[dict]
+ Parameters for tensorboard's SummaryWriter. If provided, DefaultWriter
+ behaves like a tensorboard.SummaryWriter while still storing, in memory, the
+ data added with the add_scalar method.
+ execution_metadata : metadata_utils.ExecutionMetadata
+ Execution metadata about the object that is using the writer.
+ maxlen : Optional[int], default: None
+ If given, the data stored in self._data (accessed through the property self.data) is limited
+ to the `maxlen` most recent entries per tag.
"""
- def __init__(self, name: str, log_interval: int = 3, metadata: Optional[dict] = None):
+ def __init__(
+ self, name: str,
+ log_interval: int = 3,
+ tensorboard_kwargs: Optional[dict] = None,
+ execution_metadata: Optional[metadata_utils.ExecutionMetadata] = None,
+ maxlen: Optional[int] = None):
self._name = name
self._log_interval = log_interval
- self._metadata = metadata or dict()
+ self._execution_metadata = execution_metadata
self._data = None
self._time_last_log = None
self._log_time = True
+ self._maxlen = maxlen
self.reset()
+ # initialize tensorboard
+ if (tensorboard_kwargs is not None) and (not check_packages.TENSORBOARD_INSTALLED):
+ logger.warning('[DefaultWriter]: received tensorboard_kwargs, but tensorboard is not installed.')
+ self._tensorboard_kwargs = tensorboard_kwargs
+ self._tensorboard_logdir = None
+ self._summary_writer = None
+ if (tensorboard_kwargs is not None) and check_packages.TENSORBOARD_INSTALLED:
+ self._summary_writer = SummaryWriter(**self._tensorboard_kwargs)
+ self._tensorboard_logdir = self._summary_writer.get_logdir()
+
def reset(self):
- """Clear all data."""
+ """Clear data."""
self._data = dict()
self._initial_time = timer()
self._time_last_log = timer()
+ @property
+ def summary_writer(self):
+ return self._summary_writer
+
@property
def data(self):
df = pd.DataFrame(columns=('name', 'tag', 'value', 'global_step'))
@@ -46,9 +79,10 @@ def data(self):
df = df.append(pd.DataFrame(self._data[tag]), ignore_index=True)
return df
- def add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] = None):
+ def add_scalar(
+ self, tag: str, scalar_value: float, global_step: Optional[int] = None, walltime=None, new_style=False):
"""
- Store scalar value.
+ Behaves like SummaryWriter.add_scalar().
Note: the tag 'dw_time_elapsed' is reserved and updated internally.
It logs automatically the number of seconds elapsed
@@ -61,14 +95,27 @@ def add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] =
Value of the scalar.
global_step : int
Step where scalar was added. If None, global steps will no longer be stored for the current tag.
+ walltime : float
+ Optionally override the default walltime (time.time()) with seconds after the epoch of the event.
+ new_style : bool
+ Whether to use new style (tensor field) or old
+ style (simple_value field). New style could lead to faster data loading.
+ """
+ if self._summary_writer:
+ self._summary_writer.add_scalar(tag, scalar_value, global_step, walltime, new_style)
+ self._add_scalar(tag, scalar_value, global_step)
+
+ def _add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] = None):
+ """
+ Store scalar value in self._data.
"""
# Update data structures
if tag not in self._data:
self._data[tag] = dict()
- self._data[tag]['name'] = []
- self._data[tag]['tag'] = []
- self._data[tag]['value'] = []
- self._data[tag]['global_step'] = []
+ self._data[tag]['name'] = deque(maxlen=self._maxlen)
+ self._data[tag]['tag'] = deque(maxlen=self._maxlen)
+ self._data[tag]['value'] = deque(maxlen=self._maxlen)
+ self._data[tag]['global_step'] = deque(maxlen=self._maxlen)
self._data[tag]['name'].append(self._name) # used in plots, when aggregating several writers
self._data[tag]['tag'].append(tag) # useful to convert all data to a single DataFrame
@@ -82,7 +129,7 @@ def add_scalar(self, tag: str, scalar_value: float, global_step: Optional[int] =
if global_step is not None and self._log_time:
assert tag != 'dw_time_elapsed', 'The tag dw_time_elapsed is reserved.'
self._log_time = False
- self.add_scalar(tag='dw_time_elapsed', scalar_value=timer() - self._initial_time, global_step=global_step)
+ self._add_scalar(tag='dw_time_elapsed', scalar_value=timer() - self._initial_time, global_step=global_step)
self._log_time = True
# Log
@@ -106,21 +153,39 @@ def _log(self):
max_global_step = max(max_global_step, gstep)
header = self._name
- if self._metadata:
- header += f' | {self._metadata}'
+ if self._execution_metadata:
+ header += f'[worker: {self._execution_metadata.obj_worker_id}]'
message = f'[{header}] | max_global_step = {max_global_step} | ' + message
logger.info(message)
def __getattr__(self, attr):
"""
- Avoid raising exceptions when invalid method is called, so
- that DefaultWriter does not raise exceptions when
- the code expects a tensorboard writer.
+ Forwards attribute lookups to SummaryWriter when self._summary_writer is not None;
+ otherwise, returns a no-op method.
"""
if attr[:2] == '__':
raise AttributeError(attr)
+ if attr in self.__dict__:
+ return getattr(self, attr)
+ if self._summary_writer:
+ return getattr(self._summary_writer, attr)
def method(*args, **kwargs):
pass
-
return method
+
+ #
+ # For pickle
+ #
+ def __getstate__(self):
+ if self._summary_writer:
+ self._summary_writer.close()
+ state = self.__dict__.copy()
+ return state
+
+ def __setstate__(self, newstate):
+ # Re-create summary writer with the same logdir
+ if newstate['_summary_writer']:
+ newstate['_tensorboard_kwargs'].update(dict(log_dir=newstate['_tensorboard_logdir']))
+ newstate['_summary_writer'] = SummaryWriter(**newstate['_tensorboard_kwargs'])
+ self.__dict__.update(newstate)
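
A short sketch of the extended writer; the log_dir below is a placeholder, and tensorboard_kwargs only has an effect if tensorboard is installed.

from rlberry.utils.writers import DefaultWriter

writer = DefaultWriter(
    name='demo_agent',
    log_interval=3,
    tensorboard_kwargs=dict(log_dir='temp/tensorboard/demo_agent'),
    maxlen=1000,  # keep only the 1000 most recent entries per tag in memory
)
for episode in range(10):
    writer.add_scalar('episode_rewards', float(episode), global_step=episode)
df = writer.data  # DataFrame with columns: name, tag, value, global_step
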
diff --git a/setup.py b/setup.py
index 5134471d7..a1ad82a9d 100644
--- a/setup.py
+++ b/setup.py
@@ -60,7 +60,7 @@
setup(
name='rlberry',
- version='0.2',
+ version='0.2.1',
description='An easy-to-use reinforcement learning library for research and education',
long_description=long_description,
long_description_content_type="text/markdown",