Merge pull request #3 from aidecentralized/main
Update
aymann121 authored Oct 27, 2024
2 parents 74e0bef + eb8c4b7 commit d6b4e5e
Showing 11 changed files with 399 additions and 42 deletions.
58 changes: 58 additions & 0 deletions docs/getting-started/experiments.md
@@ -0,0 +1,58 @@
# Automating Experiments

In this tutorial, we will discuss how to automate running multiple experiments by customizing our experiment script. Note that we currently only support automation on one machine with the gRPC protocol. If you have not already read the [Getting Started](./getting-started.md) guide, we recommend you do so before proceeding.

## Running the Code
The `main_exp.py` file automates running experiments on one machine using gRPC. You can run this file with the command:
``` bash
python main_exp.py -host randomhost42.mit.edu
```

## Customizing the Experiments
To customize your experiment automation, make these changes in `main_exp.py`.

1. Specify your constant settings in `sys_config.py` and `algo_config.py`.
2. Import the `sys_config` and `algo_config` objects you want to use for your experiments.
``` python
from configs.algo_config import traditional_fl
from configs.sys_config import grpc_system_config
```
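
Both objects are plain config dictionaries. As a rough sketch of their shape (illustrative keys only; the real objects in `configs/` carry many more fields):
``` python
# Illustrative shapes only; see configs/algo_config.py and configs/sys_config.py.
traditional_fl = {
    "rounds": 2,   # overridden per experiment via the "algo" dict below
    # ... other algorithm settings
}

grpc_system_config = {
    "num_users": 3,   # overridden per experiment via the "sys" dict below
    "seed": 2,
    "comm": {"type": "GRPC", "peer_ids": ["randomhost41.mit.edu:5003"]},
    # ... other system settings
}
```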

3. Write the experiment object like the example `exp_dict`, mapping each new experiment ID to the keys you want to change for that experiment. Specify the base `algo_config` and its per-experiment overrides in `algo`, and likewise the base `sys_config` and its overrides in `sys`. *Note: every experiment must have a unique experiment path; we recommend guaranteeing this by giving every experiment a unique experiment ID.*
``` python
exp_dict = {
"test_automation_1": {
"algo_config": traditional_fl,
"sys_config": grpc_system_config,
"algo": {
"rounds": 3,
},
"sys": {
"seed": 3,
"num_users": 3,
},
},
"test_automation_2": {
"algo_config": traditional_fl,
"sys_config": grpc_system_config,
"algo": {
"rounds": 4,
},
"sys": {
"seed": 4,
"num_users": 4,
},
},
}
```
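
Under the hood, `main_exp.py` merges each experiment's overrides into copies of the base configs with `dict.update`, so only the keys listed in `algo` and `sys` change:
``` python
# Sketch of the merge performed for each entry in exp_dict.
for exp_id, exp_config in exp_dict.items():
    algo_config = exp_config["algo_config"].copy()   # start from the base algo config
    algo_config.update(exp_config["algo"])           # e.g. {"rounds": 3}

    sys_config = exp_config["sys_config"].copy()     # start from the base sys config
    sys_config.update(exp_config["sys"])             # e.g. {"seed": 3, "num_users": 3}
    sys_config["exp_id"] = exp_id                    # unique ID, hence a unique experiment path
```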


4. (Optional) Specify whether to run post-hoc metrics and plots by setting the boolean at the top of the file.
``` python
post_hoc_plot: bool = True
```
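
When the flag is `True`, `main_exp.py` aggregates each experiment's per-user logs and generates plots once the supernode process exits, roughly as follows:
``` python
from utils.config_utils import process_config
from utils.post_hoc_plot_utils_copy import aggregate_metrics_across_users, plot_all_metrics

if post_hoc_plot:
    full_config = process_config(full_config)   # populates the results path
    logs_dir = full_config["results_path"] + "/logs/"
    aggregate_metrics_across_users(logs_dir)    # aggregate metrics across all users
    plot_all_metrics(logs_dir)                  # plot all metrics
```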

5. Start the experiments with the command:
``` bash
python main_exp.py -host randomhost42.mit.edu
```
10 changes: 5 additions & 5 deletions docs/getting-started/grpc.md
@@ -8,16 +8,16 @@ In this tutorial, we will discuss how to use gRPC for training models across mul
The main advantage of our abstract communication layer is that the same code runs regardless of whether you are using MPI or gRPC underneath. As long as the communication layer is implemented correctly, the rest of the code remains the same, which allows us to switch between communication layers without changing application code.

## Running the code
Let's say you want to run the decentralized training with 80 users on 4 machines. Our implementation currently requires a coordinating node to manage the orchestration. Therefore, there will be 81 nodes in total. Make sure `sys_config.py` has `num_users: 80` in the config. You should run the following command on all 4 machines:
Let's say you want to run the decentralized training with 80 users on 4 machines. Our implementation currently requires a coordinating node to manage the orchestration. Therefore, there will be 81 nodes in total. In `sys_config.py`, specify the hostname and port on which you want to run the coordinator node (e.g. `"comm": { "type": "GRPC", "peer_ids": ["randomhost41.mit.edu:5003"] # the coordinator port is specified here }`), and set `num_users: 80`.
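
Formatted for readability, that inline `comm` snippet corresponds to a config block along these lines (hostname and port are placeholders):
``` python
# Relevant sys_config.py fields for an 80-user gRPC run (sketch).
grpc_system_config = {
    "num_users": 80,   # users only; the coordinator brings the total to 81 nodes
    "comm": {
        "type": "GRPC",
        "peer_ids": ["randomhost41.mit.edu:5003"],   # coordinator hostname:port
    },
    # ... other system settings
}
```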

On the machine that you want to run the coordinator node on, start the coordinator by running the following command:
``` bash
python main_grpc.py -n 20 -host randomhost42.mit.edu
python main.py -super true
```

On **one** of the machines that you want to use as a coordinator node (let's say it is `randomhost43.mit.edu`), change the `peer_ids` with the hostname and the port you want to run the coordinator node and then run the following command:

Then, start the user threads by running the following command on all 4 machines (change the host name for each machine you are using, and note that you may need to open a new terminal if you are using the same machine as the supernode):
``` bash
python main.py -super true
python main_grpc.py -n 20 -host randomhost42.mit.edu
```

> **_NOTE:_** Most of the algorithms right now do not use the new communication protocol, hence you can only use the old MPI version with them. We are working on updating the algorithms to use the new communication protocol.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -13,6 +13,7 @@ nav:
- Config File: getting-started/config.md
- Customizability: getting-started/customize.md
- Using GRPC: getting-started/grpc.md
- Automating Experiments: getting-started/experiments.md
- CollaBench:
- Main: collabench.md
- Feature Comparison: feature.md
34 changes: 23 additions & 11 deletions src/algos/base_class.py
@@ -250,7 +250,13 @@ def get_model_weights(self) -> Dict[str, Tensor]:
"""
Share the model weights
"""
return self.model.state_dict()
message = {"sender": self.node_id, "round": self.round, "model": self.model.state_dict()}

# Move to CPU before sending
for key in message["model"].keys():
message["model"][key] = message["model"][key].to("cpu")

return message

def get_local_rounds(self) -> int:
return self.round
@@ -307,7 +313,7 @@ def receive_and_aggregate(self):
raise NotImplementedError


def strip_empty_models(self, models_wts: List[OrderedDict[str, Tensor]],
def strip_empty_models(self, models_wts: List[OrderedDict[str, Any]],
collab_weights: Optional[List[float]] = None) -> Any:
repr_list = []
if collab_weights is not None:
@@ -606,7 +612,12 @@ def receive_and_aggregate(self):
"""
if self.is_working:
repr = self.comm_utils.receive([self.server_node])[0]
self.set_model_weights(repr)
if "round" in repr:
round = repr["round"]
if "sender" in repr:
sender = repr["sender"]
assert "model" in repr, "Model not found in the received message"
self.set_model_weights(repr["model"])

def run_protocol(self) -> None:
raise NotImplementedError
@@ -673,7 +684,7 @@ def set_data_parameters(self, config: Dict[str, Any]) -> None:
self._test_loader = DataLoader(test_dset, batch_size=batch_size)

def aggregate(
self, representation_list: List[OrderedDict[str, Tensor]], **kwargs: Any
self, representation_list: List[OrderedDict[str, Any]], **kwargs: Any
) -> OrderedDict[str, Tensor]:
"""
Aggregate the knowledge from the users
@@ -745,15 +756,9 @@ def local_test(self, **kwargs: Any) -> Tuple[float, float]:
self.model_utils.save_model(self.model, self.model_save_path)
return test_loss, acc

def get_model_weights(self) -> OrderedDict[str, Tensor]:
"""
Share the model weights (on the cpu)
"""
return OrderedDict({k: v.cpu() for k, v in self.model.state_dict().items()})

def aggregate(
self,
models_wts: List[OrderedDict[str, Tensor]],
models_wts: List[OrderedDict[str, Any]],
collab_weights: Optional[List[float]] = None,
keys_to_ignore: List[str] = [],
) -> None:
@@ -768,6 +773,7 @@ def aggregate(
Returns:
None
"""

models_coeffs: List[Tuple[OrderedDict[str, Tensor], float]] = []
# insert the current model weights at the position self.node_id
models_wts.insert(self.node_id - 1, self.get_model_weights())
@@ -778,6 +784,12 @@ def aggregate(
models_wts, collab_weights = self.strip_empty_models(models_wts, collab_weights)
collab_weights = [w / sum(collab_weights) for w in collab_weights]

senders = [model["sender"] for model in models_wts if "sender" in model]
rounds = [model["round"] for model in models_wts if "round" in model]
for i in range(len(models_wts)):
assert "model" in models_wts[i], "Model not found in the received message"
models_wts[i] = models_wts[i]["model"]

for idx, model_wts in enumerate(models_wts):
models_coeffs.append((model_wts, collab_weights[idx]))

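Taken together, these `base_class.py` changes wrap every shared state dict in a small message envelope, with tensors moved to the CPU before sending. A sketch of the format the receivers above now expect (layer names are illustrative):
``` python
from collections import OrderedDict
import torch

# Envelope returned by get_model_weights(): metadata plus CPU weights.
message = {
    "sender": 1,   # node_id of the sender
    "round": 3,    # sender's local round counter
    "model": OrderedDict({
        "fc.weight": torch.zeros(10, 784),   # illustrative layer
        "fc.bias": torch.zeros(10),
    }),
}
assert "model" in message, "Model not found in the received message"
state_dict = message["model"]   # what set_model_weights() and aggregation consume
```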
28 changes: 20 additions & 8 deletions src/algos/fl.py
@@ -31,35 +31,42 @@ def local_test(self, **kwargs: Any):
return [test_loss, test_acc, time_taken]


def get_model_weights(self, **kwargs: Any) -> Dict[str, Tensor]:
def get_model_weights(self, **kwargs: Any) -> Dict[str, Any]:
"""
Overwrite the get_model_weights method of the BaseNode
to add malicious attacks
TODO: this should be moved to BaseClient
"""

message = {"sender": self.node_id, "round": self.round}

malicious_type = self.config.get("malicious_type", "normal")

if malicious_type == "normal":
return self.model.state_dict() # type: ignore
message["model"] = self.model.state_dict() # type: ignore
elif malicious_type == "bad_weights":
# Corrupt the weights
return BadWeightsAttack(
message["model"] = BadWeightsAttack(
self.config, self.model.state_dict()
).get_representation()
elif malicious_type == "sign_flip":
# Flip the sign of the weights, also TODO: consider label flipping
return SignFlipAttack(
message["model"] = SignFlipAttack(
self.config, self.model.state_dict()
).get_representation()
elif malicious_type == "add_noise":
# Add noise to the weights
return AddNoiseAttack(
message["model"] = AddNoiseAttack(
self.config, self.model.state_dict()
).get_representation()
else:
return self.model.state_dict() # type: ignore
return self.model.state_dict() # type: ignore
message["model"] = self.model.state_dict() # type: ignore

# move the model to cpu before sending
for key in message["model"].keys():
message["model"][key] = message["model"][key].to("cpu")

return message # type: ignore

def run_protocol(self):
stats: Dict[str, Any] = {}
@@ -106,13 +113,18 @@ def fed_avg(self, model_wts: List[OrderedDict[str, Tensor]]):
return avgd_wts

def aggregate(
self, representation_list: List[OrderedDict[str, Tensor]], **kwargs: Any
self, representation_list: List[OrderedDict[str, Any]], **kwargs: Any
) -> OrderedDict[str, Tensor]:
"""
Aggregate the model weights
"""
representation_list, _ = self.strip_empty_models(representation_list)
if len(representation_list) > 0:
senders = [rep["sender"] for rep in representation_list if "sender" in rep]
rounds = [rep["round"] for rep in representation_list if "round" in rep]
for i in range(len(representation_list)):
representation_list[i] = representation_list[i]["model"]

avg_wts = self.fed_avg(representation_list)
return avg_wts
else:
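For context, once the envelopes are unwrapped the server averages the raw state dicts. `fed_avg` itself is only partially visible in this diff; a minimal uniform-averaging sketch consistent with its usage here:
``` python
from collections import OrderedDict
from typing import List
from torch import Tensor

def fed_avg(model_wts: List[OrderedDict[str, Tensor]]) -> OrderedDict[str, Tensor]:
    # Average each parameter across users, key by key.
    num_users = len(model_wts)
    avgd_wts: OrderedDict[str, Tensor] = OrderedDict()
    for key in model_wts[0].keys():
        avgd_wts[key] = sum(wts[key] for wts in model_wts) / num_users
    return avgd_wts
```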
15 changes: 13 additions & 2 deletions src/configs/sys_config.py
@@ -44,6 +44,7 @@ def get_algo_configs(
assignment_method: Literal[
"sequential", "random", "mapping", "distribution"
] = "sequential",
seed: Optional[int] = 1,
mapping: Optional[List[int]] = None,
distribution: Optional[Dict[int, int]] = None,
) -> Dict[str, ConfigType]:
@@ -75,10 +76,20 @@
)
total_users = sum(distribution.values())
assert total_users == num_users
current_index = 1

# List of node indices to assign
node_indices = list(range(1, total_users + 1))
# Seed for reproducibility
random.seed(seed)
# Shuffle the node indices based on the seed
random.shuffle(node_indices)

# Assign nodes based on the shuffled indices
current_index = 0
for algo_index, num_nodes in distribution.items():
for i in range(num_nodes):
algo_config_map[f"node_{current_index}"] = algo_configs[algo_index]
node_id = node_indices[current_index]
algo_config_map[f"node_{node_id}"] = algo_configs[algo_index]
current_index += 1
else:
raise ValueError(f"Invalid assignment method: {assignment_method}")
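A hypothetical call illustrating the new seeded `"distribution"` assignment; with a fixed `seed`, the shuffled node-to-algorithm mapping is reproducible across runs (config values here are made up):
``` python
from configs.sys_config import get_algo_configs

algo_a = {"rounds": 2}   # hypothetical algo configs
algo_b = {"rounds": 5}

algos = get_algo_configs(
    num_users=6,
    algo_configs=[algo_a, algo_b],
    assignment_method="distribution",
    distribution={0: 4, 1: 2},   # 4 nodes get algo_a, 2 nodes get algo_b
    seed=42,
)
```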
71 changes: 62 additions & 9 deletions src/main_exp.py
@@ -8,8 +8,43 @@
import subprocess
from typing import List

from utils.types import ConfigType
from utils.config_utils import process_config
from utils.post_hoc_plot_utils_copy import aggregate_metrics_across_users, plot_all_metrics

from configs.sys_config import get_algo_configs, get_device_ids
from configs.algo_config import traditional_fl
from configs.sys_config import grpc_system_config

post_hoc_plot: bool = True

# for each experiment key, write the modifications to the config file
gpu_ids = [2, 3, 5, 6]
exp_dict = {
"experiment_1": {
"algo_config": traditional_fl,
"sys_config": grpc_system_config,
"algo": {
"rounds": 3,
},
"sys": {
"seed": 3,
"num_users": 3,
},
},
"experiment_2": {
"algo_config": traditional_fl,
"sys_config": grpc_system_config,
"algo": {
"rounds": 4,
},
"sys": {
"seed": 4,
"num_users": 4,
},
},
}

# parse the arguments
parser = argparse.ArgumentParser(description="host address of the nodes")
parser.add_argument(
@@ -21,20 +56,28 @@

args = parser.parse_args()

# for each experiment key
# write the new config file
exp_ids = ["test_automation_1", "test_automation_2", "test_automation_3"]
for exp_id, exp_config in exp_dict.items():
# update the algo config with config settings
base_algo_config = exp_config["algo_config"].copy()
base_algo_config.update(exp_config["algo"])

# update the sys config with config settings
base_sys_config = exp_config["sys_config"].copy()
base_sys_config.update(exp_config["sys"])

for e, exp_id in enumerate(exp_ids):
current_config = grpc_system_config
current_config["exp_id"] = exp_id
# set up the full config file by combining the algo and sys config
n: int = base_sys_config["num_users"]
seed: int = base_sys_config["seed"]
base_sys_config["algos"] = get_algo_configs(num_users=n, algo_configs=[base_algo_config], seed=seed)
base_sys_config["device_ids"] = get_device_ids(n, gpu_ids)

full_config = base_sys_config.copy()
full_config["exp_id"] = exp_id

# write the config file as python file configs/temp_config.py
with open("./configs/temp_config.py", "w") as f:
f.write("current_config = ")
f.write(str(current_config))

n: int = current_config["num_users"]
f.write(str(full_config))

# start the supernode
supernode_command: List[str] = ["python", "main.py", "-host", args.host, "-super", "true", "-s", "./configs/temp_config.py"]
@@ -51,5 +94,15 @@
# Wait for the supernode process to finish
process.wait()

# run the post-hoc analysis
if post_hoc_plot:
full_config = process_config(full_config) # this populates the results path
logs_dir = full_config["results_path"] + '/logs/'

# aggregate metrics across all users
aggregate_metrics_across_users(logs_dir)
# plot all metrics
plot_all_metrics(logs_dir)

# Continue with the next set of commands after supernode finishes
print(f"Supernode process {exp_id} finished. Proceeding to next set of commands.")
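
The hunk elides the lines between building `supernode_command` and `process.wait()`; judging from the docs above, the launch sequence is roughly the following (a sketch, not the verbatim source):
``` python
import subprocess
from typing import List

# Start the supernode for this experiment.
process = subprocess.Popen(supernode_command)

# Start the user nodes that attach to it (assumed to mirror the docs' main_grpc.py usage).
client_command: List[str] = ["python", "main_grpc.py", "-n", str(n), "-host", args.host]
subprocess.run(client_command, check=True)

process.wait()   # block until the supernode finishes before starting the next experiment
```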