Skip to content

Commit 08c5628

Browse files
authored
Merge pull request #121 from psqlpy-python/feature/listen_notify_functionality
Added `Listener` class that supports `LISTEN` functionality in PostgreSQL.
2 parents 12f3c63 + 0167363 commit 08c5628

32 files changed

+1652
-228
lines changed

.github/workflows/test.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
- uses: actions-rs/clippy-check@v1
3131
with:
3232
token: ${{ secrets.GITHUB_TOKEN }}
33-
args: -p psqlpy --all-features -- -W clippy::all -W clippy::pedantic -D warnings
33+
args: -p psqlpy --all-features -- -W clippy::all -W clippy::pedantic
3434
pytest:
3535
name: ${{matrix.job.os}}-${{matrix.py_version}}
3636
strategy:

.pre-commit-config.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ repos:
5858
- clippy::all
5959
- -W
6060
- clippy::pedantic
61-
- -D
62-
- warnings
6361

6462
- id: check
6563
types:

Cargo.lock

+14-12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+4-9
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,8 @@ crate-type = ["cdylib"]
1010

1111
[dependencies]
1212
deadpool-postgres = { git = "https://github.com/chandr-andr/deadpool.git", branch = "psqlpy" }
13-
pyo3 = { version = "*", features = [
14-
"chrono",
15-
"experimental-async",
16-
"rust_decimal",
17-
"py-clone",
18-
"gil-refs",
19-
"macros",
20-
] }
21-
pyo3-async-runtimes = { git = "https://github.com/chandr-andr/pyo3-async-runtimes.git", branch = "main", features = [
13+
pyo3 = { version = "0.23.4", features = ["chrono", "experimental-async", "rust_decimal", "py-clone", "macros"] }
14+
pyo3-async-runtimes = { git = "https://github.com/chandr-andr/pyo3-async-runtimes.git", branch = "psqlpy", features = [
2215
"tokio-runtime",
2316
] }
2417

@@ -59,3 +52,5 @@ pg_interval = { git = "https://github.com/chandr-andr/rust-postgres-interval.git
5952
pgvector = { git = "https://github.com/chandr-andr/pgvector-rust.git", branch = "psqlpy", features = [
6053
"postgres",
6154
] }
55+
futures-channel = "0.3.31"
56+
futures = "0.3.31"

docs/.vuepress/sidebar.ts

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export default sidebar({
2323
"connection",
2424
"transaction",
2525
"cursor",
26+
"listener",
2627
"results",
2728
"exceptions",
2829
],

docs/components/components_overview.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ title: Components
88
- `Connection`: represents single database connection, can be retrieved from `ConnectionPool`.
99
- `Transaction`: represents database transaction, can be made from `Connection`.
1010
- `Cursor`: represents database cursor, can be made from `Transaction`.
11+
- `Listener`: object to work with [LISTEN](https://www.postgresql.org/docs/current/sql-listen.html)/[NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html) functionality, can be mode from `ConnectionPool`.
1112
- `QueryResult`: represents list of results from database.
1213
- `SingleQueryResult`: represents single result from the database.
1314
- `Exceptions`: we have some custom exceptions.

docs/components/connection_pool.md

+12
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,18 @@ This is the preferable way to work with the PostgreSQL.
254254
:::
255255

256256

257+
### Listener
258+
259+
Create a new instance of a listener.
260+
261+
```python
262+
async def main() -> None:
263+
...
264+
listener = db_pool.listener()
265+
```
266+
```
267+
268+
257269
### Close
258270
259271
To close the connection pool at the stop of your application.

docs/components/exceptions.md

+18
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ stateDiagram-v2
1515
RustPSQLDriverPyBaseError --> BaseConnectionError
1616
RustPSQLDriverPyBaseError --> BaseTransactionError
1717
RustPSQLDriverPyBaseError --> BaseCursorError
18+
RustPSQLDriverPyBaseError --> BaseListenerError
1819
RustPSQLDriverPyBaseError --> RustException
1920
RustPSQLDriverPyBaseError --> RustToPyValueMappingError
2021
RustPSQLDriverPyBaseError --> PyToRustValueMappingError
@@ -44,6 +45,11 @@ stateDiagram-v2
4445
[*] --> CursorFetchError
4546
[*] --> CursorClosedError
4647
}
48+
state BaseListenerError {
49+
[*] --> ListenerStartError
50+
[*] --> ListenerClosedError
51+
[*] --> ListenerCallbackError
52+
}
4753
state RustException {
4854
[*] --> DriverError
4955
[*] --> MacAddrParseError
@@ -127,3 +133,15 @@ Error in cursor fetch (any fetch).
127133

128134
#### CursorClosedError
129135
Error if underlying connection is closed.
136+
137+
### BaseListenerError
138+
Base error for all Listener errors.
139+
140+
#### ListenerStartError
141+
Error if listener start failed.
142+
143+
#### ListenerClosedError
144+
Error if listener manipulated but it's closed
145+
146+
#### ListenerCallbackError
147+
Error if callback passed to listener isn't a coroutine

docs/components/listener.md

+204
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
---
2+
title: Listener
3+
---
4+
5+
`Listener` object allows users to work with [LISTEN](https://www.postgresql.org/docs/current/sql-listen.html)/[NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html) functionality.
6+
7+
## Usage
8+
9+
There are two ways of using `Listener` object:
10+
- Async iterator
11+
- Background task
12+
13+
::: tabs
14+
15+
@tab Background task
16+
```python
17+
from psqlpy import ConnectionPool, Connection, Listener
18+
19+
20+
db_pool = ConnectionPool(
21+
dsn="postgres://postgres:postgres@localhost:5432/postgres",
22+
)
23+
24+
async def test_channel_callback(
25+
connection: Connection,
26+
payload: str,
27+
channel: str,
28+
process_id: int,
29+
) -> None:
30+
# do some important staff
31+
...
32+
33+
async def main() -> None:
34+
# Create listener object
35+
listener: Listener = db_pool.listener()
36+
37+
# Add channel to listen and callback for it.
38+
await listener.add_callback(
39+
channel="test_channel",
40+
callback=test_channel_callback,
41+
)
42+
43+
# Startup the listener
44+
await listener.startup()
45+
46+
# Start listening.
47+
# `listen` method isn't blocking, it returns None and starts background
48+
# task in the Rust event loop.
49+
listener.listen()
50+
51+
# You can stop listening.
52+
listener.abort_listen()
53+
```
54+
55+
@tab Async Iterator
56+
```python
57+
from psqlpy import (
58+
ConnectionPool,
59+
Connection,
60+
Listener,
61+
ListenerNotificationMsg,
62+
)
63+
64+
65+
db_pool = ConnectionPool(
66+
dsn="postgres://postgres:postgres@localhost:5432/postgres",
67+
)
68+
69+
async def main() -> None:
70+
# Create listener object
71+
listener: Listener = db_pool.listener()
72+
73+
# Startup the listener
74+
await listener.startup()
75+
76+
listener_msg: ListenerNotificationMsg
77+
async for listener_msg in listener:
78+
print(listener_msg)
79+
```
80+
81+
:::
82+
83+
## Listener attributes
84+
85+
- `connection`: Instance of `Connection`.
86+
If `startup` wasn't called, raises `ListenerStartError`.
87+
88+
- `is_started`: Flag that shows whether the `Listener` is running or not.
89+
90+
## Listener methods
91+
92+
### Startup
93+
94+
Startup `Listener` instance and can be called once or again only after `shutdown`.
95+
96+
::: important
97+
`Listener` must be started up.
98+
:::
99+
100+
```python
101+
async def main() -> None:
102+
listener: Listener = db_pool.listener()
103+
104+
await listener.startup()
105+
```
106+
107+
### Shutdown
108+
Abort listen (if called) and release underlying connection.
109+
110+
```python
111+
async def main() -> None:
112+
listener: Listener = db_pool.listener()
113+
114+
await listener.startup()
115+
await listener.shutdown()
116+
```
117+
118+
### Add Callback
119+
120+
#### Parameters:
121+
- `channel`: name of the channel to listen.
122+
- `callback`: coroutine callback.
123+
124+
Add new callback to the channel, can be called more than 1 times.
125+
126+
Callback signature is like this:
127+
```python
128+
from psqlpy import Connection
129+
130+
async def callback(
131+
connection: Connection,
132+
payload: str,
133+
channel: str,
134+
process_id: int,
135+
) -> None:
136+
...
137+
```
138+
139+
Parameters for callback are based like `args`, so this signature is correct to:
140+
```python
141+
async def callback(
142+
connection: Connection,
143+
*args,
144+
) -> None:
145+
...
146+
```
147+
148+
**Example:**
149+
```python
150+
async def test_channel_callback(
151+
connection: Connection,
152+
payload: str,
153+
channel: str,
154+
process_id: int,
155+
) -> None:
156+
...
157+
158+
async def main() -> None:
159+
listener = db_pool.listener()
160+
161+
await listener.add_callback(
162+
channel="test_channel",
163+
callback=test_channel_callback,
164+
)
165+
```
166+
167+
### Clear Channel Callbacks
168+
169+
#### Parameters:
170+
- `channel`: name of the channel
171+
172+
Remove all callbacks for the channel
173+
174+
```python
175+
async def main() -> None:
176+
listener = db_pool.listener()
177+
await listener.clear_channel_callbacks()
178+
```
179+
180+
### Clear All Channels
181+
Clear all channels and callbacks.
182+
183+
```python
184+
async def main() -> None:
185+
listener = db_pool.listener()
186+
await listener.clear_all_channels()
187+
```
188+
189+
### Listen
190+
Start listening.
191+
192+
It's a non-blocking operation.
193+
In the background it creates task in Rust event loop.
194+
195+
```python
196+
async def main() -> None:
197+
listener = db_pool.listener()
198+
await listener.startup()
199+
await listener.listen()
200+
```
201+
202+
### Abort Listen
203+
Abort listen.
204+
If `listen()` method was called, stop listening, else don't do anything.

0 commit comments

Comments
 (0)