Skip to content

Commit

Permalink
Merge branch 'main' into add-windows-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
danieljanes authored Feb 13, 2025
2 parents 7c0bee1 + be506a3 commit 3e0f472
Show file tree
Hide file tree
Showing 14 changed files with 8 additions and 42 deletions.
1 change: 0 additions & 1 deletion src/proto/flwr/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ message Task {
Node consumer = 2;
double created_at = 3;
string delivered_at = 4;
double pushed_at = 5;
double ttl = 6;
repeated string ancestry = 7;
string task_type = 8;
Expand Down
12 changes: 6 additions & 6 deletions src/py/flwr/proto/task_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions src/py/flwr/proto/task_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class Task(google.protobuf.message.Message):
CONSUMER_FIELD_NUMBER: builtins.int
CREATED_AT_FIELD_NUMBER: builtins.int
DELIVERED_AT_FIELD_NUMBER: builtins.int
PUSHED_AT_FIELD_NUMBER: builtins.int
TTL_FIELD_NUMBER: builtins.int
ANCESTRY_FIELD_NUMBER: builtins.int
TASK_TYPE_FIELD_NUMBER: builtins.int
Expand All @@ -32,7 +31,6 @@ class Task(google.protobuf.message.Message):
def consumer(self) -> flwr.proto.node_pb2.Node: ...
created_at: builtins.float
delivered_at: typing.Text
pushed_at: builtins.float
ttl: builtins.float
@property
def ancestry(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[typing.Text]: ...
Expand All @@ -47,15 +45,14 @@ class Task(google.protobuf.message.Message):
consumer: typing.Optional[flwr.proto.node_pb2.Node] = ...,
created_at: builtins.float = ...,
delivered_at: typing.Text = ...,
pushed_at: builtins.float = ...,
ttl: builtins.float = ...,
ancestry: typing.Optional[typing.Iterable[typing.Text]] = ...,
task_type: typing.Text = ...,
recordset: typing.Optional[flwr.proto.recordset_pb2.RecordSet] = ...,
error: typing.Optional[flwr.proto.error_pb2.Error] = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["consumer",b"consumer","error",b"error","producer",b"producer","recordset",b"recordset"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["ancestry",b"ancestry","consumer",b"consumer","created_at",b"created_at","delivered_at",b"delivered_at","error",b"error","producer",b"producer","pushed_at",b"pushed_at","recordset",b"recordset","task_type",b"task_type","ttl",b"ttl"]) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["ancestry",b"ancestry","consumer",b"consumer","created_at",b"created_at","delivered_at",b"delivered_at","error",b"error","producer",b"producer","recordset",b"recordset","task_type",b"task_type","ttl",b"ttl"]) -> None: ...
global___Task = Task

class TaskIns(google.protobuf.message.Message):
Expand Down
1 change: 0 additions & 1 deletion src/py/flwr/server/driver/inmemory_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ def push_messages(self, messages: Iterable[Message]) -> Iterable[str]:
# Convert Message to TaskIns
taskins = message_to_taskins(msg)
# Store in state
taskins.task.pushed_at = time.time()
task_id = self.state.store_task_ins(taskins)
if task_id:
task_ids.append(str(task_id))
Expand Down
1 change: 0 additions & 1 deletion src/py/flwr/server/driver/inmemory_driver_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def get_replies(
msg = message_from_taskins(taskin)
reply_msg = msg.create_reply(RecordSet())
task_res = message_to_taskres(reply_msg)
task_res.task.pushed_at = time.time()
driver.state.store_task_res(task_res=task_res)

# Execute: Pull messages
Expand Down
6 changes: 1 addition & 5 deletions src/py/flwr/server/superlink/driver/serverappio_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import grpc

from flwr.common import ConfigsRecord, now
from flwr.common import ConfigsRecord
from flwr.common.constant import Status
from flwr.common.logger import log
from flwr.common.serde import (
Expand Down Expand Up @@ -151,9 +151,6 @@ def PushMessages(
context,
)

# Set pushed_at (timestamp in seconds)
pushed_at = now().timestamp()

# Validate request and insert in State
_raise_if(
validation_error=len(request.messages_list) == 0,
Expand All @@ -165,7 +162,6 @@ def PushMessages(
message_proto = request.messages_list.pop(0)
message = message_from_proto(message_proto=message_proto)
task_ins = message_to_taskins(message=message)
task_ins.task.pushed_at = pushed_at
validation_errors = validate_task_ins_or_res(task_ins)
_raise_if(
validation_error=bool(validation_errors),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
"""Fleet API message handlers."""


import time
from typing import Optional
from uuid import UUID

Expand Down Expand Up @@ -122,9 +121,6 @@ def push_messages(
if abort_msg:
raise InvalidRunStatusException(abort_msg)

# Set pushed_at (timestamp in seconds)
task_res.task.pushed_at = time.time()

# Store TaskRes in State
message_id: Optional[UUID] = state.store_task_res(task_res=task_res)

Expand Down
1 change: 0 additions & 1 deletion src/py/flwr/server/superlink/fleet/vce/vce_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def worker(
# Convert to TaskRes
task_res = message_to_taskres(out_mssg)
# Store TaskRes in state
task_res.task.pushed_at = time.time()
taskres_queue.put(task_res)


Expand Down
4 changes: 0 additions & 4 deletions src/py/flwr/server/superlink/fleet/vce/vce_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@


import threading
import time
from itertools import cycle
from json import JSONDecodeError
from math import pi
Expand Down Expand Up @@ -160,9 +159,6 @@ def register_messages_into_state(
)
# Convert Message to TaskIns
taskins = message_to_taskins(message)
# Normally recorded by the driver servicer
# but since we don't have one in this test, we do this manually
taskins.task.pushed_at = time.time()
# Instert in state
task_id = state.store_task_ins(taskins)
if task_id:
Expand Down
3 changes: 0 additions & 3 deletions src/py/flwr/server/superlink/linkstate/linkstate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ def test_store_task_ins_one(self) -> None:

assert actual_task.delivered_at != ""

assert actual_task.created_at < actual_task.pushed_at
assert datetime.fromisoformat(actual_task.delivered_at) > datetime(
2020, 1, 1, tzinfo=timezone.utc
)
Expand Down Expand Up @@ -1128,7 +1127,6 @@ def create_task_ins(
created_at=time.time(),
),
)
task.task.pushed_at = time.time()
return task


Expand Down Expand Up @@ -1193,7 +1191,6 @@ def create_task_res(
created_at=time.time(),
),
)
task_res.task.pushed_at = time.time()
return task_res


Expand Down
6 changes: 0 additions & 6 deletions src/py/flwr/server/superlink/linkstate/sqlite_linkstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@
consumer_node_id INTEGER,
created_at REAL,
delivered_at TEXT,
pushed_at REAL,
ttl REAL,
ancestry TEXT,
task_type TEXT,
Expand All @@ -144,7 +143,6 @@
consumer_node_id INTEGER,
created_at REAL,
delivered_at TEXT,
pushed_at REAL,
ttl REAL,
ancestry TEXT,
task_type TEXT,
Expand Down Expand Up @@ -1053,7 +1051,6 @@ def task_ins_to_dict(task_msg: TaskIns) -> dict[str, Any]:
"consumer_node_id": task_msg.task.consumer.node_id,
"created_at": task_msg.task.created_at,
"delivered_at": task_msg.task.delivered_at,
"pushed_at": task_msg.task.pushed_at,
"ttl": task_msg.task.ttl,
"ancestry": ",".join(task_msg.task.ancestry),
"task_type": task_msg.task.task_type,
Expand All @@ -1072,7 +1069,6 @@ def task_res_to_dict(task_msg: TaskRes) -> dict[str, Any]:
"consumer_node_id": task_msg.task.consumer.node_id,
"created_at": task_msg.task.created_at,
"delivered_at": task_msg.task.delivered_at,
"pushed_at": task_msg.task.pushed_at,
"ttl": task_msg.task.ttl,
"ancestry": ",".join(task_msg.task.ancestry),
"task_type": task_msg.task.task_type,
Expand All @@ -1099,7 +1095,6 @@ def dict_to_task_ins(task_dict: dict[str, Any]) -> TaskIns:
),
created_at=task_dict["created_at"],
delivered_at=task_dict["delivered_at"],
pushed_at=task_dict["pushed_at"],
ttl=task_dict["ttl"],
ancestry=task_dict["ancestry"].split(","),
task_type=task_dict["task_type"],
Expand Down Expand Up @@ -1127,7 +1122,6 @@ def dict_to_task_res(task_dict: dict[str, Any]) -> TaskRes:
),
created_at=task_dict["created_at"],
delivered_at=task_dict["delivered_at"],
pushed_at=task_dict["pushed_at"],
ttl=task_dict["ttl"],
ancestry=task_dict["ancestry"].split(","),
task_type=task_dict["task_type"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def test_ins_res_to_dict(self) -> None:
"consumer_node_id",
"created_at",
"delivered_at",
"pushed_at",
"ttl",
"ancestry",
"task_type",
Expand Down
3 changes: 0 additions & 3 deletions src/py/flwr/server/utils/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ def validate_task_ins_or_res(tasks_ins_res: Union[TaskIns, TaskRes]) -> list[str
validation_errors.append("`delivered_at` must be an empty str")
if tasks_ins_res.task.ttl <= 0:
validation_errors.append("`ttl` must be higher than zero")
if tasks_ins_res.task.pushed_at < 1711497600.0:
# unix timestamp of 27 March 2024 00h:00m:00s UTC
validation_errors.append("`pushed_at` is not a recent timestamp")

# Verify TTL and created_at time
current_time = time.time()
Expand Down
2 changes: 0 additions & 2 deletions src/py/flwr/server/utils/validator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ def create_task_ins(
),
)

task.task.pushed_at = time.time()
return task


Expand All @@ -139,5 +138,4 @@ def create_task_res(
),
)

task_res.task.pushed_at = time.time()
return task_res

0 comments on commit 3e0f472

Please sign in to comment.