Distributed Framework #327

Draft: wants to merge 29 commits into master

Commits (29)
2567fe5  added demo notebooks (threewisemonkeys-as, Aug 27, 2020)
1aebe3c  Merge branch 'master' of https://github.com/SforAiDl/genrl (threewisemonkeys-as, Aug 31, 2020)
e3b8a8a  Merge branch 'master' of https://github.com/SforAiDl/genrl (threewisemonkeys-as, Sep 3, 2020)
4cba727  initial structure (threewisemonkeys-as, Sep 3, 2020)
3d233c4  added mp (threewisemonkeys-as, Sep 6, 2020)
1c504cc  add files (threewisemonkeys-as, Sep 20, 2020)
2c3298a  added new structure on rpc (threewisemonkeys-as, Oct 1, 2020)
73586d5  working structure (threewisemonkeys-as, Oct 7, 2020)
9ef6845  fixed integration bugs (threewisemonkeys-as, Oct 7, 2020)
072d545  removed unneccary files (threewisemonkeys-as, Oct 7, 2020)
64db1c1  added support for running from multiple scripts (threewisemonkeys-as, Oct 8, 2020)
4d57a06  added evaluate to trainer (threewisemonkeys-as, Oct 8, 2020)
f325429  added proxy getter (threewisemonkeys-as, Oct 23, 2020)
7ce19ec  added rpc backend option (threewisemonkeys-as, Oct 23, 2020)
cfba909  added logging to trainer (threewisemonkeys-as, Oct 23, 2020)
992a3a9  Added more options to trainer (threewisemonkeys-as, Oct 23, 2020)
bf1a50a  moved load weights to user (threewisemonkeys-as, Oct 23, 2020)
e2eef66  decreased number of eval its (threewisemonkeys-as, Oct 23, 2020)
837eb18  removed train wrapper (threewisemonkeys-as, Oct 23, 2020)
7fcbb23  removed loop to user fn (threewisemonkeys-as, Oct 26, 2020)
0002fa4  added example for secondary node (threewisemonkeys-as, Oct 26, 2020)
bebf50f  removed original exmpale (threewisemonkeys-as, Oct 26, 2020)
29bd1d6  removed fn (threewisemonkeys-as, Oct 26, 2020)
18536a2  shifted examples (threewisemonkeys-as, Oct 29, 2020)
8f859d6  shifted logger to base class (threewisemonkeys-as, Oct 29, 2020)
555e290  added on policy example (threewisemonkeys-as, Oct 29, 2020)
59e960c  removed temp example (threewisemonkeys-as, Oct 29, 2020)
8d5a8b6  got on policy distributed example to work (threewisemonkeys-as, Oct 29, 2020)
8030b2a  formatting (threewisemonkeys-as, Oct 29, 2020)
93 changes: 93 additions & 0 deletions examples/distributed/offpolicy_distributed_primary.py
@@ -0,0 +1,93 @@
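# Primary node script: starts the RPC master and hosts the parameter server,
# experience (replay buffer) server, learner, and a single actor for
# distributed off-policy (DDPG) training.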
from genrl.distributed import (
Master,
ExperienceServer,
ParameterServer,
ActorNode,
LearnerNode,
)
from genrl.core import ReplayBuffer
from genrl.agents import DDPG
from genrl.trainers import DistributedTrainer
from genrl.utils import Logger
import gym
import torch.distributed.rpc as rpc
import time


N_ACTORS = 1
BUFFER_SIZE = 5000
MAX_ENV_STEPS = 500
TRAIN_STEPS = 5000
BATCH_SIZE = 64
INIT_BUFFER_SIZE = 1000
WARMUP_STEPS = 1000


def collect_experience(agent, parameter_server, experience_server, learner):
    # Keep generating experience until the learner reports that training is done
    total_steps = 0
    while not learner.is_completed():
        # Sync the actor's policy with the latest weights from the parameter server
        agent.load_weights(parameter_server.get_weights())
        obs = agent.env.reset()
        done = False
        for _ in range(MAX_ENV_STEPS):
            # Sample random actions during warmup, then follow the agent's policy
            # (warmup is counted over total steps across episodes, not per episode)
            action = (
                agent.env.action_space.sample()
                if total_steps < WARMUP_STEPS
                else agent.select_action(obs)
            )
            next_obs, reward, done, _ = agent.env.step(action)
            # Push the transition to the shared experience server
            experience_server.push((obs, action, reward, next_obs, done))
            obs = next_obs
            total_steps += 1
            if done:
                break


class MyTrainer(DistributedTrainer):
def __init__(
self, agent, train_steps, batch_size, init_buffer_size, log_interval=200
):
super(MyTrainer, self).__init__(agent)
self.train_steps = train_steps
self.batch_size = batch_size
self.init_buffer_size = init_buffer_size
self.log_interval = log_interval

    def train(self, parameter_server, experience_server):
        # Wait until the replay buffer holds enough transitions to start updates
        while experience_server.__len__() < self.init_buffer_size:
            time.sleep(1)
        for i in range(self.train_steps):
            batch = experience_server.sample(self.batch_size)
            if batch is None:
                continue
            self.agent.update_params(1, batch)
            # Publish the updated weights so that actors can pick them up
            parameter_server.store_weights(self.agent.get_weights())
            if i % self.log_interval == 0:
                self.evaluate(i)


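# Node layout: parameter server (rank 1), experience server (rank 2),
# learner (rank 3) and actors (rank 4 onwards) all register with the master,
# which presumably occupies rank 0 of the world.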
master = Master(
world_size=5,
address="localhost",
port=29502,
proc_start_method="fork",
rpc_backend=rpc.BackendType.TENSORPIPE,
)
env = gym.make("Pendulum-v0")
agent = DDPG("mlp", env)
parameter_server = ParameterServer("param-0", master, agent.get_weights(), rank=1)
buffer = ReplayBuffer(BUFFER_SIZE)
experience_server = ExperienceServer("experience-0", master, buffer, rank=2)
trainer = MyTrainer(agent, TRAIN_STEPS, BATCH_SIZE, INIT_BUFFER_SIZE)
learner = LearnerNode("learner-0", master, "param-0", "experience-0", trainer, rank=3)
actors = [
ActorNode(
name=f"actor-{i}",
master=master,
parameter_server_name="param-0",
experience_server_name="experience-0",
learner_name="learner-0",
agent=agent,
collect_experience=collect_experience,
rank=i + 4,
)
for i in range(N_ACTORS)
]
80 changes: 80 additions & 0 deletions examples/distributed/offpolicy_distributed_secondary.py
@@ -0,0 +1,80 @@
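# Secondary node script: attaches to an already running master process and
# contributes additional actor nodes to the same training run.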
from genrl.distributed import (
Master,
ExperienceServer,
ParameterServer,
ActorNode,
LearnerNode,
WeightHolder,
)
from genrl.core import ReplayBuffer
from genrl.agents import DDPG
from genrl.trainers import DistributedTrainer
import gym
import argparse
import torch.multiprocessing as mp


N_ACTORS = 2
BUFFER_SIZE = 10
MAX_ENV_STEPS = 100
TRAIN_STEPS = 10
BATCH_SIZE = 1


def collect_experience(agent, experience_server_rref):
    # Roll out a single episode and push each transition to the remote buffer
    obs = agent.env.reset()
    done = False
    for i in range(MAX_ENV_STEPS):
        action = agent.select_action(obs)
        next_obs, reward, done, info = agent.env.step(action)
        # rpc_sync() invokes push() synchronously on the remote replay buffer
        experience_server_rref.rpc_sync().push((obs, action, reward, next_obs, done))
        obs = next_obs
        if done:
            break


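# The trainer and server definitions below are left commented out in this
# script; in the multi-script setup they are presumably hosted by the primary
# script instead.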
# class MyTrainer(DistributedTrainer):
# def __init__(self, agent, train_steps, batch_size):
# super(MyTrainer, self).__init__(agent)
# self.train_steps = train_steps
# self.batch_size = batch_size

# def train(self, parameter_server_rref, experience_server_rref):
# i = 0
# while i < self.train_steps:
# batch = experience_server_rref.rpc_sync().sample(self.batch_size)
# if batch is None:
# continue
# self.agent.update_params(batch, 1)
# parameter_server_rref.rpc_sync().store_weights(self.agent.get_weights())
# print(f"Trainer: {i + 1} / {self.train_steps} steps completed")
# i += 1


mp.set_start_method("fork")

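# NOTE: world_size, address and port must match the configuration of the
# primary (master) process that this secondary script connects to.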
master = Master(world_size=8, address="localhost", port=29500, secondary=True)
env = gym.make("Pendulum-v0")
agent = DDPG("mlp", env)
# parameter_server = ParameterServer(
# "param-0", master, WeightHolder(agent.get_weights()), rank=1
# )
# buffer = ReplayBuffer(BUFFER_SIZE)
# experience_server = ExperienceServer("experience-0", master, buffer, rank=2)
# trainer = MyTrainer(agent, TRAIN_STEPS, BATCH_SIZE)
# learner = LearnerNode(
# "learner-0", master, parameter_server, experience_server, trainer, rank=3
# )
actors = [
ActorNode(
name=f"actor-{i+2}",
master=master,
parameter_server_name="param-0",
experience_server_name="experience-0",
learner_name="learner-0",
agent=agent,
collect_experience=collect_experience,
rank=i + 6,
)
for i in range(N_ACTORS)
]