Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add ClientApp process function #3977

Merged
merged 40 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dc1f710
Initial commit
chongshenng Aug 8, 2024
df0ccf8
Merge branch 'add-exec-client-app' into add-multiproc-execution
chongshenng Aug 8, 2024
aef1b3e
Init commit
chongshenng Aug 8, 2024
11ed5cf
Merge branch 'main' into add-exec-client-app
chongshenng Aug 8, 2024
1541f18
Merge branch 'main' into add-exec-client-app
chongshenng Aug 9, 2024
eaed17b
Update internal command to flwr-clientapp
chongshenng Aug 9, 2024
4b32077
Update __init__.py
chongshenng Aug 9, 2024
4e7612a
Merge branch 'main' into add-exec-client-app
chongshenng Aug 9, 2024
a16ab06
Merge branch 'main' into add-exec-client-app
danieljanes Aug 10, 2024
ed64df1
Update src/py/flwr/client/supernode/app.py
danieljanes Aug 10, 2024
1a059ee
Update src/py/flwr/client/supernode/app.py
danieljanes Aug 10, 2024
7c16d32
Merge branch 'main' into add-exec-client-app
chongshenng Aug 11, 2024
e269e2e
Address comments
chongshenng Aug 11, 2024
e3ce501
Merge main
chongshenng Aug 12, 2024
b8c4bc1
Update PR
chongshenng Aug 12, 2024
57d0613
Update PR
chongshenng Aug 12, 2024
d6b5b5d
Address comments
chongshenng Aug 12, 2024
27f2e60
Merge main
chongshenng Aug 13, 2024
5cf506f
Merge branch 'main' into add-multiproc-execution
chongshenng Aug 13, 2024
5d30a7c
Refactor code for tests
chongshenng Aug 13, 2024
18f04e0
Update docstring
chongshenng Aug 13, 2024
808c2fb
Merge main
chongshenng Aug 13, 2024
8925e0b
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 15, 2024
1803bab
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 15, 2024
81c2a83
Fix missing import
chongshenng Aug 15, 2024
3ae8021
Fix return type
chongshenng Aug 15, 2024
91cce4e
Change import
chongshenng Aug 15, 2024
1a48975
fix
chongshenng Aug 15, 2024
cd3b888
Set address
chongshenng Aug 15, 2024
ea3956a
Reorder
chongshenng Aug 15, 2024
f6b9f85
Update src/py/flwr/client/app.py
danieljanes Aug 15, 2024
2a15782
Add exception handling
chongshenng Aug 16, 2024
d3eac24
Merge branch 'main' into add-multiproc-execution
chongshenng Aug 16, 2024
42a0336
Lint
chongshenng Aug 16, 2024
13f557f
add
chongshenng Aug 16, 2024
1d514af
Remove background from name
chongshenng Aug 16, 2024
08153ac
Move on_channel_state_change to top-level
chongshenng Aug 16, 2024
0ea3494
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 16, 2024
7f13cdf
Lint
chongshenng Aug 16, 2024
918e1aa
Merge branch 'main' into add-multiproc-execution
danieljanes Aug 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ flower-supernode = "flwr.client:run_supernode"
flower-client-app = "flwr.client:run_client_app"
flower-server-app = "flwr.server:run_server_app"
flower-simulation = "flwr.simulation.run_simulation:run_simulation_from_cli"
flower-exec-client-app = "flwr.client.supernode:exec_client_app"

[tool.poetry.dependencies]
python = "^3.8"
Expand Down
118 changes: 118 additions & 0 deletions src/py/flwr/client/process/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower background ClientApp."""

from logging import DEBUG, ERROR, INFO

import grpc

# from flwr.cli.install import install_from_fab
from flwr.client.client_app import ClientApp
from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel
from flwr.common.logger import log
from flwr.common.serde import (
context_from_proto,
context_to_proto,
message_from_proto,
message_to_proto,
run_from_proto,
)

# pylint: disable=E0401,E0611
from flwr.proto.appio_pb2 import PullClientAppInputsRequest, PushClientAppOutputsRequest
from flwr.proto.appio_pb2_grpc import ClientAppIoStub, add_ClientAppIoServicer_to_server

# pylint: disable=E0611
from flwr.server.superlink.fleet.grpc_bidi.grpc_server import generic_create_grpc_server

from .clientappio_servicer import ClientAppIoServicer
from .utils import _get_load_client_app_fn


def _run_background_client( # pylint: disable=R0914
address: str,
token: int,
) -> None:
"""Run background Flower ClientApp process."""

def on_channel_state_change(channel_connectivity: str) -> None:
"""Log channel connectivity."""
log(DEBUG, channel_connectivity)

channel = create_channel(
server_address=address,
insecure=True,
)
channel.subscribe(on_channel_state_change)

try:
stub = ClientAppIoStub(channel)

req = PullClientAppInputsRequest(token=token)
res = stub.PullClientAppInputs(req)
# fab_file = res.fab
run = run_from_proto(res.run)
message = message_from_proto(res.message)
context = context_from_proto(res.context)
# Ensures FAB is installed (default is Flower directory)
# install_from_fab(
# fab_file, None, True
# )
load_client_app_fn = _get_load_client_app_fn(
default_app_ref="",
project_dir="",
multi_app=True,
flwr_dir=None,
)
# print(f"FAB ID: {run.fab_id}, FAB version: {run.fab_version}")
client_app: ClientApp = load_client_app_fn(
run.fab_id, run.fab_version # To be optimized later
)
# Execute ClientApp
reply_message = client_app(message=message, context=context)

proto_message = message_to_proto(reply_message)
proto_context = context_to_proto(context)
req = PushClientAppOutputsRequest(
token=token,
message=proto_message,
context=proto_context,
)
res = stub.PushClientAppOutputs(req)
except KeyboardInterrupt:
log(INFO, "Closing connection")
except grpc.RpcError as e:
log(ERROR, "GRPC error occurred: %s", str(e))
finally:
channel.close()


def run_clientappio_api_grpc(
address: str = "0.0.0.0:9094",
) -> tuple[grpc.Server, grpc.Server]:
"""Run ClientAppIo API (gRPC-rere)."""
clientappio_servicer: grpc.Server = ClientAppIoServicer()
clientappio_add_servicer_to_server_fn = add_ClientAppIoServicer_to_server
clientappio_grpc_server = generic_create_grpc_server(
servicer_and_add_fn=(
clientappio_servicer,
clientappio_add_servicer_to_server_fn,
),
server_address=address,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
)
log(INFO, "Starting Flower ClientAppIo gRPC server on %s", address)
clientappio_grpc_server.start()
return clientappio_servicer, clientappio_grpc_server
96 changes: 96 additions & 0 deletions src/py/flwr/client/process/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower ClientApp loading utils."""

from logging import DEBUG, WARN
from pathlib import Path
from typing import Callable, Optional

from flwr.client.client_app import ClientApp, LoadClientAppError
from flwr.common.config import get_flwr_dir, get_project_config, get_project_dir
from flwr.common.logger import log
from flwr.common.object_ref import load_app, validate


def _get_load_client_app_fn(
default_app_ref: str,
project_dir: str,
multi_app: bool,
flwr_dir: Optional[str] = None,
) -> Callable[[str, str], ClientApp]:
"""Get the load_client_app_fn function.
If `multi_app` is True, this function loads the specified ClientApp
based on `fab_id` and `fab_version`. If `fab_id` is empty, a default
ClientApp will be loaded.
If `multi_app` is False, it ignores `fab_id` and `fab_version` and
loads a default ClientApp.
"""
if not multi_app:
log(
DEBUG,
"Flower SuperNode will load and validate ClientApp `%s`",
default_app_ref,
)

valid, error_msg = validate(default_app_ref, project_dir=project_dir)
if not valid and error_msg:
raise LoadClientAppError(error_msg) from None

def _load(fab_id: str, fab_version: str) -> ClientApp:
runtime_project_dir = Path(project_dir).absolute()
# If multi-app feature is disabled
if not multi_app:
# Set app reference
client_app_ref = default_app_ref
# If multi-app feature is enabled but the fab id is not specified
elif fab_id == "":
if default_app_ref == "":
raise LoadClientAppError(
"Invalid FAB ID: The FAB ID is empty.",
) from None

log(WARN, "FAB ID is not provided; the default ClientApp will be loaded.")

# Set app reference
client_app_ref = default_app_ref
# If multi-app feature is enabled
else:
try:
runtime_project_dir = get_project_dir(
fab_id, fab_version, get_flwr_dir(flwr_dir)
)
config = get_project_config(runtime_project_dir)
except Exception as e:
raise LoadClientAppError("Failed to load ClientApp") from e

# Set app reference
client_app_ref = config["tool"]["flwr"]["app"]["components"]["clientapp"]

# Load ClientApp
log(
DEBUG,
"Loading ClientApp `%s`",
client_app_ref,
)
client_app = load_app(client_app_ref, LoadClientAppError, runtime_project_dir)

if not isinstance(client_app, ClientApp):
raise LoadClientAppError(
f"Attribute {client_app_ref} is not of type {ClientApp}",
) from None

return client_app

return _load
2 changes: 2 additions & 0 deletions src/py/flwr/client/supernode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
"""Flower SuperNode."""


from .app import exec_client_app as exec_client_app
from .app import run_client_app as run_client_app
from .app import run_supernode as run_supernode

__all__ = [
"exec_client_app",
"run_client_app",
"run_supernode",
]
Loading
Loading