Multiple machines (#55)
* define config data type

* improve grpc execution

* fix missing attribute bug

* add capability to run across multiple machines

* update docs

* update the docs
tremblerz authored Aug 22, 2024
1 parent e716dd1 commit dbea41e
Showing 10 changed files with 98 additions and 44 deletions.
44 changes: 44 additions & 0 deletions docs/getting-started/grpc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Multi-agent collaboration with gRPC

In this tutorial, we will discuss how to use gRPC for training models across multiple machines. If you have not already read the [Getting Started](./getting-started.md) guide, we recommend you do so before proceeding.

> **_NOTE:_** If you are running experiments on a single machine, MPI is easier to set up and get started with.

## Overview
The main advantage of our abstract communication layer is that the same code runs regardless of whether MPI or gRPC is used underneath. As long as the communication layer is implemented correctly, the rest of the code stays the same, which lets us switch between communication layers without changing any application code.

## Running the code
Let's say you want to run decentralized training with 80 users on 4 machines. Our implementation currently requires a coordinating node to manage the orchestration, so there will be 81 nodes in total. Make sure `sys_config.py` has `num_users: 80` in the config. Then run the following command on each of the 4 machines, substituting each machine's own hostname for the `-host` argument:

``` bash
python main_grpc.py -n 20 -host randomhost42.mit.edu
```
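Under the hood, `main_grpc.py` is just a convenience launcher. A minimal sketch of what it does, simplified from the `src/main_grpc.py` diff in this commit (argument parsing trimmed, function names here are illustrative):

```python
import subprocess
from typing import List

def build_command(host: str) -> List[str]:
    # Each child process is an ordinary main.py node pointed at this machine's host.
    return ["python", "main.py", "-host", host]

def launch(n: int, host: str) -> None:
    # Spawn n independent node processes; each registers itself with the coordinator.
    for i in range(n):
        print(f"Starting process for user {i}")
        subprocess.Popen(build_command(host))
```

With `-n 20` on each of 4 machines, this yields the 80 training nodes; the coordinator is started separately as shown below.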

On the **one** machine you want to use as the coordinator node (let's say it is `randomhost43.mit.edu`), update `peer_ids` in the config with the hostname and port the coordinator node should run on, and then run the following command:

``` bash
python main.py -super true
```
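For reference, the relevant fields of the system config might look like the hypothetical sketch below. `num_users` and `comm.peer_ids` appear in this commit's diffs; the port number is a placeholder you should replace with your own choice:

```python
# Hypothetical sketch of the relevant fields in src/configs/sys_config.py.
# The port 50048 is a placeholder -- use whatever port you chose for the coordinator.
num_users = 80

grpc_system_config = {
    "num_users": num_users,
    "comm": {
        # First entry: the coordinator's host:port, read by rank 0 at startup.
        "peer_ids": ["randomhost43.mit.edu:50048"],
    },
}
```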

> **_NOTE:_** Most of the algorithms do not yet use the new communication protocol, so they can only be run with the old MPI version. We are working on updating them.

## Protocol
1. Each node registers to the coordinating node and receives a unique rank.
2. Each node then tries to find available ports to start a listener.
3. Once a quorum has been reached, the coordinating node sends a message to all nodes with the list of all nodes and their ports.
4. Each node then starts local training.
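The registration steps above can be sketched in plain Python in place of gRPC; the class and method names here are illustrative, not the project's real API:

```python
# A toy sketch of the registration protocol: rank assignment, port reporting,
# and the quorum check that gates the peer-list broadcast.
from typing import Dict

class Coordinator:
    def __init__(self, quorum: int):
        self.quorum = quorum
        self.peers: Dict[int, str] = {}  # rank -> "host:port"
        self.next_rank = 1               # rank 0 is the coordinator itself

    def register(self, host: str) -> int:
        # Step 1: assign each joining node a unique rank.
        rank = self.next_rank
        self.next_rank += 1
        self.peers[rank] = host
        return rank

    def update_port(self, rank: int, port: int) -> None:
        # Step 2: the node found a free port for its listener and reports it back.
        self.peers[rank] = f"{self.peers[rank]}:{port}"

    def quorum_reached(self) -> bool:
        # Step 3: once every expected node has registered, the coordinator
        # would broadcast self.peers to all nodes, and training (step 4) begins.
        return len(self.peers) == self.quorum

coord = Coordinator(quorum=2)
r1 = coord.register("machine-a.mit.edu")
r2 = coord.register("machine-b.mit.edu")
coord.update_port(r1, 50049)
coord.update_port(r2, 50050)
assert coord.quorum_reached()
```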


## FAQ
1. **Is it really decentralized if it requires a coordinating node?**
- Yes, it is. The coordinating node is required for all nodes to discover each other. In fact, going forward, we plan to provide support for several nodes to act as coordinating nodes. This will make the system more robust and fault-tolerant. We are looking for contributors to implement a distributed hash table (DHT) to make our system more like BitTorrent. So, if you have a knack for distributed systems, please reach out to us.
2. **Why do I need main_grpc.py and main.py?**
    - `main.py` is the entry point for a single node. If you are running lots of nodes, it is easier to run `main_grpc.py`, which launches `main.py` `n` times for you. It is just a convenience script.
3. **How do I know if the nodes are communicating?**
- You can check the console logs of the coordinating node. It will print out the messages it is sending and receiving. You can also check the logs of the nodes to see their local training status.
4. **My training is stuck or failed. How do I stop and restart?**
    - The gRPC simulation starts a lot of processes, and right now if even one of them fails you have to kill all of them and start over. Here is a command that gets the PIDs of all the node processes and kills them at once:
``` bash
for pid in $(ps aux|grep 'python main.py' | cut -b 10-16); do kill -9 $pid; done
```
Make sure you remove the experiment folder before starting again.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -12,6 +12,7 @@ nav:
- Main: getting-started.md
- Config File: getting-started/config.md
- Customizability: getting-started/customize.md
- Using GRPC: getting-started/grpc.md
- CollaBench:
- Main: collabench.md
- Feature Comparison: feature.md
4 changes: 0 additions & 4 deletions src/README.md
@@ -32,7 +32,3 @@ Client Metrics
Server Metrics
<img src="../resources/images/Server_metrics.png" width=50% height=50%>

### Debugging instructions
GRPC simulation starts a lot of threads and even if one of them fail right now then you will have to kill all of them and start all over.
So, here is a command to get the pid of all the threads and kill them all at once:
`for pid in $(ps aux|grep 'python main.py -r' | cut -b 10-16); do kill -9 $pid; done`
12 changes: 8 additions & 4 deletions src/configs/algo_config.py
@@ -1,6 +1,9 @@
from typing import TypeAlias, Dict, List

ConfigType: TypeAlias = Dict[str, str|float|int|bool|List[str]]
# Algorithm Configuration

iid_dispfl_clients_new = {
iid_dispfl_clients_new: ConfigType = {
"algo": "dispfl",
"exp_id": 12,
"exp_type": "iid_dispfl",
@@ -16,7 +19,7 @@
"exp_keys": []
}

traditional_fl = {
traditional_fl: ConfigType = {
"algo": "fedavg",
"exp_id": 10,
"exp_type": "iid_clients_federated",
@@ -28,7 +31,7 @@
"exp_keys": [],
}

fedavg_object_detect = {
fedavg_object_detect: ConfigType = {
"algo": "fedavg",
"exp_id": "test_modular_yolo",
"exp_type": "test",
@@ -40,4 +43,5 @@
"exp_keys": [],
}

current_config = fedavg_object_detect
# current_config = fedavg_object_detect
current_config = traditional_fl
5 changes: 3 additions & 2 deletions src/configs/sys_config.py
@@ -55,8 +55,9 @@ def get_device_ids(num_users: int, gpus_available: List[int]) -> Dict[str, List[
device_ids[f"node_{i}"] = [gpu_id]
return device_ids

num_users = 10
gpu_ids = [0, 1, 2, 3, 4, 5, 6, 7]
num_users = 80
gpu_ids = [1, 2, 3, 4, 5, 6, 7]
# gpu_ids = [1, 2, 3, 4, 5, 7]
grpc_system_config = {
"num_users": num_users,
"comm": {
16 changes: 10 additions & 6 deletions src/main.py
@@ -11,7 +11,6 @@

B_DEFAULT = "./configs/algo_config.py"
S_DEFAULT = "./configs/sys_config.py"
RANK_DEFAULT = None

parser = argparse.ArgumentParser(description="Run collaborative learning experiments")
parser.add_argument(
@@ -29,17 +28,22 @@
help=f"filepath for system config, default: {S_DEFAULT}",
)
parser.add_argument(
"-r",
"-super",
nargs="?",
default=RANK_DEFAULT,
type=int,
help=f"rank of the node, default: {RANK_DEFAULT}",
type=bool,
help=f"whether to run the super node",
)
parser.add_argument(
"-host",
nargs="?",
type=str,
help=f"host address of the nodes",
)

args = parser.parse_args()

scheduler = Scheduler()
scheduler.assign_config_by_path(args.s, args.b, args.r)
scheduler.assign_config_by_path(args.s, args.b, args.super, args.host)
print("Config loaded")

scheduler.install_config()
35 changes: 15 additions & 20 deletions src/main_grpc.py
@@ -1,37 +1,32 @@
"""
This module runs collaborative learning experiments using the Scheduler class.
This module runs main.py n times.
Usage: python main_grpc.py -n <number of nodes>
"""

import argparse
import logging
import subprocess

from utils.config_utils import load_config

logging.getLogger("PIL").setLevel(logging.INFO)

S_DEFAULT = "./configs/sys_config.py"
RANK_DEFAULT = 0
parser = argparse.ArgumentParser(description="Number of nodes to run on this machine")
parser.add_argument(
"-n",
nargs="?",
type=int,
help=f"number of nodes to run on this machine",
)

parser = argparse.ArgumentParser(description="Run collaborative learning experiments")
parser.add_argument(
"-s",
"-host",
nargs="?",
default=S_DEFAULT,
type=str,
help=f"filepath for system config, default: {S_DEFAULT}",
help=f"host address of the nodes",
)

args = parser.parse_args()

sys_config = load_config(args.s)
print("Sys config loaded")

# 1. find the number of users in the system configuration
# 2. start separate processes by running python main.py for each user
command_list = ["python", "main.py", "-host", args.host]
# if the super-node is to be started on this machine

num_users = sys_config["num_users"] + 1 # +1 for the super-node
for i in range(num_users):
for i in range(args.n):
print(f"Starting process for user {i}")
# start a Popen process
subprocess.Popen(["python", "main.py", "-r", str(i)])
subprocess.Popen(command_list)
12 changes: 7 additions & 5 deletions src/scheduler.py
@@ -4,7 +4,6 @@
"""

import os
import random
from typing import Any, Dict

import torch
@@ -32,7 +31,6 @@
from utils.communication.comm_utils import CommunicationManager
from utils.config_utils import load_config, process_config
from utils.log_utils import copy_source_code, check_and_create_path
import os


# Mapping of algorithm names to their corresponding client and server classes so that they can be consumed by the scheduler later on.
@@ -74,10 +72,13 @@ def __init__(self) -> None:
def install_config(self) -> None:
self.config: Dict[str, Any] = process_config(self.config)

def assign_config_by_path(self, sys_config_path: Dict[str, Any], algo_config_path: Dict[str, Any], rank: int|None = None) -> None:
def assign_config_by_path(self, sys_config_path: str, algo_config_path: str, is_super_node: bool|None = None, host: str|None = None) -> None:
self.sys_config = load_config(sys_config_path)
if rank is not None:
self.sys_config["comm"]["rank"] = rank
if is_super_node:
self.sys_config["comm"]["rank"] = 0
else:
self.sys_config["comm"]["host"] = host
self.sys_config["comm"]["rank"] = None
self.algo_config = load_config(algo_config_path)
self.merge_configs()

@@ -90,6 +91,7 @@ def initialize(self, copy_souce_code: bool=True) -> None:
assert self.config is not None, "Config should be set when initializing"
self.communication = CommunicationManager(self.config)

self.config["comm"]["rank"] = self.communication.get_rank()
# Base clients modify the seed later on
seed = self.config["seed"]
torch.manual_seed(seed) # type: ignore
12 changes: 9 additions & 3 deletions src/utils/communication/grpc/main.py
@@ -137,11 +137,16 @@ def __init__(self, config: Dict[str, Dict[str, Any]]):
# 4. The nodes will execute rest of the protocol in the same way as before
self.num_users: int = int(config["num_users"]) # type: ignore
self.rank: int|None = config["comm"]["rank"]
# TODO: Get rid of peer_ids now that we are passing [comm][host]
self.super_node_host: str = config["comm"]["peer_ids"][0]
if self.rank == 0:
node_id: List[str] = self.super_node_host.split(":")
self.host: str = node_id[0]
self.port: int = int(node_id[1])
else:
# get hostname based on ip address
self.host: str = config["comm"]["host"]
pass
self.listener: Any = None
self.servicer = Servicer(self.super_node_host)

@@ -167,7 +172,7 @@ def register(self):
self.port = get_port(self.rank, self.num_users) # type: ignore because we are setting it in the register method
rank = comm_pb2.Rank(rank=self.rank) # type: ignore
port = comm_pb2.Port(port=self.port)
peer_id = comm_pb2.PeerId(rank=rank, port=port)
peer_id = comm_pb2.PeerId(rank=rank, port=port, ip=self.host)
stub.update_port(peer_id) # type: ignore

def start_listener(self):
@@ -176,9 +181,10 @@ def start_listener(self):
('grpc.max_receive_message_length', 100 * 1024 * 1024), # 100MB
])
comm_pb2_grpc.add_CommunicationServerServicer_to_server(self.servicer, self.listener) # type: ignore
self.listener.add_insecure_port(f'[::]:{self.port}')
address = f"{self.host}:{self.port}"
self.listener.add_insecure_port(address)
self.listener.start()
print(f'Started listener on port {self.port}')
print(f'Started listener on {address}')

def peer_ids_to_proto(self, peer_ids: OrderedDict[int, Dict[str, int|str]]) -> Dict[int, comm_pb2.PeerId]:
peer_ids_proto: Dict[int, comm_pb2.PeerId] = {}
1 change: 1 addition & 0 deletions src/utils/model_utils.py
@@ -43,6 +43,7 @@ def get_model(
pretrained:bool=False,
**kwargs: Any
) -> nn.Module:
self.dset = dset
# TODO: add support for loading checkpointed models
model_name = model_name.lower()
if model_name == "resnet10":
