Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(framework) Allow clients to exit gracefully #3090

Merged
merged 54 commits into main from better-client-exit
Jun 10, 2024
Merged
Changes from 49 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
8796192
Initial draft
charlesbvll Mar 10, 2024
0d2a092
Working draft
charlesbvll Mar 10, 2024
5dd7f1f
Merge branch 'main' of https://github.com/adap/flower into better-client-exit
charlesbvll Mar 10, 2024
2959f1f
Cleaner implementation
charlesbvll Mar 10, 2024
dfef6f8
Add type hints
charlesbvll Mar 10, 2024
596551e
Merge branch 'main' of https://github.com/adap/flower into better-client-exit
charlesbvll Mar 10, 2024
2147135
Merge branch 'main' into better-client-exit
danieljanes Mar 10, 2024
dc2bed0
Merge branch 'main' of https://github.com/adap/flower into better-client-exit
charlesbvll Mar 11, 2024
90ed4f0
Remove nested try-except
charlesbvll Mar 11, 2024
7c48d02
Merge branch 'main' into better-client-exit
charlesbvll Mar 11, 2024
240c31f
Revert log changes
charlesbvll Mar 11, 2024
d902a5b
Merge branch 'main' into better-client-exit
charlesbvll Mar 11, 2024
ec7b57c
Merge branch 'main' into better-client-exit
charlesbvll Mar 11, 2024
7534e01
Merge branch 'main' into better-client-exit
charlesbvll Mar 11, 2024
6a55c4f
Merge branch 'main' of https://github.com/adap/flower into better-client-exit
charlesbvll Mar 13, 2024
48910a2
Use signals
charlesbvll Mar 13, 2024
e931d20
Merge branch 'main' of https://github.com/adap/flower into better-client-exit
charlesbvll Mar 13, 2024
d968b6c
Merge branch 'main' of https://github.com/adap/flower into better-client-exit
charlesbvll Mar 13, 2024
5b17f2e
Better exception
charlesbvll Mar 13, 2024
53a6d6f
Add type hints
charlesbvll Mar 13, 2024
38aed6a
Add disables
charlesbvll Mar 13, 2024
73d68e2
Simplify implementation
charlesbvll Mar 13, 2024
aa76470
Add docstring
charlesbvll Mar 13, 2024
549df99
Merge branch 'main' into better-client-exit
charlesbvll Mar 13, 2024
27ad48c
Move signal handler registration before try/except
charlesbvll Mar 13, 2024
251b299
Merge branch 'main' into better-client-exit
charlesbvll Mar 14, 2024
7f35989
Merge branch 'main' into better-client-exit
charlesbvll Apr 10, 2024
446c46d
Format file
charlesbvll Apr 10, 2024
cb79552
Split comment on 2 lines
charlesbvll Apr 10, 2024
e0e7167
CreateNode on reconnect
charlesbvll Apr 10, 2024
a3c7cbb
Merge branch 'main' into better-client-exit
charlesbvll Apr 15, 2024
517be42
Merge branch 'main' into better-client-exit
panh99 Apr 22, 2024
c7f2cc1
Merge branch 'main' into better-client-exit
charlesbvll Apr 23, 2024
f21d674
Merge branch 'main' into better-client-exit
charlesbvll Apr 25, 2024
0ee74e7
Merge branch 'main' into better-client-exit
jafermarq May 8, 2024
c78892b
Delete run_tracker on exit
charlesbvll May 9, 2024
ccfbbce
Merge branch 'main' into better-client-exit
danieljanes May 19, 2024
d6c8111
Merge branch 'main' into better-client-exit
jafermarq May 27, 2024
13bb889
Merge branch 'main' into better-client-exit
charlesbvll May 28, 2024
adfb32b
Clientapp exit bool test (#3522)
charlesbvll May 28, 2024
39a9ba7
Merge branch 'main' into better-client-exit
charlesbvll May 28, 2024
91d3cb9
Merge branch 'main' into better-client-exit
charlesbvll May 28, 2024
c0f2379
feat(framework:skip) Add delete node on shutdown (#3524)
charlesbvll May 29, 2024
ffdd108
Revert delete_node change
charlesbvll May 30, 2024
5678704
Merge branch 'main' into better-client-exit
charlesbvll May 30, 2024
a5a3066
Merge branch 'main' into better-client-exit
jafermarq Jun 7, 2024
e487ca8
Merge branch 'main' into better-client-exit
danieljanes Jun 8, 2024
5910c1c
Merge branch 'main' into better-client-exit
jafermarq Jun 8, 2024
8ebe7d7
Merge branch 'main' into better-client-exit
danieljanes Jun 8, 2024
dee4bfe
Merge branch 'main' into better-client-exit
charlesbvll Jun 9, 2024
24a67c0
Merge branch 'main' into better-client-exit
danieljanes Jun 9, 2024
5d415a4
Rename and remove create_node
charlesbvll Jun 9, 2024
100ba85
feat(framework:skip) Call delete_node on shutdown (#3531)
charlesbvll Jun 9, 2024
83528a3
Keep ping stop event call
charlesbvll Jun 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 90 additions & 78 deletions src/py/flwr/client/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def _on_backoff(retry_state: RetryState) -> None:

node_state = NodeState()

while True:
while not run_tracker.interrupt:
sleep_duration: int = 0
with connection(
address,
Expand All @@ -323,102 +323,114 @@ def _on_backoff(retry_state: RetryState) -> None:
receive, send, create_node, delete_node, get_run = conn

# Register node
run_tracker.create_node = create_node
if create_node is not None:
create_node() # pylint: disable=not-callable

while True:
# Receive
message = receive()
if message is None:
time.sleep(3) # Wait for 3s before asking again
continue
run_tracker.register_signal_handler()
while not run_tracker.interrupt:
try:
# Receive
message = receive()
if message is None:
time.sleep(3) # Wait for 3s before asking again
continue

log(INFO, "")
if len(message.metadata.group_id) > 0:
log(
INFO,
"[RUN %s, ROUND %s]",
message.metadata.run_id,
message.metadata.group_id,
)

log(INFO, "")
if len(message.metadata.group_id) > 0:
log(
INFO,
"[RUN %s, ROUND %s]",
message.metadata.run_id,
message.metadata.group_id,
"Received: %s message %s",
message.metadata.message_type,
message.metadata.message_id,
)
log(
INFO,
"Received: %s message %s",
message.metadata.message_type,
message.metadata.message_id,
)

# Handle control message
out_message, sleep_duration = handle_control_message(message)
if out_message:
send(out_message)
break

# Register context for this run
node_state.register_context(run_id=message.metadata.run_id)

# Retrieve context for this run
context = node_state.retrieve_context(run_id=message.metadata.run_id)
# Handle control message
out_message, sleep_duration = handle_control_message(message)
if out_message:
send(out_message)
break

# Create an error reply message that will never be used to prevent
# the used-before-assignment linting error
reply_message = message.create_error_reply(
error=Error(code=ErrorCode.UNKNOWN, reason="Unknown")
)

# Handle app loading and task message
try:
# Load ClientApp instance
client_app: ClientApp = load_client_app_fn()

# Execute ClientApp
reply_message = client_app(message=message, context=context)
except Exception as ex: # pylint: disable=broad-exception-caught

# Legacy grpc-bidi
if transport in ["grpc-bidi", None]:
log(ERROR, "Client raised an exception.", exc_info=ex)
# Raise exception, crash process
raise ex

# Don't update/change NodeState

e_code = ErrorCode.CLIENT_APP_RAISED_EXCEPTION
# Reason example: "<class 'ZeroDivisionError'>:<'division by zero'>"
reason = str(type(ex)) + ":<'" + str(ex) + "'>"
exc_entity = "ClientApp"
if isinstance(ex, LoadClientAppError):
reason = (
"An exception was raised when attempting to load "
"`ClientApp`"
)
e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION
exc_entity = "SuperNode"
# Register context for this run
node_state.register_context(run_id=message.metadata.run_id)

log(ERROR, "%s raised an exception", exc_entity, exc_info=ex)
# Retrieve context for this run
context = node_state.retrieve_context(
run_id=message.metadata.run_id
)

# Create error message
# Create an error reply message that will never be used to prevent
# the used-before-assignment linting error
reply_message = message.create_error_reply(
error=Error(code=e_code, reason=reason)
)
else:
# No exception, update node state
node_state.update_context(
run_id=message.metadata.run_id,
context=context,
error=Error(code=ErrorCode.UNKNOWN, reason="Unknown")
)

# Send
send(reply_message)
log(INFO, "Sent reply")
# Handle app loading and task message
try:
# Load ClientApp instance
client_app: ClientApp = load_client_app_fn()

# Execute ClientApp
reply_message = client_app(message=message, context=context)
except Exception as ex: # pylint: disable=broad-exception-caught

# Legacy grpc-bidi
if transport in ["grpc-bidi", None]:
log(ERROR, "Client raised an exception.", exc_info=ex)
# Raise exception, crash process
raise ex

# Don't update/change NodeState

e_code = ErrorCode.CLIENT_APP_RAISED_EXCEPTION
# Reason example:
# "<class 'ZeroDivisionError'>:<'division by zero'>"
reason = str(type(ex)) + ":<'" + str(ex) + "'>"
exc_entity = "ClientApp"
if isinstance(ex, LoadClientAppError):
reason = (
"An exception was raised when attempting to load "
"`ClientApp`"
)
e_code = ErrorCode.LOAD_CLIENT_APP_EXCEPTION
exc_entity = "SuperNode"

if not run_tracker.interrupt:
log(
ERROR, "%s raised an exception", exc_entity, exc_info=ex
)

# Create error message
reply_message = message.create_error_reply(
error=Error(code=e_code, reason=reason)
)
else:
# No exception, update node state
node_state.update_context(
run_id=message.metadata.run_id,
context=context,
)

# Unregister node
if delete_node is not None:
delete_node() # pylint: disable=not-callable
# Send
send(reply_message)
log(INFO, "Sent reply")

except StopIteration:
sleep_duration = 0
break

if sleep_duration == 0:
log(INFO, "Disconnect and shut down")
del run_tracker
break

# Sleep and reconnect afterwards
log(
INFO,
Expand Down