Skip to content

Commit 1f59627

Browse files
add Queue
1 parent 1fdf78f commit 1f59627

File tree

3 files changed

+709
-2
lines changed

3 files changed

+709
-2
lines changed

src/asyncgui_ext/synctools/all.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
__all__ = (
2-
'Event', 'Box',
2+
'Event', 'Box', 'Queue',
33
)
44
from .event import Event
55
from .box import Box
6-
6+
from .queue import Queue

src/asyncgui_ext/synctools/queue.py

+357
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
'''
2+
.. versionadded:: 0.2.1
3+
4+
.. code::
5+
6+
import asyncgui as ag
7+
from asyncgui_ext.synctools.queue import Queue
8+
9+
async def producer(q):
10+
for c in "ABC":
11+
await q.put(c)
12+
print('produced', c)
13+
14+
async def consumer(q):
15+
async for c in q:
16+
print('consumed', c)
17+
18+
q = Queue(capacity=1)
19+
ag.start(producer(q))
20+
ag.start(consumer(q))
21+
22+
.. code:: text
23+
24+
produced A
25+
produced B
26+
consumed A
27+
produced C
28+
consumed B
29+
consumed C
30+
31+
癖 -- Quirk --
32+
-----------------
33+
34+
.. code::
35+
36+
async def async_fn1(q, consumed):
37+
await q.put('A')
38+
await q.put('B')
39+
item = await q.get()
40+
consumed.append(item)
41+
await q.put('C')
42+
item = await q.get()
43+
consumed.append(item)
44+
45+
async def async_fn2(q, consumed):
46+
item = await q.get()
47+
consumed.append(item)
48+
49+
consumed = []
50+
q = Queue(capacity=1)
51+
ag.start(async_fn1(q, consumed))
52+
ag.start(async_fn2(q, consumed))
53+
print(consumed)
54+
55+
.. code:: text
56+
57+
['B', 'C', 'A']
58+
59+
上記の出力を見てわかるように ``A``, ``B``, ``C`` の順でキューに入れたのに
60+
``consumed`` には ``B``, ``C``, ``A`` の順で入っています。
61+
このような事が起こるのは ``asyncgui`` が自身ではメインループを持たない故にタイマー機能を提供できない事に起因します。
62+
なので外部のタイマー機能を利用する事でこの問題を解消する選択肢を用意する予定なのですが、それまではこういうものだと諦めてください。
63+
因みに ``Kivy`` を使っているのであれば ``Kivy`` のタイマー機能を用いる事でこの問題を解決済みの ``asynckivy-ext-queue``
64+
というモジュールが既にあるので氣になる人はそちらをご利用ください。
65+
'''
66+
67+
__all__ = (
68+
'QueueException', 'WouldBlock', 'Closed',
69+
'Queue', 'Order', 'QueueState',
70+
)
71+
import typing as T
72+
import enum
73+
import heapq
74+
from functools import partial
75+
from collections import deque
76+
77+
from asyncgui import AsyncEvent
78+
79+
80+
class QueueState(enum.Enum):
81+
'''
82+
Enum class that represents the state of the Queue.
83+
'''
84+
85+
OPENED = enum.auto()
86+
'''
87+
All operations are allowed.
88+
89+
:meta hide-value:
90+
'''
91+
92+
HALF_CLOSED = enum.auto()
93+
'''
94+
Putting an item into the queue is not allowed.
95+
96+
:meta hide-value:
97+
'''
98+
99+
CLOSED = enum.auto()
100+
'''
101+
Putting an item into the queue is not allowed.
102+
Getting an item from the queue is not allowed.
103+
104+
:meta hide-value:
105+
'''
106+
107+
108+
class QueueException(Exception):
109+
'''Base class of all the queue-related exceptions.'''
110+
111+
112+
class WouldBlock(QueueException):
113+
'''Raised by X_nowait functions if X would block.'''
114+
115+
116+
class Closed(QueueException):
117+
'''
118+
Occurs when:
119+
120+
* one tries to get an item from a queue that is in the ``CLOSED`` state.
121+
* one tries to get an item from an **empty** queue that is in the ``HALF_CLOSED`` state.
122+
* one tries to put an item into a queue that is in the ``CLOSED`` or ``HALF_CLOSED`` state.
123+
'''
124+
125+
126+
Item: T.TypeAlias = T.Any
127+
Order = T.Literal['fifo', 'lifo', 'small-first']
128+
129+
130+
class Queue:
131+
'''
132+
:param capacity: Cannot be zero. Unlimited if None.
133+
'''
134+
def __init__(self, *, capacity: int | None=None, order: Order='fifo'):
135+
if capacity is None:
136+
pass
137+
elif (not isinstance(capacity, int)) or capacity < 1:
138+
raise ValueError(f"'capacity' must be either a positive integer or None. (was {capacity!r})")
139+
self._init_container(capacity, order)
140+
self._state = QueueState.OPENED
141+
self._putters = deque[tuple[AsyncEvent, Item]]()
142+
self._getters = deque[AsyncEvent]()
143+
self._capacity = capacity
144+
self._order = order
145+
self._is_transferring = False
146+
147+
def _init_container(self, capacity, order):
148+
# If the capacity is 1, there is no point in reordering items.
149+
# Therefore, for performance reasons, treat the order as 'lifo'.
150+
if capacity == 1 or order == 'lifo':
151+
c = []
152+
c_get = c.pop
153+
c_put = c.append
154+
elif order == 'fifo':
155+
c = deque(maxlen=capacity)
156+
c_get = c.popleft
157+
c_put = c.append
158+
elif order == 'small-first':
159+
c = []
160+
c_get = partial(heapq.heappop, c)
161+
c_put = partial(heapq.heappush, c)
162+
else:
163+
raise ValueError(f"'order' must be one of 'lifo', 'fifo' or 'small-first'. (was {order!r})")
164+
self._c = c
165+
self._c_get = c_get
166+
self._c_put = c_put
167+
168+
def __len__(self) -> int:
169+
return len(self._c)
170+
171+
size = property(__len__)
172+
'''Number of items in the queue. This equals to ``len(queue)``. '''
173+
174+
@property
175+
def capacity(self) -> int | None:
176+
'''Number of items allowed in the queue. None if unbounded.'''
177+
return self._capacity
178+
179+
@property
180+
def is_empty(self) -> bool:
181+
return not self._c
182+
183+
@property
184+
def is_full(self) -> bool:
185+
return len(self._c) == self._capacity
186+
187+
@property
188+
def order(self) -> Order:
189+
return self._order
190+
191+
async def get(self) -> T.Awaitable[Item]:
192+
'''
193+
.. code-block::
194+
195+
item = await queue.get()
196+
'''
197+
if self._state is QueueState.CLOSED:
198+
raise Closed
199+
if self._state is QueueState.HALF_CLOSED and self.is_empty:
200+
raise Closed
201+
202+
if self._is_transferring or self.is_empty:
203+
event = AsyncEvent()
204+
self._getters.append(event)
205+
exc, item = (await event.wait())[0]
206+
if exc is not None:
207+
raise exc
208+
return item
209+
210+
item = self._c_get()
211+
if self._putters:
212+
self._transfer_items()
213+
return item
214+
215+
def get_nowait(self) -> Item:
216+
'''
217+
.. code-block::
218+
219+
item = queue.get_nowait()
220+
'''
221+
if self._state is QueueState.CLOSED:
222+
raise Closed
223+
if self.is_empty:
224+
if self._state is QueueState.HALF_CLOSED:
225+
raise Closed
226+
raise WouldBlock
227+
228+
item = self._c_get()
229+
if (not self._is_transferring) and self._putters:
230+
self._transfer_items()
231+
return item
232+
233+
async def put(self, item) -> T.Awaitable:
234+
'''
235+
.. code-block::
236+
237+
await queue.put(item)
238+
'''
239+
if self._state is not QueueState.OPENED:
240+
raise Closed
241+
242+
if self._is_transferring or self.is_full:
243+
event = AsyncEvent()
244+
self._putters.append((event, item, ))
245+
exc = (await event.wait())[0][0]
246+
if exc is not None:
247+
raise exc
248+
return
249+
250+
self._c_put(item)
251+
if self._getters:
252+
self._transfer_items()
253+
254+
def put_nowait(self, item):
255+
'''
256+
.. code-block::
257+
258+
queue.put_nowait(item)
259+
'''
260+
if self._state is not QueueState.OPENED:
261+
raise Closed
262+
if self.is_full:
263+
raise WouldBlock
264+
265+
self._c_put(item)
266+
if (not self._is_transferring) and self._getters:
267+
self._transfer_items()
268+
269+
def half_close(self):
270+
'''
271+
Partially closes the queue.
272+
Putting an item into it is no longer allowed.
273+
'''
274+
if self._state is not QueueState.OPENED:
275+
return
276+
self._state = QueueState.HALF_CLOSED
277+
278+
Closed_ = Closed
279+
for putter, __ in self._putters:
280+
putter.fire(Closed_)
281+
if not self.is_empty:
282+
return
283+
for getter in self._getters:
284+
getter.fire(Closed_, None)
285+
286+
def close(self):
287+
'''
288+
Fully closes the queue.
289+
Putting an item into it is no longer allowed.
290+
Getting an item from it is no longer allowed.
291+
All the items it holds will be discarded.
292+
'''
293+
if self._state is QueueState.CLOSED:
294+
return
295+
self._state = QueueState.CLOSED
296+
self._c.clear()
297+
298+
Closed_ = Closed
299+
for putter, __ in self._putters:
300+
putter.fire(Closed_)
301+
for getter in self._getters:
302+
getter.fire(Closed_, None)
303+
304+
async def __aiter__(self):
305+
'''
306+
Repeats getting an item from the queue until it gets closed.
307+
308+
.. code-block::
309+
310+
async for item in queue:
311+
...
312+
313+
This is equivalent to:
314+
315+
.. code-block::
316+
317+
try:
318+
while True:
319+
item = await queue.get()
320+
...
321+
except Closed:
322+
pass
323+
'''
324+
try:
325+
while True:
326+
yield await self.get()
327+
except Closed:
328+
pass
329+
330+
def _transfer_items(self):
331+
assert not self._is_transferring
332+
self._is_transferring = True
333+
try:
334+
# LOAD_FAST
335+
c_put = self._c_put
336+
c_get = self._c_get
337+
putters = self._putters
338+
getters = self._getters
339+
next_putter = putters.popleft
340+
next_getter = getters.popleft
341+
342+
while True:
343+
while (not self.is_full) and putters:
344+
putter, item = next_putter()
345+
if (cb := putter._callback) is not None:
346+
c_put(item)
347+
cb(None)
348+
if (not getters) or self.is_empty:
349+
break
350+
while (not self.is_empty) and getters:
351+
getter = next_getter()
352+
if (cb := getter._callback) is not None:
353+
cb(None, c_get())
354+
if (not putters) or self.is_full:
355+
break
356+
finally:
357+
self._is_transferring = False

0 commit comments

Comments
 (0)