From dbea41e0cbf4d141949e1cda35cb217e0805daa5 Mon Sep 17 00:00:00 2001
From: Abhishek Singh
Date: Wed, 21 Aug 2024 20:17:30 -0400
Subject: [PATCH] Multiple machines (#55)

* define config data type

* improve grpc execution

* fix missing attribute bug

* add capability to run across multiple machines

* update docs

* update the docs
---
 docs/getting-started/grpc.md         | 44 ++++++++++++++++++++++++++++
 mkdocs.yml                           |  1 +
 src/README.md                        |  4 ---
 src/configs/algo_config.py           | 12 +++++---
 src/configs/sys_config.py            |  5 ++--
 src/main.py                          | 16 ++++++----
 src/main_grpc.py                     | 35 ++++++++++------------
 src/scheduler.py                     | 12 ++++----
 src/utils/communication/grpc/main.py | 12 ++++++--
 src/utils/model_utils.py             |  1 +
 10 files changed, 98 insertions(+), 44 deletions(-)
 create mode 100644 docs/getting-started/grpc.md

diff --git a/docs/getting-started/grpc.md b/docs/getting-started/grpc.md
new file mode 100644
index 0000000..b4dc8d0
--- /dev/null
+++ b/docs/getting-started/grpc.md
@@ -0,0 +1,44 @@
+# Multi-agent collaboration with gRPC
+
+In this tutorial, we discuss how to use gRPC to train models across multiple machines. If you have not already read the [Getting Started](./getting-started.md) guide, we recommend you do so before proceeding.
+
+> **_NOTE:_** If you are running experiments on a single machine, MPI is easier to set up and get started with.
+
+## Overview
+The main advantage of our abstract communication layer is that the same code runs regardless of whether you are using MPI or gRPC underneath. As long as the communication layer is implemented correctly, the rest of the code remains the same. This is a huge advantage for the framework as it allows us to switch between different communication layers without changing the code.
+
+## Running the code
+Let's say you want to run decentralized training with 80 users on 4 machines. Our implementation currently requires a coordinating node to manage the orchestration, so there will be 81 nodes in total. Make sure `sys_config.py` has `num_users: 80` in the config. Run the following command on all 4 machines:
+
+``` bash
+python main_grpc.py -n 20 -host randomhost42.mit.edu
+```
+
+On the **one** machine you want to use as the coordinator node (let's say it is `randomhost43.mit.edu`), update `peer_ids` with the hostname and the port you want the coordinator node to run on, and then run the following command:
+
+``` bash
+python main.py -super true
+```
+
+> **_NOTE:_** Most of the algorithms do not yet use the new communication protocol, so you can only use the old MPI version with them. We are working on updating the algorithms to use the new communication protocol.
+
+## Protocol
+1. Each node registers with the coordinating node and receives a unique rank.
+2. Each node then finds an available port and starts a listener on it.
+3. Once a quorum has been reached, the coordinating node sends every node the list of all nodes and their ports.
+4. Each node then starts local training.
+
+
+## FAQ
+1. **Is it really decentralized if it requires a coordinating node?**
+    - Yes, it is. The coordinating node is only required so that the nodes can discover each other. In fact, going forward, we plan to support several nodes acting as coordinating nodes, which will make the system more robust and fault-tolerant. We are looking for contributors to implement a distributed hash table (DHT) to make our system more like BitTorrent. So, if you have a knack for distributed systems, please reach out to us.
+2. **Why do I need main_grpc.py and main.py?**
+    - The main.py file is the actual entry point. However, if you are running lots of nodes, it is easier to run main_grpc.py, which automatically runs main.py `n` times. It is just a convenience script.
+3. **How do I know if the nodes are communicating?**
+    - You can check the console logs of the coordinating node. It prints out the messages it sends and receives. You can also check the logs of the individual nodes to see their local training status.
+4. **My training is stuck or failed. How do I stop and restart?**
+    - The gRPC simulation starts a lot of threads, and if even one of them fails right now you will have to kill all of them and start over. Here is a command to get the PIDs of all of them and kill them at once:
+    ``` bash
+    for pid in $(ps aux|grep 'python main.py' | cut -b 10-16); do kill -9 $pid; done
+    ```
+    Make sure you remove the experiment folder before starting again.
diff --git a/mkdocs.yml b/mkdocs.yml
index 5f818e5..09d6dc7 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -12,6 +12,7 @@ nav:
     - Main: getting-started.md
     - Config File: getting-started/config.md
     - Customizability: getting-started/customize.md
+    - Using GRPC: getting-started/grpc.md
   - CollaBench:
     - Main: collabench.md
     - Feature Comparison: feature.md
diff --git a/src/README.md b/src/README.md
index 49136fe..24690ad 100644
--- a/src/README.md
+++ b/src/README.md
@@ -32,7 +32,3 @@ Client Metrics
 
 Server Metrics
 
-### Debugging instructions
-GRPC simulation starts a lot of threads and even if one of them fail right now then you will have to kill all of them and start all over.
-So, here is a command to get the pid of all the threads and kill them all at once:
-`for pid in $(ps aux|grep 'python main.py -r' | cut -b 10-16); do kill -9 $pid; done`
\ No newline at end of file
diff --git a/src/configs/algo_config.py b/src/configs/algo_config.py
index 95ff516..736af58 100644
--- a/src/configs/algo_config.py
+++ b/src/configs/algo_config.py
@@ -1,6 +1,9 @@
+from typing import TypeAlias, Dict, List
+
+ConfigType: TypeAlias = Dict[str, str|float|int|bool|List[str]]
 # Algorithm Configuration
 
-iid_dispfl_clients_new = {
+iid_dispfl_clients_new: ConfigType = {
     "algo": "dispfl",
     "exp_id": 12,
     "exp_type": "iid_dispfl",
@@ -16,7 +19,7 @@
     "exp_keys": []
 }
 
-traditional_fl = {
+traditional_fl: ConfigType = {
     "algo": "fedavg",
     "exp_id": 10,
     "exp_type": "iid_clients_federated",
@@ -28,7 +31,7 @@
     "exp_keys": [],
 }
 
-fedavg_object_detect = {
+fedavg_object_detect: ConfigType = {
     "algo": "fedavg",
     "exp_id": "test_modular_yolo",
     "exp_type": "test",
@@ -40,4 +43,5 @@
     "exp_keys": [],
 }
 
-current_config = fedavg_object_detect
+# current_config = fedavg_object_detect
+current_config = traditional_fl
diff --git a/src/configs/sys_config.py b/src/configs/sys_config.py
index bfe99fd..36924b4 100644
--- a/src/configs/sys_config.py
+++ b/src/configs/sys_config.py
@@ -55,8 +55,9 @@ def get_device_ids(num_users: int, gpus_available: List[int]) -> Dict[str, List[
         device_ids[f"node_{i}"] = [gpu_id]
     return device_ids
 
-num_users = 10
-gpu_ids = [0, 1, 2, 3, 4, 5, 6, 7]
+num_users = 80
+gpu_ids = [1, 2, 3, 4, 5, 6, 7]
+# gpu_ids = [1, 2, 3, 4, 5, 7]
 grpc_system_config = {
     "num_users": num_users,
     "comm": {
diff --git a/src/main.py b/src/main.py
index 4880f22..508d734 100644
--- a/src/main.py
+++ b/src/main.py
@@ -11,7 +11,6 @@
 B_DEFAULT = "./configs/algo_config.py"
 S_DEFAULT = "./configs/sys_config.py"
-RANK_DEFAULT = None
 
 
 parser = argparse.ArgumentParser(description="Run collaborative learning experiments")
 parser.add_argument(
@@ -29,17 +28,22 @@
     help=f"filepath for system config, default: {S_DEFAULT}",
 )
 parser.add_argument(
-    "-r",
+    "-super",
     nargs="?",
-    default=RANK_DEFAULT,
-    type=int,
-    help=f"rank of the node, default: {RANK_DEFAULT}",
+    type=bool,
+    help=f"whether to run the super node",
+)
+parser.add_argument(
+    "-host",
+    nargs="?",
+    type=str,
+    help=f"host address of the nodes",
 )
 
 args = parser.parse_args()
 
 scheduler = Scheduler()
-scheduler.assign_config_by_path(args.s, args.b, args.r)
+scheduler.assign_config_by_path(args.s, args.b, args.super, args.host)
 print("Config loaded")
 
 scheduler.install_config()
diff --git a/src/main_grpc.py b/src/main_grpc.py
index 546e6c0..a8692ba 100644
--- a/src/main_grpc.py
+++ b/src/main_grpc.py
@@ -1,37 +1,32 @@
 """
-This module runs collaborative learning experiments using the Scheduler class.
+This module runs main.py n times.
+Usage: python main_grpc.py -n
 """
 
 import argparse
-import logging
 import subprocess
 
-from utils.config_utils import load_config
-
-logging.getLogger("PIL").setLevel(logging.INFO)
-
-S_DEFAULT = "./configs/sys_config.py"
-RANK_DEFAULT = 0
+parser = argparse.ArgumentParser(description="Number of nodes to run on this machine")
+parser.add_argument(
+    "-n",
+    nargs="?",
+    type=int,
+    help=f"number of nodes to run on this machine",
+)
 
-parser = argparse.ArgumentParser(description="Run collaborative learning experiments")
 parser.add_argument(
-    "-s",
+    "-host",
     nargs="?",
-    default=S_DEFAULT,
     type=str,
-    help=f"filepath for system config, default: {S_DEFAULT}",
+    help=f"host address of the nodes",
 )
 
 args = parser.parse_args()
 
-sys_config = load_config(args.s)
-print("Sys config loaded")
-
-# 1. find the number of users in the system configuration
-# 2. start separate processes by running python main.py for each user
+command_list = ["python", "main.py", "-host", args.host]
+# if the super-node is to be started on this machine
 
-num_users = sys_config["num_users"] + 1 # +1 for the super-node
-for i in range(num_users):
+for i in range(args.n):
     print(f"Starting process for user {i}")
     # start a Popen process
-    subprocess.Popen(["python", "main.py", "-r", str(i)])
\ No newline at end of file
+    subprocess.Popen(command_list)
diff --git a/src/scheduler.py b/src/scheduler.py
index 568a936..b61cf7a 100644
--- a/src/scheduler.py
+++ b/src/scheduler.py
@@ -4,7 +4,6 @@
 """
 
 import os
-import random
 from typing import Any, Dict
 
 import torch
@@ -32,7 +31,6 @@
 
 from utils.communication.comm_utils import CommunicationManager
 from utils.config_utils import load_config, process_config
 from utils.log_utils import copy_source_code, check_and_create_path
-import os
 
 # Mapping of algorithm names to their corresponding client and server classes so that they can be consumed by the scheduler later on.
@@ -74,10 +72,13 @@ def __init__(self) -> None:
     def install_config(self) -> None:
         self.config: Dict[str, Any] = process_config(self.config)
 
-    def assign_config_by_path(self, sys_config_path: Dict[str, Any], algo_config_path: Dict[str, Any], rank: int|None = None) -> None:
+    def assign_config_by_path(self, sys_config_path: str, algo_config_path: str, is_super_node: bool|None = None, host: str|None = None) -> None:
         self.sys_config = load_config(sys_config_path)
-        if rank is not None:
-            self.sys_config["comm"]["rank"] = rank
+        if is_super_node:
+            self.sys_config["comm"]["rank"] = 0
+        else:
+            self.sys_config["comm"]["host"] = host
+            self.sys_config["comm"]["rank"] = None
         self.algo_config = load_config(algo_config_path)
         self.merge_configs()
 
@@ -90,6 +91,7 @@ def initialize(self, copy_souce_code: bool=True) -> None:
         assert self.config is not None, "Config should be set when initializing"
 
         self.communication = CommunicationManager(self.config)
+        self.config["comm"]["rank"] = self.communication.get_rank()
         # Base clients modify the seed later on
         seed = self.config["seed"]
         torch.manual_seed(seed)  # type: ignore
diff --git a/src/utils/communication/grpc/main.py b/src/utils/communication/grpc/main.py
index 82695b1..870f3bd 100644
--- a/src/utils/communication/grpc/main.py
+++ b/src/utils/communication/grpc/main.py
@@ -137,11 +137,16 @@ def __init__(self, config: Dict[str, Dict[str, Any]]):
         # 4. The nodes will execute rest of the protocol in the same way as before
         self.num_users: int = int(config["num_users"])  # type: ignore
         self.rank: int|None = config["comm"]["rank"]
+        # TODO: Get rid of peer_ids now that we are passing [comm][host]
         self.super_node_host: str = config["comm"]["peer_ids"][0]
         if self.rank == 0:
             node_id: List[str] = self.super_node_host.split(":")
             self.host: str = node_id[0]
             self.port: int = int(node_id[1])
+        else:
+            # get hostname based on ip address
+            self.host: str = config["comm"]["host"]
+            pass
 
         self.listener: Any = None
         self.servicer = Servicer(self.super_node_host)
@@ -167,7 +172,7 @@ def register(self):
             self.port = get_port(self.rank, self.num_users)  # type: ignore because we are setting it in the register method
             rank = comm_pb2.Rank(rank=self.rank)  # type: ignore
             port = comm_pb2.Port(port=self.port)
-            peer_id = comm_pb2.PeerId(rank=rank, port=port)
+            peer_id = comm_pb2.PeerId(rank=rank, port=port, ip=self.host)
             stub.update_port(peer_id)  # type: ignore
 
     def start_listener(self):
@@ -176,9 +181,10 @@ def start_listener(self):
             ('grpc.max_receive_message_length', 100 * 1024 * 1024), # 100MB
         ])
         comm_pb2_grpc.add_CommunicationServerServicer_to_server(self.servicer, self.listener)  # type: ignore
-        self.listener.add_insecure_port(f'[::]:{self.port}')
+        address = f"{self.host}:{self.port}"
+        self.listener.add_insecure_port(address)
         self.listener.start()
-        print(f'Started listener on port {self.port}')
+        print(f'Started listener on {address}')
 
     def peer_ids_to_proto(self, peer_ids: OrderedDict[int, Dict[str, int|str]]) -> Dict[int, comm_pb2.PeerId]:
         peer_ids_proto: Dict[int, comm_pb2.PeerId] = {}
diff --git a/src/utils/model_utils.py b/src/utils/model_utils.py
index 14ea93b..384c0a3 100644
--- a/src/utils/model_utils.py
+++ b/src/utils/model_utils.py
@@ -43,6 +43,7 @@ def get_model(
         pretrained:bool=False,
         **kwargs: Any
     ) -> nn.Module:
+        self.dset = dset
         # TODO: add support for loading checkpointed models
         model_name = model_name.lower()
         if model_name == "resnet10":
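
---

For reference, the four-step discovery protocol described in `docs/getting-started/grpc.md` above can be illustrated with a minimal, self-contained sketch. This is not part of the patch and does not use gRPC: the `Coordinator` and `node_main` names are hypothetical stand-ins for the super node and the per-node registration logic in `src/utils/communication/grpc/main.py`, threads stand in for the separate `main.py` processes, and port discovery is elided.

```python
import threading
from typing import Dict

class Coordinator:
    """Hands out ranks and releases the peer list once every expected node has registered."""
    def __init__(self, expected_nodes: int) -> None:
        self.expected_nodes = expected_nodes
        self.peers: Dict[int, str] = {}  # rank -> "host:port"
        self._lock = threading.Lock()
        self._quorum = threading.Event()

    def register(self, address: str) -> int:
        # Step 1: a node registers and receives a unique rank (rank 0 is the coordinator itself).
        with self._lock:
            rank = len(self.peers) + 1
            self.peers[rank] = address
            if len(self.peers) == self.expected_nodes:
                self._quorum.set()  # Step 3: quorum reached, the peer list can be released
        return rank

    def wait_for_peers(self) -> Dict[int, str]:
        self._quorum.wait()  # block until all expected nodes have registered
        return dict(self.peers)

def node_main(coordinator: Coordinator, host: str, port: int) -> None:
    # Step 2 is simplified: we pretend `port` was already found to be free on `host`.
    rank = coordinator.register(f"{host}:{port}")
    peers = coordinator.wait_for_peers()
    print(f"rank {rank} sees {len(peers)} peers")  # Step 4: local training would start here

if __name__ == "__main__":
    coordinator = Coordinator(expected_nodes=3)
    threads = [
        threading.Thread(target=node_main, args=(coordinator, "localhost", 5000 + i))
        for i in range(3)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

In the patch itself, the equivalent of `register` is a gRPC call from each node to the super node (which is why every node needs `peer_ids[0]` or `-host` to reach it), and the peer list is sent back over gRPC once the quorum is reached.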