Skip to content

Commit

Permalink
first version working end to end for fl.py
Browse files Browse the repository at this point in the history
  • Loading branch information
tremblerz committed Aug 15, 2024
1 parent 773bdb4 commit cca0128
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 72 deletions.
40 changes: 3 additions & 37 deletions src/configs/sys_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# TODO: Set up multiple non-iid configurations here. The goal of a separate system config
# is to simulate different real-world scenarios without changing the algorithm configuration.
from typing import Dict, List
import socket


mpi_system_config = {
Expand All @@ -27,58 +26,25 @@
"folder_deletion_signal_path":"./expt_dump/folder_deletion.signal"
}

def is_port_available(port: int) -> bool:
"""
Check if a port is available for use.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # type: ignore
return s.connect_ex(('localhost', port)) != 0 # type: ignore


def generate_ports(num_users: int) -> List[int]:
"""
Generate a list of ports that are available for use.
"""
ports: List[int] = []
i = 0
while len(ports) < num_users:
port = 50051 + i
# check if the port is available
if is_port_available(port):
ports.append(port)
else:
print(f"Port {port} is not available, skipping...")
i += 1
return ports

def generate_peer_ids(num_users: int) -> List[str]:
"""
Generate a list of peer IDs for the users.
"""
peer_ids: List[str] = []
ports = generate_ports(num_users)
for i in range(num_users):
peer_ids.append(f"localhost:{ports[i]}")
return peer_ids

def get_device_ids(num_users: int, gpus_available: List[int]) -> Dict[str, List[int]]:
"""
Get the GPU device IDs for the users.
"""
# TODO: Make it multi-host
device_ids: Dict[str, List[int]] = {}
for i in range(num_users):
index = i % len(gpus_available)
gpu_id = gpus_available[index]
device_ids[f"node_{i}"] = [gpu_id]
return device_ids

num_users = 50
num_users = 10
gpu_ids = [0, 1, 2, 3, 4, 5, 6, 7]
grpc_system_config = {
"num_users": num_users,
"comm": {
"type": "GRPC",
"all_peer_ids": generate_peer_ids(num_users + 1) # +1 for the super-node,
"peer_ids": ["localhost:50050"] # The super-node
},
"dset": "cifar10",
"dump_dir": "./expt_dump/",
Expand Down
2 changes: 1 addition & 1 deletion src/main_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
# 1. find the number of users in the system configuration
# 2. start separate processes by running python main.py for each user

num_users = sys_config["num_users"] + 1 # +1 for the server
num_users = sys_config["num_users"] + 1 # +1 for the super-node
for i in range(num_users):
print(f"Starting process for user {i}")
# start a Popen process
Expand Down
28 changes: 27 additions & 1 deletion src/utils/communication/grpc/comm.proto
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
syntax = "proto3";

service CommunicationServer {
rpc SendData (Data) returns (Empty) {}
rpc send_data (Data) returns (Empty) {}
rpc get_rank (Empty) returns (Rank) {}
rpc update_port (PeerId) returns (Empty) {}
rpc send_peer_ids (PeerIds) returns (Empty) {}
rpc send_quorum (Quorum) returns (Empty) {}
}

message Empty {}
Expand All @@ -14,3 +18,25 @@ message Data {
string id = 1;
Model model = 2;
}

message Rank {
int32 rank = 1;
}

message Port {
int32 port = 1;
}

message PeerId {
Rank rank = 1;
Port port = 2;
string ip = 3;
}

message PeerIds {
map<int32, PeerId> peer_ids = 1;
}

message Quorum {
bool quorum = 1;
}
20 changes: 17 additions & 3 deletions src/utils/communication/grpc/comm_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions src/utils/communication/grpc/comm_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Mapping as _Mapping, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor

class Empty(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...

class Model(_message.Message):
__slots__ = ("buffer",)
BUFFER_FIELD_NUMBER: _ClassVar[int]
buffer: bytes
def __init__(self, buffer: _Optional[bytes] = ...) -> None: ...

class Data(_message.Message):
__slots__ = ("id", "model")
ID_FIELD_NUMBER: _ClassVar[int]
MODEL_FIELD_NUMBER: _ClassVar[int]
id: str
model: Model
def __init__(self, id: _Optional[str] = ..., model: _Optional[_Union[Model, _Mapping]] = ...) -> None: ...

class Rank(_message.Message):
__slots__ = ("rank",)
RANK_FIELD_NUMBER: _ClassVar[int]
rank: int
def __init__(self, rank: _Optional[int] = ...) -> None: ...

class Port(_message.Message):
__slots__ = ("port",)
PORT_FIELD_NUMBER: _ClassVar[int]
port: int
def __init__(self, port: _Optional[int] = ...) -> None: ...

class PeerId(_message.Message):
__slots__ = ("rank", "port", "ip")
RANK_FIELD_NUMBER: _ClassVar[int]
PORT_FIELD_NUMBER: _ClassVar[int]
IP_FIELD_NUMBER: _ClassVar[int]
rank: Rank
port: Port
ip: str
def __init__(self, rank: _Optional[_Union[Rank, _Mapping]] = ..., port: _Optional[_Union[Port, _Mapping]] = ..., ip: _Optional[str] = ...) -> None: ...

class PeerIds(_message.Message):
__slots__ = ("peer_ids",)
class PeerIdsEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: int
value: PeerId
def __init__(self, key: _Optional[int] = ..., value: _Optional[_Union[PeerId, _Mapping]] = ...) -> None: ...
PEER_IDS_FIELD_NUMBER: _ClassVar[int]
peer_ids: _containers.MessageMap[int, PeerId]
def __init__(self, peer_ids: _Optional[_Mapping[int, PeerId]] = ...) -> None: ...

class Quorum(_message.Message):
__slots__ = ("quorum",)
QUORUM_FIELD_NUMBER: _ClassVar[int]
quorum: bool
def __init__(self, quorum: bool = ...) -> None: ...
Loading

0 comments on commit cca0128

Please sign in to comment.