Skip to content

Commit 05f333b

Browse files
authored
Update to latest SDK and dispatch-client versions (#51)
2 parents fdde894 + 11dd7b9 commit 05f333b

File tree

6 files changed

+53
-71
lines changed

6 files changed

+53
-71
lines changed

README.md

+9-6
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,20 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth
2121

2222
```python
2323
import os
24-
import grpc.aio
24+
from frequenz.dispatch import Dispatcher, RunningState
2525
from unittest.mock import MagicMock
2626

2727
async def run():
28-
host = os.getenv("DISPATCH_API_HOST", "localhost")
29-
port = os.getenv("DISPATCH_API_PORT", "50051")
28+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
29+
key = os.getenv("DISPATCH_API_KEY", "some-key")
3030

31-
service_address = f"{host}:{port}"
32-
grpc_channel = grpc.aio.insecure_channel(service_address)
3331
microgrid_id = 1
34-
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
32+
33+
dispatcher = Dispatcher(
34+
microgrid_id=microgrid_id,
35+
server_url=url,
36+
key=key
37+
)
3538
await dispatcher.start()
3639

3740
actor = MagicMock() # replace with your actor

RELEASE_NOTES.md

+5-2
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66

77
## Upgrading
88

9-
- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc801`.
9+
- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc900`.
10+
- We are now using the version `0.6.0` of the underlying `frequenz-client-dispatch` client library.
11+
- The init parameter of the `Dispatcher` class has been changed to accept a `server_url` instead.
1012

1113
## New Features
1214

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
15+
* Using the new dispatch client, we now have support for pagination in the dispatch list request.
16+
* The new client version also supports streaming, however it is not yet used internally in the high level interface.
1417

1518
## Bug Fixes
1619

mkdocs.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ plugins:
116116
# See https://mkdocstrings.github.io/python/usage/#import for details
117117
- https://docs.python.org/3/objects.inv
118118
- https://frequenz-floss.github.io/frequenz-channels-python/v1.0/objects.inv
119-
- https://frequenz-floss.github.io/frequenz-client-dispatch-python/v0.5/objects.inv
119+
- https://frequenz-floss.github.io/frequenz-client-dispatch-python/v0.6/objects.inv
120120
- https://frequenz-floss.github.io/frequenz-sdk-python/v1.0-pre/objects.inv
121121
- https://grpc.github.io/grpc/python/objects.inv
122122
- https://typing-extensions.readthedocs.io/en/stable/objects.inv

pyproject.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ dependencies = [
3939
# Make sure to update the version for cross-referencing also in the
4040
# mkdocs.yml file when changing the version here (look for the config key
4141
# plugins.mkdocstrings.handlers.python.import)
42-
"frequenz-sdk == 1.0.0-rc801",
42+
"frequenz-sdk == 1.0.0-rc900",
4343
"frequenz-channels >= 1.1.0, < 2.0.0",
44-
"frequenz-client-dispatch >= 0.5.0, < 0.6.0",
44+
"frequenz-client-dispatch >= 0.6.0, < 0.7.0",
4545
]
4646
dynamic = ["version"]
4747

src/frequenz/dispatch/_dispatcher.py

+15-38
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import abc
77
from typing import Protocol, TypeVar
88

9-
import grpc.aio
109
from frequenz.channels import Broadcast, Receiver
1110
from frequenz.client.dispatch import Client
1211

@@ -55,24 +54,18 @@ class Dispatcher:
5554
Example: Processing running state change dispatches
5655
```python
5756
import os
58-
import grpc.aio
5957
from frequenz.dispatch import Dispatcher, RunningState
6058
from unittest.mock import MagicMock
6159
6260
async def run():
63-
host = os.getenv("DISPATCH_API_HOST", "localhost")
64-
port = os.getenv("DISPATCH_API_PORT", "50051")
61+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
6562
key = os.getenv("DISPATCH_API_KEY", "some-key")
6663
67-
service_address = f"{host}:{port}"
68-
grpc_channel = grpc.aio.secure_channel(
69-
service_address,
70-
credentials=grpc.ssl_channel_credentials()
71-
)
64+
microgrid_id = 1
65+
7266
dispatcher = Dispatcher(
73-
microgrid_id=1,
74-
grpc_channel=grpc_channel,
75-
svc_addr=service_address,
67+
microgrid_id=microgrid_id,
68+
server_url=url,
7669
key=key
7770
)
7871
await dispatcher.start()
@@ -112,23 +105,17 @@ async def run():
112105
import os
113106
from typing import assert_never
114107
115-
import grpc.aio
116108
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
117109
118110
async def run():
119-
host = os.getenv("DISPATCH_API_HOST", "localhost")
120-
port = os.getenv("DISPATCH_API_PORT", "50051")
111+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
121112
key = os.getenv("DISPATCH_API_KEY", "some-key")
122113
123-
service_address = f"{host}:{port}"
124-
grpc_channel = grpc.aio.secure_channel(
125-
service_address,
126-
credentials=grpc.ssl_channel_credentials()
127-
)
114+
microgrid_id = 1
115+
128116
dispatcher = Dispatcher(
129-
microgrid_id=1,
130-
grpc_channel=grpc_channel,
131-
svc_addr=service_address,
117+
microgrid_id=microgrid_id,
118+
server_url=url,
132119
key=key
133120
)
134121
await dispatcher.start() # this will start the actor
@@ -154,27 +141,19 @@ async def run():
154141
import os
155142
from datetime import datetime, timedelta, timezone
156143
157-
import grpc.aio
158144
from frequenz.client.common.microgrid.components import ComponentCategory
159145
160146
from frequenz.dispatch import Dispatcher
161147
162148
async def run():
163-
host = os.getenv("DISPATCH_API_HOST", "localhost")
164-
port = os.getenv("DISPATCH_API_PORT", "50051")
149+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
165150
key = os.getenv("DISPATCH_API_KEY", "some-key")
166151
167152
microgrid_id = 1
168153
169-
service_address = f"{host}:{port}"
170-
grpc_channel = grpc.aio.secure_channel(
171-
service_address,
172-
credentials=grpc.ssl_channel_credentials()
173-
)
174154
dispatcher = Dispatcher(
175155
microgrid_id=microgrid_id,
176-
grpc_channel=grpc_channel,
177-
svc_addr=service_address,
156+
server_url=url,
178157
key=key
179158
)
180159
await dispatcher.start() # this will start the actor
@@ -208,23 +187,21 @@ def __init__(
208187
self,
209188
*,
210189
microgrid_id: int,
211-
grpc_channel: grpc.aio.Channel,
212-
svc_addr: str,
190+
server_url: str,
213191
key: str,
214192
):
215193
"""Initialize the dispatcher.
216194
217195
Args:
218196
microgrid_id: The microgrid id.
219-
grpc_channel: The gRPC channel.
220-
svc_addr: The service address.
197+
server_url: The URL of the dispatch service.
221198
key: The key to access the service.
222199
"""
223200
self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
224201
self._lifecycle_events_channel = Broadcast[DispatchEvent](
225202
name="lifecycle_events"
226203
)
227-
self._client = Client(grpc_channel=grpc_channel, svc_addr=svc_addr, key=key)
204+
self._client = Client(server_url=server_url, key=key)
228205
self._actor = DispatchingActor(
229206
microgrid_id,
230207
self._client,

src/frequenz/dispatch/actor.py

+21-22
Original file line numberDiff line numberDiff line change
@@ -89,28 +89,27 @@ async def _fetch(self) -> None:
8989

9090
try:
9191
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
92-
async for client_dispatch in self._client.list(
93-
microgrid_id=self._microgrid_id
94-
):
95-
dispatch = Dispatch(client_dispatch)
96-
97-
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
98-
old_dispatch = old_dispatches.pop(dispatch.id, None)
99-
if not old_dispatch:
100-
self._update_dispatch_schedule(dispatch, None)
101-
_logger.info("New dispatch: %s", dispatch)
102-
await self._lifecycle_updates_sender.send(
103-
Created(dispatch=dispatch)
104-
)
105-
elif dispatch.update_time != old_dispatch.update_time:
106-
self._update_dispatch_schedule(dispatch, old_dispatch)
107-
_logger.info("Updated dispatch: %s", dispatch)
108-
await self._lifecycle_updates_sender.send(
109-
Updated(dispatch=dispatch)
110-
)
111-
112-
if self._running_state_change(dispatch, old_dispatch):
113-
await self._send_running_state_change(dispatch)
92+
async for page in self._client.list(microgrid_id=self._microgrid_id):
93+
for client_dispatch in page:
94+
dispatch = Dispatch(client_dispatch)
95+
96+
self._dispatches[dispatch.id] = Dispatch(client_dispatch)
97+
old_dispatch = old_dispatches.pop(dispatch.id, None)
98+
if not old_dispatch:
99+
self._update_dispatch_schedule(dispatch, None)
100+
_logger.info("New dispatch: %s", dispatch)
101+
await self._lifecycle_updates_sender.send(
102+
Created(dispatch=dispatch)
103+
)
104+
elif dispatch.update_time != old_dispatch.update_time:
105+
self._update_dispatch_schedule(dispatch, old_dispatch)
106+
_logger.info("Updated dispatch: %s", dispatch)
107+
await self._lifecycle_updates_sender.send(
108+
Updated(dispatch=dispatch)
109+
)
110+
111+
if self._running_state_change(dispatch, old_dispatch):
112+
await self._send_running_state_change(dispatch)
114113

115114
except grpc.aio.AioRpcError as error:
116115
_logger.error("Error fetching dispatches: %s", error)

0 commit comments

Comments
 (0)