Skip to content

Commit

Permalink
New grpc (#34)
Browse files Browse the repository at this point in the history
* reduce logging

* revamp communication interface; mpi version works in this commit

* enable strict type checking

* first version of grpc working

* automate configuration

* first version working end to end for fl.py

* add instructions for later

* reduce memory footprint and improve type checking

* keep a small model as default
  • Loading branch information
tremblerz authored Aug 17, 2024
1 parent fc633c1 commit eb345dc
Show file tree
Hide file tree
Showing 23 changed files with 1,111 additions and 203 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"python.analysis.typeCheckingMode": "strict"
}
5 changes: 5 additions & 0 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ Client Metrics

Server Metrics
<img src="../resources/images/Server_metrics.png" width=50% height=50%>

### Debugging instructions
The GRPC simulation starts a lot of threads, and currently if even one of them fails you have to kill all of them and start over.
Here is a command to get the PIDs of all the threads and kill them all at once:
`for pid in $(ps aux|grep 'python main.py -r' | cut -b 10-16); do kill -9 $pid; done`
79 changes: 35 additions & 44 deletions src/algos/base_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from torch.utils.data import DataLoader, Subset

from collections import OrderedDict
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from torch import Tensor
import copy
import random
import numpy as np

from utils.communication.comm_utils import CommunicationManager
from utils.plot_utils import PlotUtils
from utils.comm_utils import CommUtils
from utils.data_utils import (
random_samples,
filter_by_class,
Expand All @@ -28,15 +28,15 @@
get_dset_balanced_communities,
get_dset_communities,
)
import torchvision.transforms as T
import torchvision.transforms as T # type: ignore
import os

from yolo import YOLOLoss

class BaseNode(ABC):
def __init__(self, config) -> None:
self.comm_utils = CommUtils()
self.node_id = self.comm_utils.rank
def __init__(self, config: Dict[str, Any], comm_utils: CommunicationManager) -> None:
self.comm_utils = comm_utils
self.node_id = self.comm_utils.get_rank()

if self.node_id == 0:
self.log_dir = config['log_path']
Expand All @@ -54,21 +54,21 @@ def __init__(self, config) -> None:
if isinstance(config["dset"], dict):
if self.node_id != 0:
config["dset"].pop("0")
self.dset = config["dset"][str(self.node_id)]
self.dset = str(config["dset"][str(self.node_id)])
config["dpath"] = config["dpath"][self.dset]
else:
self.dset = config["dset"]

self.setup_cuda(config)
self.model_utils = ModelUtils()
self.model_utils = ModelUtils(self.device)

self.dset_obj = get_dataset(self.dset, dpath=config["dpath"])
self.set_constants()

def set_constants(self):
    """Initialize bookkeeping constants shared by all node types."""
    # Best accuracy seen so far; presumably updated during evaluation
    # by subclasses — confirm against the concrete algorithm classes.
    self.best_acc = 0.0

def setup_cuda(self, config):
def setup_cuda(self, config: Dict[str, Any]):
# Need a mapping from rank to device id
device_ids_map = config["device_ids"]
node_name = "node_{}".format(self.node_id)
Expand All @@ -82,7 +82,7 @@ def setup_cuda(self, config):
self.device = torch.device("cpu")
print("Using CPU")

def set_model_parameters(self, config):
def set_model_parameters(self, config: Dict[str, Any]):
# Model related parameters
optim_name = config.get("optimizer", "adam")
if optim_name == "adam":
Expand Down Expand Up @@ -149,7 +149,7 @@ def set_shared_exp_parameters(self, config):
self.log_utils.log_console("Communities: {}".format(self.communities))

@abstractmethod
def run_protocol(self) -> None:
    """Entry point for the node's federated-learning protocol.

    Must be overridden by every concrete node implementation.
    """
    raise NotImplementedError


Expand All @@ -158,8 +158,8 @@ class BaseClient(BaseNode):
Abstract class for all algorithms
"""

def __init__(self, config) -> None:
super().__init__(config)
def __init__(self, config: Dict[str, Any], comm_utils: CommunicationManager) -> None:
    """Initialize a client node.

    Args:
        config: Experiment configuration dictionary.
        comm_utils: Communication manager used to exchange data with
            other nodes.
    """
    super().__init__(config, comm_utils)
    # Rank 0 is treated as the server/orchestrator node in this setup.
    self.server_node = 0
    self.set_parameters(config)

Expand Down Expand Up @@ -215,8 +215,8 @@ def set_data_parameters(self, config):
train_dset = self.dset_obj.train_dset
test_dset = self.dset_obj.test_dset

print("num train", len(train_dset))
print("num test", len(test_dset))
# print("num train", len(train_dset))
# print("num test", len(test_dset))

if config.get("test_samples_per_class", None) is not None:
test_dset, _ = balanced_subset(test_dset, config["test_samples_per_class"])
Expand Down Expand Up @@ -369,19 +369,19 @@ def is_same_dest(dset):
# TODO: fix print_data_summary
# self.print_data_summary(train_dset, test_dset, val_dset=val_dset)

def local_train(self, dataset, **kwargs):
def local_train(self, round: int, **kwargs: Any) -> None:
    """
    Train the model locally for the given round.

    Stub raising NotImplementedError; concrete client algorithms
    must override this.
    """
    raise NotImplementedError

def local_test(self, dataset, **kwargs):
def local_test(self, **kwargs: Any) -> float | Tuple[float, float] | None:
    """
    Test the model locally.

    Stub raising NotImplementedError; concrete client algorithms
    must override this. The return shape varies by algorithm
    (single metric, (loss, acc) pair, or None).
    """
    raise NotImplementedError

def get_representation(self, **kwargs):
def get_representation(self, **kwargs: Any) -> OrderedDict[str, Tensor] | List[Tensor] | Tensor:
"""
Share the model representation
"""
Expand Down Expand Up @@ -416,30 +416,26 @@ def print_data_summary(self, train_test, test_dset, val_dset=None):
print("test count: ", i)
i += 1

print("Node: {} data distribution summary".format(self.node_id))
print(type(train_sample_per_class.items()))
print(
"Train samples per class: {}".format(sorted(train_sample_per_class.items()))
)
print(
"Train samples per class: {}".format(len(train_sample_per_class.items()))
)
if val_dset is not None:
print(
"Val samples per class: {}".format(len(val_sample_per_class.items()))
)
print(
"Test samples per class: {}".format(len(test_sample_per_class.items()))
)
# print("Node: {} data distribution summary".format(self.node_id))
# print(
# "Train samples per class: {}".format(sorted(train_sample_per_class.items()))
# )
# if val_dset is not None:
# print(
# "Val samples per class: {}".format(sorted(val_sample_per_class.items()))
# )
# print(
# "Test samples per class: {}".format(sorted(test_sample_per_class.items()))
# )


class BaseServer(BaseNode):
"""
Abstract class for orchestrator
"""

def __init__(self, config) -> None:
super().__init__(config)
def __init__(self, config, comm_utils) -> None:
super().__init__(config, comm_utils)
self.num_users = config["num_users"]
self.users = list(range(1, self.num_users + 1))
self.set_data_parameters(config)
Expand All @@ -449,13 +445,13 @@ def set_data_parameters(self, config):
batch_size = config["batch_size"]
self._test_loader = DataLoader(test_dset, batch_size=batch_size)

def aggregate(self, representation_list, **kwargs):
def aggregate(self, representation_list: List[OrderedDict[str, Tensor]], **kwargs: Any) -> OrderedDict[str, Tensor]:
    """
    Aggregate the knowledge (model representations) from the users.

    Stub raising NotImplementedError; concrete server algorithms
    must override this and return the aggregated representation.
    """
    raise NotImplementedError

def test(self, dataset, **kwargs):
def test(self, **kwargs: Any) -> List[float]:
"""
Test the model on the server
"""
Expand Down Expand Up @@ -668,10 +664,5 @@ def __init__(self, config, comm_protocol=CommProtocol) -> None:
super().__init__(config)
self.tag = comm_protocol

def send_representations(self, representations, tag=None):
for user_node in self.users:
self.comm_utils.send_signal(
dest=user_node,
data=representations,
tag=self.tag.REPRS_SHARE if tag is None else tag,
)
def send_representations(self, representations: Dict[int, OrderedDict[str, Tensor]]):
    """Broadcast the per-user representation mapping to all nodes.

    NOTE(review): broadcast appears to send the same mapping to every
    node (replacing the previous per-destination send); each receiver
    presumably picks out its own entry — confirm in CommunicationManager.
    """
    self.comm_utils.broadcast(representations)
Loading

0 comments on commit eb345dc

Please sign in to comment.