Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Add flwr log #3573

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c0601ba
feat(framework) Add superexec
charlesbvll Jun 5, 2024
485616b
Add debug setup
charlesbvll Jun 5, 2024
12e2d3b
Improve
charlesbvll Jun 5, 2024
156dc4d
Refactoring
charlesbvll Jun 6, 2024
53c7106
Rename correctly
charlesbvll Jun 6, 2024
eb7f6e7
Merge branch 'main' into add-superexec-proto
charlesbvll Jun 6, 2024
788cfee
Fully working
charlesbvll Jun 6, 2024
b316114
Merge branch 'main' into add-superexec-proto
charlesbvll Jun 6, 2024
f1809e2
Fix imports ordering
charlesbvll Jun 6, 2024
7cd19cf
Try fixing issues
charlesbvll Jun 6, 2024
fd003ba
Fix imports
charlesbvll Jun 6, 2024
6a4308d
Fix ruff errors
charlesbvll Jun 6, 2024
d3e71a9
Use subscript for `Popen`
charlesbvll Jun 6, 2024
69886ba
Improve test and servicer
charlesbvll Jun 6, 2024
f818900
Fix pylint errors
charlesbvll Jun 6, 2024
4708671
Try with string type
charlesbvll Jun 6, 2024
c8b5ef7
Define new type
charlesbvll Jun 6, 2024
5bee556
Use correct syntax
charlesbvll Jun 6, 2024
6b0f9bc
Remove unused import
charlesbvll Jun 6, 2024
f5d7788
Disable pylint error
charlesbvll Jun 6, 2024
d4ad793
Remove subscript
charlesbvll Jun 6, 2024
4e31732
Ignore type
charlesbvll Jun 6, 2024
a7e990f
Revert driver and server_app changes
charlesbvll Jun 6, 2024
6f0d3cb
Correct number of proto files
charlesbvll Jun 6, 2024
a386324
Initial commit of flwr log
chongshenng Jun 10, 2024
0042ca4
Merge remote-tracking branch 'origin/add-superexec-proto' into add-fl…
chongshenng Jun 11, 2024
4a5aa00
Add placeholder for superexec address
chongshenng Jun 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ flwr = "flwr.cli.app:app"
flower-driver-api = "flwr.server:run_driver_api"
flower-fleet-api = "flwr.server:run_fleet_api"
flower-superlink = "flwr.server:run_superlink"
flower-superexec = "flwr.superexec:run_superexec"
flower-supernode = "flwr.client:run_supernode"
flower-client-app = "flwr.client:run_client_app"
flower-server-app = "flwr.server:run_server_app"
Expand Down
31 changes: 31 additions & 0 deletions src/proto/flwr/proto/exec.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 Flower Labs GmbH. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ==============================================================================

syntax = "proto3";

package flwr.proto;

service Exec {
// Start run upon request
rpc StartRun(StartRunRequest) returns (StartRunResponse) {}

// Start log stream upon request
rpc StreamLogs(StreamLogsRequest) returns (stream StreamLogsResponse) {}
}

message StartRunRequest { bytes fab_file = 1; }
message StartRunResponse { sint64 run_id = 1; }
message StreamLogsRequest { sint64 run_id = 1; }
message StreamLogsResponse { string log_output = 1; }
3 changes: 2 additions & 1 deletion src/py/flwr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

from flwr.common.version import package_version as _package_version

from . import client, common, server, simulation
from . import client, common, server, simulation, superexec

__all__ = [
"client",
"common",
"server",
"simulation",
"superexec",
]

__version__ = _package_version
2 changes: 2 additions & 0 deletions src/py/flwr/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from .build import build
from .example import example
from .log import log
from .new import new
from .run import run

Expand All @@ -34,6 +35,7 @@
app.command()(example)
app.command()(run)
app.command()(build)
app.command()(log)

if __name__ == "__main__":
app()
65 changes: 65 additions & 0 deletions src/py/flwr/cli/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2024 Flower Labs GmbH. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Flower command line interface `log` command."""

import typer
from typing_extensions import Annotated


def log(
run_id: Annotated[
int,
typer.Option(case_sensitive=False, help="The Flower run ID to query"),
],
follow: Annotated[
bool,
typer.Option(case_sensitive=False, help="Use this flag to follow logstream"),
] = True,
) -> None:
"""Get logs from Flower run."""
from logging import DEBUG, INFO

from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel
from flwr.common.logger import log
from flwr.proto.exec_pb2 import StreamLogsRequest
from flwr.proto.exec_pb2_grpc import ExecStub

# TODO: Set SuperExec address

def on_channel_state_change(channel_connectivity: str) -> None:
"""Log channel connectivity."""
log(DEBUG, channel_connectivity)

channel = create_channel(
server_address="127.0.0.1:9093",
insecure=True,
root_certificates=None,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
interceptors=None,
)
channel.subscribe(on_channel_state_change)

try:
stub = ExecStub(channel)
req = StreamLogsRequest(run_id=run_id)

for res in stub.StreamLogs(req):
print(res.log_output)
if follow:
continue
else:
break
except KeyboardInterrupt:
log(INFO, "Exiting `flwr log`.")
118 changes: 77 additions & 41 deletions src/py/flwr/cli/run/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

import sys
from enum import Enum
from logging import DEBUG
from typing import Optional

import typer
from typing_extensions import Annotated

from flwr.cli import config_utils
from flwr.common.grpc import GRPC_MAX_MESSAGE_LENGTH, create_channel
from flwr.common.logger import log
from flwr.proto.exec_pb2 import StartRunRequest # pylint: disable=E0611
from flwr.proto.exec_pb2_grpc import ExecStub
from flwr.simulation.run_simulation import _run_simulation


Expand All @@ -31,54 +36,85 @@ class Engine(str, Enum):
SIMULATION = "simulation"


# pylint: disable-next=too-many-locals
def run(
engine: Annotated[
Optional[Engine],
typer.Option(case_sensitive=False, help="The ML framework to use"),
] = None,
use_superexec: Annotated[
bool,
typer.Option(
case_sensitive=False, help="Use this flag to use the new SuperExec API"
),
] = False,
) -> None:
"""Run Flower project."""
typer.secho("Loading project configuration... ", fg=typer.colors.BLUE)

config, errors, warnings = config_utils.load_and_validate()

if config is None:
typer.secho(
"Project configuration could not be loaded.\npyproject.toml is invalid:\n"
+ "\n".join([f"- {line}" for line in errors]),
fg=typer.colors.RED,
bold=True,
)
sys.exit()

if warnings:
typer.secho(
"Project configuration is missing the following "
"recommended properties:\n" + "\n".join([f"- {line}" for line in warnings]),
fg=typer.colors.RED,
bold=True,
if use_superexec:

def on_channel_state_change(channel_connectivity: str) -> None:
"""Log channel connectivity."""
log(DEBUG, channel_connectivity)

channel = create_channel(
server_address="127.0.0.1:9093",
insecure=True,
root_certificates=None,
max_message_length=GRPC_MAX_MESSAGE_LENGTH,
interceptors=None,
)
channel.subscribe(on_channel_state_change)
stub = ExecStub(channel)

typer.secho("Success", fg=typer.colors.GREEN)

server_app_ref = config["flower"]["components"]["serverapp"]
client_app_ref = config["flower"]["components"]["clientapp"]

if engine is None:
engine = config["flower"]["engine"]["name"]

if engine == Engine.SIMULATION:
num_supernodes = config["flower"]["engine"]["simulation"]["supernode"]["num"]

typer.secho("Starting run... ", fg=typer.colors.BLUE)
_run_simulation(
server_app_attr=server_app_ref,
client_app_attr=client_app_ref,
num_supernodes=num_supernodes,
)
req = StartRunRequest()
res = stub.StartRun(req)
print(res)
else:
typer.secho(
f"Engine '{engine}' is not yet supported in `flwr run`",
fg=typer.colors.RED,
bold=True,
)
typer.secho("Loading project configuration... ", fg=typer.colors.BLUE)

config, errors, warnings = config_utils.load_and_validate()

if config is None:
typer.secho(
"Project configuration could not be loaded.\n"
"pyproject.toml is invalid:\n"
+ "\n".join([f"- {line}" for line in errors]),
fg=typer.colors.RED,
bold=True,
)
sys.exit()

if warnings:
typer.secho(
"Project configuration is missing the following "
"recommended properties:\n"
+ "\n".join([f"- {line}" for line in warnings]),
fg=typer.colors.RED,
bold=True,
)

typer.secho("Success", fg=typer.colors.GREEN)

server_app_ref = config["flower"]["components"]["serverapp"]
client_app_ref = config["flower"]["components"]["clientapp"]

if engine is None:
engine = config["flower"]["engine"]["name"]

if engine == Engine.SIMULATION:
num_supernodes = config["flower"]["engine"]["simulation"]["supernode"][
"num"
]

typer.secho("Starting run... ", fg=typer.colors.BLUE)
_run_simulation(
server_app_attr=server_app_ref,
client_app_attr=client_app_ref,
num_supernodes=num_supernodes,
)
else:
typer.secho(
f"Engine '{engine}' is not yet supported in `flwr run`",
fg=typer.colors.RED,
bold=True,
)
4 changes: 4 additions & 0 deletions src/py/flwr/common/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ def _generate_next_value_(name: str, start: int, count: int, last_values: List[A
RUN_SUPERNODE_ENTER = auto()
RUN_SUPERNODE_LEAVE = auto()

# SuperExec
RUN_SUPEREXEC_ENTER = auto()
RUN_SUPEREXEC_LEAVE = auto()


# Use the ThreadPoolExecutor with max_workers=1 to have a queue
# and also ensure that telemetry calls are not blocking.
Expand Down
34 changes: 34 additions & 0 deletions src/py/flwr/proto/exec_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions src/py/flwr/proto/exec_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
"""
import builtins
import google.protobuf.descriptor
import google.protobuf.message
import typing
import typing_extensions

DESCRIPTOR: google.protobuf.descriptor.FileDescriptor

class StartRunRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
FAB_FILE_FIELD_NUMBER: builtins.int
fab_file: builtins.bytes
def __init__(self,
*,
fab_file: builtins.bytes = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["fab_file",b"fab_file"]) -> None: ...
global___StartRunRequest = StartRunRequest

class StartRunResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
RUN_ID_FIELD_NUMBER: builtins.int
run_id: builtins.int
def __init__(self,
*,
run_id: builtins.int = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id"]) -> None: ...
global___StartRunResponse = StartRunResponse

class StreamLogsRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
RUN_ID_FIELD_NUMBER: builtins.int
run_id: builtins.int
def __init__(self,
*,
run_id: builtins.int = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id"]) -> None: ...
global___StreamLogsRequest = StreamLogsRequest

class StreamLogsResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
LOG_OUTPUT_FIELD_NUMBER: builtins.int
log_output: typing.Text
def __init__(self,
*,
log_output: typing.Text = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["log_output",b"log_output"]) -> None: ...
global___StreamLogsResponse = StreamLogsResponse
Loading
Loading