Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.11] Restore FlowControlDataQueue class #9963

Merged
merged 3 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/9963.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Restored the ``FlowControlDataQueue`` class -- by :user:`bdraco`.

This class is no longer used internally, and will be permanently removed in the next major version.
2 changes: 2 additions & 0 deletions aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
EMPTY_PAYLOAD as EMPTY_PAYLOAD,
DataQueue as DataQueue,
EofStream as EofStream,
FlowControlDataQueue as FlowControlDataQueue,
StreamReader as StreamReader,
)
from .tracing import (
Expand Down Expand Up @@ -148,6 +149,7 @@
"ConnectionTimeoutError",
"ContentTypeError",
"Fingerprint",
"FlowControlDataQueue",
"InvalidURL",
"InvalidUrlClientError",
"InvalidUrlRedirectClientError",
Expand Down
43 changes: 43 additions & 0 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,3 +677,46 @@ async def read(self) -> _T:

def __aiter__(self) -> AsyncStreamIterator[_T]:
return AsyncStreamIterator(self.read)


class FlowControlDataQueue(DataQueue[_T]):
"""FlowControlDataQueue resumes and pauses an underlying stream.
It is a destination for parsed data.
This class is deprecated and will be removed in version 4.0.
"""

def __init__(
self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
) -> None:
super().__init__(loop=loop)
self._size = 0
self._protocol = protocol
self._limit = limit * 2

def feed_data(self, data: _T, size: int = 0) -> None:
super().feed_data(data, size)
self._size += size

if self._size > self._limit and not self._protocol._reading_paused:
self._protocol.pause_reading()

async def read(self) -> _T:
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise
if self._buffer:
data, size = self._buffer.popleft()
self._size -= size
if self._size < self._limit and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
if self._exception is not None:
raise self._exception
raise EofStream
77 changes: 77 additions & 0 deletions tests/test_flowcontrol_streams.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from unittest import mock

import pytest
Expand All @@ -15,6 +16,13 @@ def stream(loop, protocol):
return streams.StreamReader(protocol, limit=1, loop=loop)


@pytest.fixture
def buffer(loop, protocol: mock.Mock) -> streams.FlowControlDataQueue:
out = streams.FlowControlDataQueue(protocol, limit=1, loop=loop)
out._allow_pause = True
return out


class TestFlowControlStreamReader:
async def test_read(self, stream) -> None:
stream.feed_data(b"da", 2)
Expand Down Expand Up @@ -103,3 +111,72 @@ async def test_read_nowait(self, stream) -> None:
res = stream.read_nowait(5)
assert res == b""
assert stream._protocol.resume_reading.call_count == 1 # type: ignore[attr-defined]


async def test_flow_control_data_queue_waiter_cancelled(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test that the waiter is cancelled it is cleared."""
task = asyncio.create_task(buffer.read())
await asyncio.sleep(0)
assert buffer._waiter is not None
buffer._waiter.cancel()

with pytest.raises(asyncio.CancelledError):
await task
assert buffer._waiter is None


async def test_flow_control_data_queue_has_buffer(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test reading from the buffer."""
data = object()
buffer.feed_data(data, 100)
assert buffer._size == 100
read_data = await buffer.read()
assert read_data is data
assert buffer._size == 0


async def test_flow_control_data_queue_read_with_exception(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test reading when the buffer is empty and an exception is set."""
buffer.set_exception(ValueError("unique_string"))
with pytest.raises(ValueError, match="unique_string"):
await buffer.read()


def test_flow_control_data_queue_feed_pause(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test feeding data and pausing the reader."""
buffer._protocol._reading_paused = False
buffer.feed_data(object(), 100)
assert buffer._protocol.pause_reading.called

buffer._protocol._reading_paused = True
buffer._protocol.pause_reading.reset_mock()
buffer.feed_data(object(), 100)
assert not buffer._protocol.pause_reading.called


async def test_flow_control_data_queue_resume_on_read(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test that the reader is resumed when reading."""
buffer.feed_data(object(), 100)

buffer._protocol._reading_paused = True
await buffer.read()
assert buffer._protocol.resume_reading.called


async def test_flow_control_data_queue_read_eof(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test that reading after eof raises EofStream."""
buffer.feed_eof()
with pytest.raises(streams.EofStream):
await buffer.read()
Loading