Skip to content

Commit

Permalink
refactor(framework) Remove unused rpcs in Fleet API and presence …
Browse files Browse the repository at this point in the history
…in framework (#4874)
  • Loading branch information
jafermarq authored Jan 29, 2025
1 parent 1899570 commit 1350ef4
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 619 deletions.
29 changes: 2 additions & 27 deletions src/proto/flwr/proto/fleet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ syntax = "proto3";
package flwr.proto;

import "flwr/proto/node.proto";
import "flwr/proto/task.proto";
import "flwr/proto/run.proto";
import "flwr/proto/fab.proto";
import "flwr/proto/message.proto";
Expand All @@ -28,17 +27,13 @@ service Fleet {
rpc DeleteNode(DeleteNodeRequest) returns (DeleteNodeResponse) {}
rpc Ping(PingRequest) returns (PingResponse) {}

// Retrieve one or more tasks, if possible
// Retrieve one or more messages, if possible
//
// HTTP API path: /api/v1/fleet/pull-task-ins
rpc PullTaskIns(PullTaskInsRequest) returns (PullTaskInsResponse) {}
// HTTP API path: /api/v1/fleet/pull-messages
rpc PullMessages(PullMessagesRequest) returns (PullMessagesResponse) {}

// Complete one or more tasks, if possible
// Complete one or more messages, if possible
//
// HTTP API path: /api/v1/fleet/push-task-res
rpc PushTaskRes(PushTaskResRequest) returns (PushTaskResResponse) {}
// HTTP API path: /api/v1/fleet/push-messages
rpc PushMessages(PushMessagesRequest) returns (PushMessagesResponse) {}

Expand All @@ -63,26 +58,6 @@ message PingRequest {
}
message PingResponse { bool success = 1; }

// PullTaskIns messages
message PullTaskInsRequest {
Node node = 1;
repeated string task_ids = 2;
}
message PullTaskInsResponse {
Reconnect reconnect = 1;
repeated TaskIns task_ins_list = 2;
}

// PushTaskRes messages
message PushTaskResRequest {
Node node = 1;
repeated TaskRes task_res_list = 2;
}
message PushTaskResResponse {
Reconnect reconnect = 1;
map<string, uint32> results = 2;
}

// PullMessages messages
message PullMessagesRequest {
Node node = 1;
Expand Down
61 changes: 16 additions & 45 deletions src/py/flwr/client/grpc_rere_client/client_interceptor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,11 @@
DeleteNodeResponse,
PullMessagesRequest,
PullMessagesResponse,
PullTaskInsRequest,
PullTaskInsResponse,
PushMessagesRequest,
PushMessagesResponse,
PushTaskResRequest,
PushTaskResResponse,
)
from flwr.proto.node_pb2 import Node # pylint: disable=E0611
from flwr.proto.run_pb2 import GetRunRequest, GetRunResponse # pylint: disable=E0611
from flwr.proto.task_pb2 import Task, TaskIns # pylint: disable=E0611


class _MockServicer:
Expand All @@ -85,41 +80,27 @@ def unary_unary( # pylint: disable=too-many-return-statements
return CreateNodeResponse(node=Node(node_id=123))
if isinstance(request, DeleteNodeRequest):
return DeleteNodeResponse()
if isinstance(request, PushTaskResRequest):
return PushTaskResResponse()
if isinstance(request, PushMessagesRequest):
return PushMessagesResponse()
if isinstance(request, GetRunRequest):
return GetRunResponse()
if isinstance(request, PullMessagesRequest):

msg = Message(
metadata=Metadata(
run_id=1234,
message_id="",
src_node_id=123,
dst_node_id=SUPERLINK_NODE_ID,
group_id="",
ttl=DEFAULT_TTL,
message_type="mock",
reply_to_message="",
),
content=RecordSet(),
)
proto_msg = serde.message_to_proto(msg)
proto_msg.metadata.created_at = now().timestamp()
return PullMessagesResponse(messages_list=[])

return PullTaskInsResponse(
task_ins_list=[
TaskIns(
task=Task(
consumer=Node(node_id=123),
recordset=serde.recordset_to_proto(RecordSet()),
)
)
]

msg = Message(
metadata=Metadata(
run_id=1234,
message_id="",
src_node_id=123,
dst_node_id=SUPERLINK_NODE_ID,
group_id="",
ttl=DEFAULT_TTL,
message_type="mock",
reply_to_message="",
),
content=RecordSet(),
)
proto_msg = serde.message_to_proto(msg)
proto_msg.metadata.created_at = now().timestamp()
return PullMessagesResponse(messages_list=[])

def received_client_metadata(
self,
Expand All @@ -146,21 +127,11 @@ def _add_generic_handler(servicer: _MockServicer, server: grpc.Server) -> None:
request_deserializer=DeleteNodeRequest.FromString,
response_serializer=DeleteNodeResponse.SerializeToString,
),
"PullTaskIns": grpc.unary_unary_rpc_method_handler(
servicer.unary_unary,
request_deserializer=PullTaskInsRequest.FromString,
response_serializer=PullTaskInsResponse.SerializeToString,
),
"PullMessages": grpc.unary_unary_rpc_method_handler(
servicer.unary_unary,
request_deserializer=PullMessagesRequest.FromString,
response_serializer=PullMessagesResponse.SerializeToString,
),
"PushTaskRes": grpc.unary_unary_rpc_method_handler(
servicer.unary_unary,
request_deserializer=PushTaskResRequest.FromString,
response_serializer=PushTaskResResponse.SerializeToString,
),
"PushMessages": grpc.unary_unary_rpc_method_handler(
servicer.unary_unary,
request_deserializer=PushMessagesRequest.FromString,
Expand Down
16 changes: 0 additions & 16 deletions src/py/flwr/client/grpc_rere_client/grpc_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@
PingResponse,
PullMessagesRequest,
PullMessagesResponse,
PullTaskInsRequest,
PullTaskInsResponse,
PushMessagesRequest,
PushMessagesResponse,
PushTaskResRequest,
PushTaskResResponse,
)
from flwr.proto.grpcadapter_pb2 import MessageContainer # pylint: disable=E0611
from flwr.proto.grpcadapter_pb2_grpc import GrpcAdapterStub
Expand Down Expand Up @@ -130,24 +126,12 @@ def Ping( # pylint: disable=C0103
"""."""
return self._send_and_receive(request, PingResponse, **kwargs)

def PullTaskIns( # pylint: disable=C0103
self, request: PullTaskInsRequest, **kwargs: Any
) -> PullTaskInsResponse:
"""."""
return self._send_and_receive(request, PullTaskInsResponse, **kwargs)

def PullMessages( # pylint: disable=C0103
self, request: PullMessagesRequest, **kwargs: Any
) -> PullMessagesResponse:
"""."""
return self._send_and_receive(request, PullMessagesResponse, **kwargs)

def PushTaskRes( # pylint: disable=C0103
self, request: PushTaskResRequest, **kwargs: Any
) -> PushTaskResResponse:
"""."""
return self._send_and_receive(request, PushTaskResResponse, **kwargs)

def PushMessages( # pylint: disable=C0103
self, request: PushMessagesRequest, **kwargs: Any
) -> PushMessagesResponse:
Expand Down
67 changes: 27 additions & 40 deletions src/py/flwr/proto/fleet_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1350ef4

Please sign in to comment.