From 89f5796772fca014560fc59a6c9c50b0b69b639f Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 25 Mar 2024 20:34:47 +0100 Subject: [PATCH 1/2] Create Ping endpoint --- src/proto/flwr/proto/fleet.proto | 5 +++ src/py/flwr/proto/fleet_pb2.py | 34 +++++++++++-------- src/py/flwr/proto/fleet_pb2.pyi | 25 ++++++++++++++ src/py/flwr/proto/fleet_pb2_grpc.py | 33 ++++++++++++++++++ src/py/flwr/proto/fleet_pb2_grpc.pyi | 10 ++++++ .../fleet/grpc_rere/fleet_servicer.py | 10 ++++++ .../fleet/message_handler/message_handler.py | 10 ++++++ 7 files changed, 112 insertions(+), 15 deletions(-) diff --git a/src/proto/flwr/proto/fleet.proto b/src/proto/flwr/proto/fleet.proto index c900a3b1148d..fcb301181f5a 100644 --- a/src/proto/flwr/proto/fleet.proto +++ b/src/proto/flwr/proto/fleet.proto @@ -23,6 +23,7 @@ import "flwr/proto/task.proto"; service Fleet { rpc CreateNode(CreateNodeRequest) returns (CreateNodeResponse) {} rpc DeleteNode(DeleteNodeRequest) returns (DeleteNodeResponse) {} + rpc Ping(PingRequest) returns (PingResponse) {} // Retrieve one or more tasks, if possible // @@ -43,6 +44,10 @@ message CreateNodeResponse { Node node = 1; } message DeleteNodeRequest { Node node = 1; } message DeleteNodeResponse {} +// Ping messages +message PingRequest { Node node = 1; } +message PingResponse { bool success = 1; } + // PullTaskIns messages message PullTaskInsRequest { Node node = 1; diff --git a/src/py/flwr/proto/fleet_pb2.py b/src/py/flwr/proto/fleet_pb2.py index e8443c296f0c..dbf64fb850a5 100644 --- a/src/py/flwr/proto/fleet_pb2.py +++ b/src/py/flwr/proto/fleet_pb2.py @@ -16,7 +16,7 @@ from flwr.proto import task_pb2 as flwr_dot_proto_dot_task__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x66lwr/proto/fleet.proto\x12\nflwr.proto\x1a\x15\x66lwr/proto/node.proto\x1a\x15\x66lwr/proto/task.proto\"\x13\n\x11\x43reateNodeRequest\"4\n\x12\x43reateNodeResponse\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"3\n\x11\x44\x65leteNodeRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"\x14\n\x12\x44\x65leteNodeResponse\"F\n\x12PullTaskInsRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\x12\x10\n\x08task_ids\x18\x02 \x03(\t\"k\n\x13PullTaskInsResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12*\n\rtask_ins_list\x18\x02 \x03(\x0b\x32\x13.flwr.proto.TaskIns\"@\n\x12PushTaskResRequest\x12*\n\rtask_res_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.TaskRes\"\xae\x01\n\x13PushTaskResResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12=\n\x07results\x18\x02 \x03(\x0b\x32,.flwr.proto.PushTaskResResponse.ResultsEntry\x1a.\n\x0cResultsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\r:\x02\x38\x01\"\x1e\n\tReconnect\x12\x11\n\treconnect\x18\x01 \x01(\x04\x32\xc9\x02\n\x05\x46leet\x12M\n\nCreateNode\x12\x1d.flwr.proto.CreateNodeRequest\x1a\x1e.flwr.proto.CreateNodeResponse\"\x00\x12M\n\nDeleteNode\x12\x1d.flwr.proto.DeleteNodeRequest\x1a\x1e.flwr.proto.DeleteNodeResponse\"\x00\x12P\n\x0bPullTaskIns\x12\x1e.flwr.proto.PullTaskInsRequest\x1a\x1f.flwr.proto.PullTaskInsResponse\"\x00\x12P\n\x0bPushTaskRes\x12\x1e.flwr.proto.PushTaskResRequest\x1a\x1f.flwr.proto.PushTaskResResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16\x66lwr/proto/fleet.proto\x12\nflwr.proto\x1a\x15\x66lwr/proto/node.proto\x1a\x15\x66lwr/proto/task.proto\"\x13\n\x11\x43reateNodeRequest\"4\n\x12\x43reateNodeResponse\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"3\n\x11\x44\x65leteNodeRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"\x14\n\x12\x44\x65leteNodeResponse\"-\n\x0bPingRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\"\x1f\n\x0cPingResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\"F\n\x12PullTaskInsRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\x12\x10\n\x08task_ids\x18\x02 \x03(\t\"k\n\x13PullTaskInsResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12*\n\rtask_ins_list\x18\x02 \x03(\x0b\x32\x13.flwr.proto.TaskIns\"@\n\x12PushTaskResRequest\x12*\n\rtask_res_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.TaskRes\"\xae\x01\n\x13PushTaskResResponse\x12(\n\treconnect\x18\x01 \x01(\x0b\x32\x15.flwr.proto.Reconnect\x12=\n\x07results\x18\x02 \x03(\x0b\x32,.flwr.proto.PushTaskResResponse.ResultsEntry\x1a.\n\x0cResultsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\r:\x02\x38\x01\"\x1e\n\tReconnect\x12\x11\n\treconnect\x18\x01 \x01(\x04\x32\x86\x03\n\x05\x46leet\x12M\n\nCreateNode\x12\x1d.flwr.proto.CreateNodeRequest\x1a\x1e.flwr.proto.CreateNodeResponse\"\x00\x12M\n\nDeleteNode\x12\x1d.flwr.proto.DeleteNodeRequest\x1a\x1e.flwr.proto.DeleteNodeResponse\"\x00\x12;\n\x04Ping\x12\x17.flwr.proto.PingRequest\x1a\x18.flwr.proto.PingResponse\"\x00\x12P\n\x0bPullTaskIns\x12\x1e.flwr.proto.PullTaskInsRequest\x1a\x1f.flwr.proto.PullTaskInsResponse\"\x00\x12P\n\x0bPushTaskRes\x12\x1e.flwr.proto.PushTaskResRequest\x1a\x1f.flwr.proto.PushTaskResResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -33,18 +33,22 @@ _globals['_DELETENODEREQUEST']._serialized_end=210 _globals['_DELETENODERESPONSE']._serialized_start=212 _globals['_DELETENODERESPONSE']._serialized_end=232 - _globals['_PULLTASKINSREQUEST']._serialized_start=234 - _globals['_PULLTASKINSREQUEST']._serialized_end=304 - _globals['_PULLTASKINSRESPONSE']._serialized_start=306 - _globals['_PULLTASKINSRESPONSE']._serialized_end=413 - _globals['_PUSHTASKRESREQUEST']._serialized_start=415 - _globals['_PUSHTASKRESREQUEST']._serialized_end=479 - _globals['_PUSHTASKRESRESPONSE']._serialized_start=482 - _globals['_PUSHTASKRESRESPONSE']._serialized_end=656 - _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_start=610 - _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_end=656 - _globals['_RECONNECT']._serialized_start=658 - _globals['_RECONNECT']._serialized_end=688 - _globals['_FLEET']._serialized_start=691 - _globals['_FLEET']._serialized_end=1020 + _globals['_PINGREQUEST']._serialized_start=234 + _globals['_PINGREQUEST']._serialized_end=279 + _globals['_PINGRESPONSE']._serialized_start=281 + _globals['_PINGRESPONSE']._serialized_end=312 + _globals['_PULLTASKINSREQUEST']._serialized_start=314 + _globals['_PULLTASKINSREQUEST']._serialized_end=384 + _globals['_PULLTASKINSRESPONSE']._serialized_start=386 + _globals['_PULLTASKINSRESPONSE']._serialized_end=493 + _globals['_PUSHTASKRESREQUEST']._serialized_start=495 + _globals['_PUSHTASKRESREQUEST']._serialized_end=559 + _globals['_PUSHTASKRESRESPONSE']._serialized_start=562 + _globals['_PUSHTASKRESRESPONSE']._serialized_end=736 + _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_start=690 + _globals['_PUSHTASKRESRESPONSE_RESULTSENTRY']._serialized_end=736 + _globals['_RECONNECT']._serialized_start=738 + _globals['_RECONNECT']._serialized_end=768 + _globals['_FLEET']._serialized_start=771 + _globals['_FLEET']._serialized_end=1161 # @@protoc_insertion_point(module_scope) diff --git a/src/py/flwr/proto/fleet_pb2.pyi b/src/py/flwr/proto/fleet_pb2.pyi index 86bc358858d2..39edb61ca0d7 100644 --- a/src/py/flwr/proto/fleet_pb2.pyi +++ b/src/py/flwr/proto/fleet_pb2.pyi @@ -53,6 +53,31 @@ class DeleteNodeResponse(google.protobuf.message.Message): ) -> None: ... global___DeleteNodeResponse = DeleteNodeResponse +class PingRequest(google.protobuf.message.Message): + """Ping messages""" + DESCRIPTOR: google.protobuf.descriptor.Descriptor + NODE_FIELD_NUMBER: builtins.int + @property + def node(self) -> flwr.proto.node_pb2.Node: ... + def __init__(self, + *, + node: typing.Optional[flwr.proto.node_pb2.Node] = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["node",b"node"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["node",b"node"]) -> None: ... +global___PingRequest = PingRequest + +class PingResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + SUCCESS_FIELD_NUMBER: builtins.int + success: builtins.bool + def __init__(self, + *, + success: builtins.bool = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["success",b"success"]) -> None: ... +global___PingResponse = PingResponse + class PullTaskInsRequest(google.protobuf.message.Message): """PullTaskIns messages""" DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/src/py/flwr/proto/fleet_pb2_grpc.py b/src/py/flwr/proto/fleet_pb2_grpc.py index 2b53ec43e851..c31a4ec73f0e 100644 --- a/src/py/flwr/proto/fleet_pb2_grpc.py +++ b/src/py/flwr/proto/fleet_pb2_grpc.py @@ -24,6 +24,11 @@ def __init__(self, channel): request_serializer=flwr_dot_proto_dot_fleet__pb2.DeleteNodeRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_fleet__pb2.DeleteNodeResponse.FromString, ) + self.Ping = channel.unary_unary( + '/flwr.proto.Fleet/Ping', + request_serializer=flwr_dot_proto_dot_fleet__pb2.PingRequest.SerializeToString, + response_deserializer=flwr_dot_proto_dot_fleet__pb2.PingResponse.FromString, + ) self.PullTaskIns = channel.unary_unary( '/flwr.proto.Fleet/PullTaskIns', request_serializer=flwr_dot_proto_dot_fleet__pb2.PullTaskInsRequest.SerializeToString, @@ -51,6 +56,12 @@ def DeleteNode(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def Ping(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def PullTaskIns(self, request, context): """Retrieve one or more tasks, if possible @@ -82,6 +93,11 @@ def add_FleetServicer_to_server(servicer, server): request_deserializer=flwr_dot_proto_dot_fleet__pb2.DeleteNodeRequest.FromString, response_serializer=flwr_dot_proto_dot_fleet__pb2.DeleteNodeResponse.SerializeToString, ), + 'Ping': grpc.unary_unary_rpc_method_handler( + servicer.Ping, + request_deserializer=flwr_dot_proto_dot_fleet__pb2.PingRequest.FromString, + response_serializer=flwr_dot_proto_dot_fleet__pb2.PingResponse.SerializeToString, + ), 'PullTaskIns': grpc.unary_unary_rpc_method_handler( servicer.PullTaskIns, request_deserializer=flwr_dot_proto_dot_fleet__pb2.PullTaskInsRequest.FromString, @@ -136,6 +152,23 @@ def DeleteNode(request, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + @staticmethod + def Ping(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/flwr.proto.Fleet/Ping', + flwr_dot_proto_dot_fleet__pb2.PingRequest.SerializeToString, + flwr_dot_proto_dot_fleet__pb2.PingResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + @staticmethod def PullTaskIns(request, target, diff --git a/src/py/flwr/proto/fleet_pb2_grpc.pyi b/src/py/flwr/proto/fleet_pb2_grpc.pyi index cfa83f737439..33ba9440793a 100644 --- a/src/py/flwr/proto/fleet_pb2_grpc.pyi +++ b/src/py/flwr/proto/fleet_pb2_grpc.pyi @@ -16,6 +16,10 @@ class FleetStub: flwr.proto.fleet_pb2.DeleteNodeRequest, flwr.proto.fleet_pb2.DeleteNodeResponse] + Ping: grpc.UnaryUnaryMultiCallable[ + flwr.proto.fleet_pb2.PingRequest, + flwr.proto.fleet_pb2.PingResponse] + PullTaskIns: grpc.UnaryUnaryMultiCallable[ flwr.proto.fleet_pb2.PullTaskInsRequest, flwr.proto.fleet_pb2.PullTaskInsResponse] @@ -46,6 +50,12 @@ class FleetServicer(metaclass=abc.ABCMeta): context: grpc.ServicerContext, ) -> flwr.proto.fleet_pb2.DeleteNodeResponse: ... + @abc.abstractmethod + def Ping(self, + request: flwr.proto.fleet_pb2.PingRequest, + context: grpc.ServicerContext, + ) -> flwr.proto.fleet_pb2.PingResponse: ... + @abc.abstractmethod def PullTaskIns(self, request: flwr.proto.fleet_pb2.PullTaskInsRequest, diff --git a/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py b/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py index 278474477379..ec8aaa45ec4d 100644 --- a/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py +++ b/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py @@ -26,6 +26,8 @@ CreateNodeResponse, DeleteNodeRequest, DeleteNodeResponse, + PingRequest, + PingResponse, PullTaskInsRequest, PullTaskInsResponse, PushTaskResRequest, @@ -61,6 +63,14 @@ def DeleteNode( state=self.state_factory.state(), ) + def Ping(self, request: PingRequest, context: grpc.ServicerContext) -> PingResponse: + """.""" + log(INFO, "FleetServicer.Ping") + return message_handler.ping( + request=request, + state=self.state_factory.state(), + ) + def PullTaskIns( self, request: PullTaskInsRequest, context: grpc.ServicerContext ) -> PullTaskInsResponse: diff --git a/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py b/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py index c99a7854d53a..2e696dde78e1 100644 --- a/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py +++ b/src/py/flwr/server/superlink/fleet/message_handler/message_handler.py @@ -23,6 +23,8 @@ CreateNodeResponse, DeleteNodeRequest, DeleteNodeResponse, + PingRequest, + PingResponse, PullTaskInsRequest, PullTaskInsResponse, PushTaskResRequest, @@ -55,6 +57,14 @@ def delete_node(request: DeleteNodeRequest, state: State) -> DeleteNodeResponse: return DeleteNodeResponse() +def ping( + request: PingRequest, # pylint: disable=unused-argument + state: State, # pylint: disable=unused-argument +) -> PingResponse: + """.""" + return PingResponse(success=True) + + def pull_task_ins(request: PullTaskInsRequest, state: State) -> PullTaskInsResponse: """Pull TaskIns handler.""" # Get node_id if client node is not anonymous From 9c7ba62b72a860a29cf2c01d4b7286664897799d Mon Sep 17 00:00:00 2001 From: "Daniel J. Beutel" Date: Mon, 25 Mar 2024 21:24:42 +0100 Subject: [PATCH 2/2] Change log level to DEBUG --- .../flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py b/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py index ec8aaa45ec4d..eb8dd800ea37 100644 --- a/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py +++ b/src/py/flwr/server/superlink/fleet/grpc_rere/fleet_servicer.py @@ -15,7 +15,7 @@ """Fleet API gRPC request-response servicer.""" -from logging import INFO +from logging import DEBUG, INFO import grpc @@ -65,7 +65,7 @@ def DeleteNode( def Ping(self, request: PingRequest, context: grpc.ServicerContext) -> PingResponse: """.""" - log(INFO, "FleetServicer.Ping") + log(DEBUG, "FleetServicer.Ping") return message_handler.ping( request=request, state=self.state_factory.state(),