Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated GitHub Actions Test for gRPC Training #148

Merged
merged 22 commits into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions .github/workflows/train.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: Test Training Code with gRPC

on:
workflow_dispatch:
push:
branches:
# - main
- "*"
pull_request:
branches:
- main

env:
ACTIONS_STEP_DEBUG: true

jobs:
train-check:
runs-on: ubuntu-latest

steps:
# Step 1: Checkout the code
- name: Checkout repository
uses: actions/checkout@v3

# Step 2: Set up Python
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10" # Specify the Python version you're using

# Step 3: Install dependencies
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y libopenmpi-dev openmpi-bin
sudo apt-get install -y libgl1 libglib2.0-0

pip install -r requirements.txt

# Step 4: Run gRPC server and client
- name: Run test
run: |
cd src
# chmod +x ./configs/algo_config_test.py

echo "starting main grpc"
python main_grpc.py -n 4 -host localhost
echo "starting main"
python main.py -super true -s "./configs/sys_config_test.py"
echo "done"

# further checks:
# only 5 rounds
# gRPC only? or also MPI?
# num of samples
# num users and nodes
11 changes: 9 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
{
"python.analysis.typeCheckingMode": "strict"
}
"python.analysis.typeCheckingMode": "strict",
"sshfs.configs": [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remove this?

{
"name": "matlaber",
"host": "matlaber7.media.mit.edu",
"username": "kle"
}
]
}
1 change: 0 additions & 1 deletion src/algos/fl.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ def fed_avg(self, model_wts: List[OrderedDict[str, Tensor]]):
num_users = len(model_wts)
coeff = 1 / num_users
avgd_wts: OrderedDict[str, Tensor] = OrderedDict()

for key in model_wts[0].keys():
avgd_wts[key] = sum(coeff * m[key] for m in model_wts) # type: ignore

Expand Down
7 changes: 4 additions & 3 deletions src/configs/algo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st
traditional_fl: ConfigType = {
# Collaboration setup
"algo": "fedavg",
"rounds": 2,
"rounds": 1,

# Model parameters
"model": "resnet10",
Expand Down Expand Up @@ -192,7 +192,7 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st
# Collaboration setup
"algo": "fedstatic",
"topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore
"rounds": 20,
"rounds": 5,

# Model parameters
"model": "resnet10",
Expand Down Expand Up @@ -351,4 +351,5 @@ def get_malicious_types(malicious_config_list: List[ConfigType]) -> Dict[str, st
]


default_config_list: List[ConfigType] = [traditional_fl]
# default_config_list: List[ConfigType] = [traditional_fl]
default_config_list: List[ConfigType] = [fedstatic, fedstatic, fedstatic, fedstatic]
26 changes: 26 additions & 0 deletions src/configs/algo_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from utils.types import ConfigType

# fedstatic: ConfigType = {
# # Collaboration setup
# "algo": "fedstatic",
# "topology": {"name": "watts_strogatz", "k": 3, "p": 0.2}, # type: ignore
# "rounds": 1,

# # Model parameters
# "model": "resnet10",
# "model_lr": 3e-4,
# "batch_size": 256,
# }

traditional_fl: ConfigType = {
# Collaboration setup
"algo": "fedavg",
"rounds": 1,

# Model parameters
"model": "resnet10",
"model_lr": 3e-4,
"batch_size": 256,
}

# default_config_list: List[ConfigType] = [fedstatic, fedstatic, fedstatic, fedstatic]
21 changes: 12 additions & 9 deletions src/configs/sys_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,20 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
CIAR10_DPATH = "./datasets/imgs/cifar10/"

NUM_COLLABORATORS = 1
DUMP_DIR = "/mas/camera/Experiments/SONAR/abhi/"
# DUMP_DIR = "../../../../../../../home/"
DUMP_DIR = "./"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the default dump directory as /tmp
The risk with keeping "./" as dump directory is that the dump_dir might try to copy itself in dump_dir kicking off a recursive cascade


num_users = 3
mpi_system_config: ConfigType = {
"exp_id": "",
"comm": {"type": "MPI"},
"num_users": num_users,
"num_collaborators": NUM_COLLABORATORS,
"dset": CIFAR10_DSET,
"dump_dir": DUMP_DIR,
"dpath": CIAR10_DPATH,
"seed": 32,
# "seed": 32,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sure to not include any unnecessary comments

"seed": 2,
# node_0 is a server currently
# The device_ids dictionary depicts the GPUs on which the nodes reside.
# For a single-GPU environment, the config will look as follows (as it follows a 0-based indexing):
Expand All @@ -177,11 +181,9 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
# "algo": get_algo_configs(num_users=3, algo_configs=algo_configs_list),
"algos": get_algo_configs(
num_users=3,
algo_configs=malicious_algo_config_list,
assignment_method="distribution",
distribution={0: 1, 1: 1, 2: 1},
algo_configs=default_config_list
), # type: ignore
"samples_per_user": 1000, # TODO: To model scenarios where different users have different number of samples
"samples_per_user": 5555, # TODO: To model scenarios where different users have different number of samples
# we need to make this a dictionary with user_id as key and number of samples as value
"train_label_distribution": "iid", # Either "iid", "non_iid" "support"
"test_label_distribution": "iid", # Either "iid", "non_iid" "support"
Expand Down Expand Up @@ -316,7 +318,7 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
"exp_keys": [],
}

num_users = 9
num_users = 4

dropout_dict = {
"distribution_dict": { # leave dict empty to disable dropout
Expand Down Expand Up @@ -347,7 +349,8 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
"device_ids": get_device_ids(num_users, gpu_ids),
# "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore
"algos": get_algo_configs(num_users=num_users, algo_configs=[fedstatic]), # type: ignore
"samples_per_user": 50000 // num_users, # distributed equally
# "samples_per_user": 50000 // num_users, # distributed equally
"samples_per_user": 100,
"train_label_distribution": "non_iid",
"test_label_distribution": "iid",
"alpha_data": 1.0,
Expand All @@ -356,6 +359,6 @@ def get_digit_five_support(num_users: int, domains: List[str] = DIGIT_FIVE):
"test_samples_per_user": 200,
}


current_config = grpc_system_config
# current_config = mpi_system_config

126 changes: 126 additions & 0 deletions src/configs/sys_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Dict, List, Literal, Optional
import random
from utils.types import ConfigType

from .algo_config_test import (
traditional_fl
)

def get_device_ids(num_users: int, gpus_available: List[int | Literal["cpu"]]) -> Dict[str, List[int | Literal["cpu"]]]:
"""
Get the GPU device IDs for the users.
"""
# TODO: Make it multi-host
device_ids: Dict[str, List[int | Literal["cpu"]]] = {}
for i in range(num_users + 1): # +1 for the super-node
index = i % len(gpus_available)
gpu_id = gpus_available[index]
device_ids[f"node_{i}"] = [gpu_id]
return device_ids


def get_algo_configs(
num_users: int,
algo_configs: List[ConfigType],
assignment_method: Literal[
"sequential", "random", "mapping", "distribution"
] = "sequential",
seed: Optional[int] = 1,
mapping: Optional[List[int]] = None,
distribution: Optional[Dict[int, int]] = None,
) -> Dict[str, ConfigType]:
"""
Assign an algorithm configuration to each node, allowing for repetition.
sequential: Assigns the algo_configs sequentially to the nodes
random: Assigns the algo_configs randomly to the nodes
mapping: Assigns the algo_configs based on the mapping of node index to algo index provided
distribution: Assigns the algo_configs based on the distribution of algo index to number of nodes provided
"""
algo_config_map: Dict[str, ConfigType] = {}
algo_config_map["node_0"] = algo_configs[0] # Super-node
if assignment_method == "sequential":
for i in range(1, num_users + 1):
algo_config_map[f"node_{i}"] = algo_configs[i % len(algo_configs)]
elif assignment_method == "random":
for i in range(1, num_users + 1):
algo_config_map[f"node_{i}"] = random.choice(algo_configs)
elif assignment_method == "mapping":
if not mapping:
raise ValueError("Mapping must be provided for assignment method 'mapping'")
assert len(mapping) == num_users
for i in range(1, num_users + 1):
algo_config_map[f"node_{i}"] = algo_configs[mapping[i - 1]]
elif assignment_method == "distribution":
if not distribution:
raise ValueError(
"Distribution must be provided for assignment method 'distribution'"
)
total_users = sum(distribution.values())
assert total_users == num_users

# List of node indices to assign
node_indices = list(range(1, total_users + 1))
# Seed for reproducibility
random.seed(seed)
# Shuffle the node indices based on the seed
random.shuffle(node_indices)

# Assign nodes based on the shuffled indices
current_index = 0
for algo_index, num_nodes in distribution.items():
for i in range(num_nodes):
node_id = node_indices[current_index]
algo_config_map[f"node_{node_id}"] = algo_configs[algo_index]
current_index += 1
else:
raise ValueError(f"Invalid assignment method: {assignment_method}")
# print("algo config mapping is: ", algo_config_map)
return algo_config_map

CIFAR10_DSET = "cifar10"
CIAR10_DPATH = "./datasets/imgs/cifar10/"

# DUMP_DIR = "../../../../../../../home/"
DUMP_DIR = "./"

NUM_COLLABORATORS = 1
num_users = 4

dropout_dict = {
"distribution_dict": { # leave dict empty to disable dropout
"method": "uniform", # "uniform", "normal"
"parameters": {} # "mean": 0.5, "std": 0.1 in case of normal distribution
},
"dropout_rate": 0.0, # cutoff for dropout: [0,1]
"dropout_correlation": 0.0, # correlation between dropouts of successive rounds: [0,1]
}

dropout_dicts = {"node_0": {}}
for i in range(1, num_users + 1):
dropout_dicts[f"node_{i}"] = dropout_dict

gpu_ids = [2, 3, 5, 6]

grpc_system_config: ConfigType = {
"exp_id": "static",
"num_users": num_users,
"num_collaborators": NUM_COLLABORATORS,
"comm": {"type": "GRPC", "synchronous": True, "peer_ids": ["localhost:50048"]}, # The super-node
"dset": CIFAR10_DSET,
"dump_dir": DUMP_DIR,
"dpath": CIAR10_DPATH,
"seed": 2,
"device_ids": get_device_ids(num_users, gpu_ids),
# "algos": get_algo_configs(num_users=num_users, algo_configs=default_config_list), # type: ignore
"algos": get_algo_configs(num_users=num_users, algo_configs=[traditional_fl]), # type: ignore
# "samples_per_user": 50000 // num_users, # distributed equally
"samples_per_user": 100,
"train_label_distribution": "non_iid",
"test_label_distribution": "iid",
"alpha_data": 1.0,
"exp_keys": [],
"dropout_dicts": dropout_dicts,
"test_samples_per_user": 200,
}

current_config = grpc_system_config
1 change: 0 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@
# Start the scheduler
scheduler.install_config()
scheduler.initialize()

# Run the job
scheduler.run_job()
2 changes: 1 addition & 1 deletion src/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ def initialize(self, copy_souce_code: bool = True) -> None:
random.seed(seed)
numpy.random.seed(seed)
self.merge_configs()

if self.communication.get_rank() == 0:
if copy_souce_code:
copy_source_code(self.config)
Expand All @@ -130,6 +129,7 @@ def initialize(self, copy_souce_code: bool = True) -> None:
rank=self.communication.get_rank(),
comm_utils=self.communication,
)
self.communication.send_quorum()

def run_job(self) -> None:
self.node.run_protocol()
Expand Down
8 changes: 6 additions & 2 deletions src/utils/communication/comm_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from enum import Enum
from utils.communication.grpc.main import GRPCCommunication
from typing import Any, Dict, List, TYPE_CHECKING
# from utils.communication.mpi import MPICommUtils
from utils.communication.mpi import MPICommUtils
from mpi4py import MPI

if TYPE_CHECKING:
from algos.base_class import BaseNode
Expand All @@ -20,7 +21,7 @@ def create_communication(
):
comm_type = comm_type
if comm_type == CommunicationType.MPI:
raise NotImplementedError("MPI's new version not yet implemented. Please use GRPC. See https://github.com/aidecentralized/sonar/issues/96 for more details.")
return MPICommUtils(config)
elif comm_type == CommunicationType.GRPC:
return GRPCCommunication(config)
elif comm_type == CommunicationType.HTTP:
Expand Down Expand Up @@ -70,6 +71,9 @@ def receive(self, node_ids: List[int]) -> Any:
def broadcast(self, data: Any, tag: int = 0):
self.comm.broadcast(data)

def send_quorum(self):
self.comm.send_quorum()

def all_gather(self, tag: int = 0):
return self.comm.all_gather()

Expand Down
Loading