-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathio.py
134 lines (102 loc) · 3.95 KB
/
io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from asyncio import IncompleteReadError, StreamReader
from typing import Protocol, Union
def _copy_doc(source):
def decorate(target):
target.__doc__ = source.__doc__
return target
return decorate
class ParserBuffer(Protocol):
    """Interface through which parsers read from a buffer of bytes."""

    async def get(self, key: Union[int, slice]) -> bytes:
        """Return a range of bytes from the buffer.

        Waits until the requested range is available. When the range can
        never be fully provided (e.g. because it extends past the end of
        file), as many bytes as possible are returned.

        Parameters
        ----------
        key
            Range to return. An integer ``i`` is treated as the single-byte
            range ``slice(i, i + 1)``.

        Returns
        -------
        :
            A ``bytes`` object holding the requested range of the buffer.
            The result should always be a ``bytes`` object, even when a
            single integer index was given (unlike indexing a ``bytes``
            object, which yields an ``int``). Fewer bytes than requested may
            be returned if the buffer cannot provide the full range; the
            method must block, however, as long as the range may still
            become available.

        # noqa: DAR202
        """
        ...

    def get_current(self) -> bytes:
        """Return every byte presently held in the buffer."""
        ...

    def at_eof(self) -> bool:
        """Tell whether the end of file has been reached.

        Once the end of file has been found, all possible bytes have been
        read into the buffer and no further bytes will be added.
        """
        ...
class BytesBuffer:
    """Implements the `ParserBuffer` protocol for a ``bytes`` object.

    The buffer is static, i.e. all bytes are already provided, so no method
    ever has to wait for more data.

    Parameters
    ----------
    data:
        Data for the buffer.
    """

    def __init__(self, data: bytes):
        self._data = data

    @_copy_doc(ParserBuffer.get)
    async def get(self, key: Union[int, slice]) -> bytes:
        if not isinstance(key, slice):
            # A plain ``slice(key, key + 1)`` turns -1 into ``slice(-1, 0)``,
            # which is always empty. Using ``None`` as the stop in that case
            # makes ``get(-1)`` return the last byte like every other index.
            key = slice(key, key + 1 or None)
        return self._data[key]

    @_copy_doc(ParserBuffer.get_current)
    def get_current(self) -> bytes:
        return self._data

    def at_eof(self) -> bool:
        """Always returns True as the complete buffer is provided at
        construction time."""
        return True
class StreamReaderBuffer:
    """Implements the `ParserBuffer` protocol for a :class:`asyncio.StreamReader`.

    Bytes are pulled from the reader on demand and accumulated in an internal
    buffer until they are removed again with :meth:`drop_prefix`.

    Parameters
    ----------
    reader:
        Reader providing the bytes to be read into the buffer.
    """

    def __init__(self, reader: StreamReader):
        self._reader = reader
        self._buf = bytearray()

    @_copy_doc(ParserBuffer.get)
    async def get(self, key: Union[int, slice]) -> bytes:
        if not isinstance(key, slice):
            # A plain ``slice(key, key + 1)`` turns -1 into ``slice(-1, 0)``,
            # which is always empty; ``None`` as the stop selects the last
            # byte instead.
            key = slice(key, key + 1 or None)

        start, stop, step = key.start, key.stop, key.step

        # Exclusive upper bound of the indices the slice can touch, or None
        # if the slice is open-ended towards the end of the stream.
        if step is None or step > 0:
            upper = stop
        elif start is not None:
            upper = start + 1
        else:
            upper = None

        # Negative bounds are relative to the end of the data, so they can
        # only be resolved once the whole stream has been read.
        needs_eof = (
            upper is None
            or (start is not None and start < 0)
            or (stop is not None and stop < 0)
        )

        if needs_eof:
            self._buf.extend(await self._reader.read())
        elif len(self._buf) < upper:
            try:
                self._buf.extend(
                    await self._reader.readexactly(upper - len(self._buf))
                )
            except IncompleteReadError as err:
                # The stream ended early: keep whatever was received so that
                # as much as possible of the requested range is returned.
                self._buf.extend(err.partial)

        # Slicing a bytearray yields a bytearray; convert to honour the
        # ``bytes`` return contract of the protocol.
        return bytes(self._buf[key])

    @_copy_doc(ParserBuffer.get_current)
    def get_current(self) -> bytes:
        # Immutable snapshot; later reads or drops do not affect it.
        return bytes(self._buf)

    async def drop_prefix(self, n: int):
        """Drop the first *n* bytes in the buffer.

        If fewer than *n* bytes are buffered, the buffer is emptied and the
        remaining bytes are read from the reader and discarded.

        Parameters
        ----------
        n
            Number of bytes to drop. Must not be negative.

        Raises
        ------
        ValueError
            If *n* is negative.
        asyncio.IncompleteReadError
            If the stream ends before *n* bytes could be dropped. The buffer
            is empty afterwards.
        """
        if n < 0:
            raise ValueError("n must not be negative")

        missing = n - len(self._buf)
        if missing <= 0:
            del self._buf[:n]
            return

        try:
            await self._reader.readexactly(missing)
        except IncompleteReadError:
            # The reader has been drained, so every buffered byte belongs to
            # the dropped prefix; do not leave stale data behind.
            self._buf = bytearray()
            raise
        self._buf = bytearray()

    @_copy_doc(ParserBuffer.at_eof)
    def at_eof(self) -> bool:
        # True only when nothing is buffered locally and the underlying
        # reader reports end of file as well.
        return not self._buf and self._reader.at_eof()