From a86fe4f380f7beae4587c2980478fe36234d802f Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 20 May 2016 10:14:45 +0100 Subject: [PATCH 1/2] Very simple HTTP/2 stub. --- aiohttp/protocol2.py | 175 +++++++++++++++++++++++++++++++++++++++++ aiohttp/server2.py | 116 +++++++++++++++++++++++++++ aiohttp/web2.py | 100 +++++++++++++++++++++++ aiohttp/web_reqrep.py | 5 +- aiohttp/web_reqrep2.py | 37 +++++++++ 5 files changed, 432 insertions(+), 1 deletion(-) create mode 100644 aiohttp/protocol2.py create mode 100644 aiohttp/server2.py create mode 100644 aiohttp/web2.py create mode 100644 aiohttp/web_reqrep2.py diff --git a/aiohttp/protocol2.py b/aiohttp/protocol2.py new file mode 100644 index 00000000000..4750a30396b --- /dev/null +++ b/aiohttp/protocol2.py @@ -0,0 +1,175 @@ +"""Http2 related parsers and protocol.""" + +from wsgiref.handlers import format_date_time + +import aiohttp.hdrs +import aiohttp.protocol + +HttpVersion20 = aiohttp.protocol.HttpVersion(2, 0) + + +class HTTP2Parser: + def __init__(self, conn): + self._conn = conn + + def __call__(self, out, buf): + """ + Receives data from the buffer, parses it, and then sends any events + out. + + This only terminates when the connection is terminated by the remote + peer. + """ + while True: + # XXX: 65kb is totally arbitrary here: consider tuning. + data = yield from buf.readsome(size=65535) + + if not data: + out.feed_eof() + break + + events = self._conn.receive_data(data) + out.feed_data(events, len(data)) + + +class Http2Message(aiohttp.protocol.HttpMessage): + """ + A HTTP/2-specific version of the ``HttpMessage`` ABC. + """ + HOP_HEADERS = [] + + def __init__(self, conn, transport, stream_id): + self._conn = conn + self._stream_id = stream_id + super().__init__(transport, version=HttpVersion20, close=False) + + def keep_alive(self): + return True + + def add_header(self, name, value): + # HTTP/2 doesn't do chunked. + if name == aiohttp.hdrs.TRANSFER_ENCODING: + return + + # Nor does it do Connection. + if name == aiohttp.hdrs.CONNECTION: + return + + return super().add_header(name, value) + + def send_headers(self, *args, **kwargs): + """ + A complete override of the equivalent method from the ABC. + """ + assert not self.headers_sent, 'headers have been sent already' + self.headers_sent = True + + # We always use either the EOF payload writer or the length payload + # writer. + self.writer = self._write_h2_payload() + next(self.writer) + self._add_default_headers() + + # Send the headers. + headers = [(':status', str(self.status))] + headers.extend(self.headers.items()) + self._conn.send_headers(stream_id=self._stream_id, headers=headers) + headers_data = self._conn.data_to_send() + + self.output_length += len(headers_data) + self.headers_length = len(headers_data) + self.transport.write(headers_data) + + def write(self, chunk, *, + drain=False, EOF_MARKER=aiohttp.protocol.EOF_MARKER, + EOL_MARKER=aiohttp.protocol.EOL_MARKER): + """Writes chunk of data to a stream by using different writers. + + writer uses filter to modify chunk of data. + write_eof() indicates end of stream. + writer can't be used after write_eof() method being called. + write() return drain future. + """ + assert (isinstance(chunk, (bytes, bytearray)) or + chunk is EOF_MARKER), chunk + + size = self.output_length + + if self._send_headers and not self.headers_sent: + self.send_headers() + + assert self.writer is not None, 'send_headers() is not called.' + + if self.filter: + chunk = self.filter.send(chunk) + while chunk not in (EOF_MARKER, EOL_MARKER): + if chunk: + self.writer.send(chunk) + chunk = next(self.filter) + else: + if chunk is not EOF_MARKER: + self.writer.send(chunk) + + self._output_size += self.output_length - size + + if self._output_size > 64 * 1024: + if drain: + self._output_size = 0 + return self.transport.drain() + + return () + + def _write_h2_payload(self): + while True: + try: + chunk = yield + except aiohttp.EofStream: + break + + self._conn.send_data(stream_id=self._stream_id, data=chunk) + sent_data = self._conn.data_to_send() + self.transport.write(sent_data) + self.output_length += len(sent_data) + + self._conn.end_stream(stream_id=self._stream_id) + sent_data = self._conn.data_to_send() + self.transport.write(sent_data) + + def _add_default_headers(self): + # This is a no-op for HTTP/2, we don't want to add Connection headers. + return + + +class Http2Response(Http2Message): + """ + A HTTP/2-equivalent of aiohttp.protocol.HttpResponse + """ + def __init__(self, conn, transport, status, stream_id): + self._status = status + super().__init__(conn, transport, stream_id) + + def status_line(self): + return "" + + @staticmethod + def calc_reason(*args, **kwargs): + return "" + + @property + def status(self): + return self._status + + @property + def reason(self): + return "" + + def autochunked(self): + return False + + def _add_default_headers(self): + super()._add_default_headers() + + if aiohttp.hdrs.DATE not in self.headers: + # format_date_time(None) is quite expensive + self.headers.setdefault(aiohttp.hdrs.DATE, format_date_time(None)) + self.headers.setdefault(aiohttp.hdrs.SERVER, self.SERVER_SOFTWARE) diff --git a/aiohttp/server2.py b/aiohttp/server2.py new file mode 100644 index 00000000000..2a382b9b28b --- /dev/null +++ b/aiohttp/server2.py @@ -0,0 +1,116 @@ +"""HTTP/2 Server""" + +import asyncio + +import h2.connection +import h2.events + +import aiohttp.errors +import aiohttp.hdrs +import aiohttp.helpers +import aiohttp.protocol +import aiohttp.protocol2 +import aiohttp.server +import aiohttp.streams + +from multidict import CIMultiDict + + +class ServerHTTP2Protocol(aiohttp.server.ServerHttpProtocol): + """ + A class that implements ServerHTTPProtocol for HTTP/2 servers. + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._streams = {} + self._conn = None + + @asyncio.coroutine + def start(self): + """ + Start processing incoming requests. + + Unlike in the case of ServerHTTPProtocol, this loop can repeatedly + handle many requests in parallel. + """ + conn = self._conn = h2.connection.H2Connection( + client_side=False, header_encoding=None + ) + conn.initiate_connection() + + eventstream = self.reader.set_parser( + aiohttp.protocol2.HTTP2Parser(conn) + ) + + while True: + try: + events = yield from eventstream.read() + except aiohttp.errors.ClientDisconnectedError: + # Unclean termination, but only a problem if streams are open. + # TODO: check that there are none. + break + + # TODO: Refactor the event handlers to be multiple methods. + # TODO: Flow control. + # TODO: Priority. + for event in events: + if isinstance(event, h2.events.RequestReceived): + hdrs = CIMultiDict( + ( + k.decode('utf-8', 'surrogateescape'), + v.decode('utf-8', 'surrogateescape'), + ) + for k, v in event.headers + ) + # These pops need to be capitalised, because of + # https://github.com/aio-libs/multidict/issues/1 + host = hdrs.pop(':AUTHORITY') + hdrs.pop(':SCHEME') + hdrs[aiohttp.hdrs.HOST] = host + + # TODO: This isn't quite right, but it's close enough + # for now. + encoding = hdrs.get('content-encoding', None) + content_length = hdrs.get('content-length', 0) + + msg = aiohttp.protocol.RawRequestMessage( + method=hdrs.pop(':METHOD'), + path=hdrs.pop(':PATH'), + version=aiohttp.protocol2.HttpVersion20, + headers=hdrs, + raw_headers=event.headers, + should_close=False, + compression=encoding, + ) + if content_length: + reader = aiohttp.streams.StreamReader(loop=self._loop) + else: + reader = aiohttp.server.EMPTY_PAYLOAD + + self._streams[event.stream_id] = reader + + # Dispatch the handler + aiohttp.helpers.ensure_future( + self.handle_request(msg, reader, event.stream_id) + ) + elif isinstance(event, h2.events.DataReceived): + reader = self._streams[event.stream_id] + reader.feed_data(event.data) + elif isinstance(event, h2.events.StreamEnded): + reader = self._streams.pop(event.stream_id) + reader.feed_eof() + elif isinstance(event, h2.events.StreamReset): + try: + reader = self._streams.pop(event.stream_id) + except KeyError: + pass + else: + reader.feed_eof() + elif isinstance(event, h2.events.ConnectionTerminated): + # TODO: Logging of errors is good. + break + + data = conn.data_to_send() + if data: + self.transport.write(data) diff --git a/aiohttp/web2.py b/aiohttp/web2.py new file mode 100644 index 00000000000..e8969cdd725 --- /dev/null +++ b/aiohttp/web2.py @@ -0,0 +1,100 @@ +"""HTTP/2 version of classes from web.py""" + +import asyncio + +import aiohttp.abc +import aiohttp.hdrs +import aiohttp.web +import aiohttp.web_exceptions +import aiohttp.web_reqrep +import aiohttp.server2 + + +class Http2RequestHandler(aiohttp.server2.ServerHTTP2Protocol): + + _meth = 'none' + _path = 'none' + + def __init__(self, manager, app, router, *, + secure_proxy_ssl_header=None, **kwargs): + super().__init__(**kwargs) + + self._manager = manager + self._app = app + self._router = router + self._middlewares = app.middlewares + self._secure_proxy_ssl_header = secure_proxy_ssl_header + + def __repr__(self): + return "<{} {}:{} {}>".format( + self.__class__.__name__, self._meth, self._path, + 'connected' if self.transport is not None else 'disconnected') + + def connection_made(self, transport): + super().connection_made(transport) + + self._manager.connection_made(self, transport) + + def connection_lost(self, exc): + self._manager.connection_lost(self, exc) + + super().connection_lost(exc) + + @asyncio.coroutine + def handle_request(self, message, payload, stream_id): + if self.access_log: + now = self._loop.time() + + app = self._app + request = aiohttp.web_reqrep.Request( + app, message, payload, + self.transport, self.reader, self.writer, + secure_proxy_ssl_header=self._secure_proxy_ssl_header, + h2_conn=self._conn, h2_stream_id=stream_id) + self._meth = request.method + self._path = request.path + try: + match_info = yield from self._router.resolve(request) + + assert isinstance(match_info, aiohttp.abc.AbstractMatchInfo), \ + match_info + + resp = None + request._match_info = match_info + expect = request.headers.get(aiohttp.hdrs.EXPECT) + if expect: + resp = ( + yield from match_info.expect_handler(request)) + + if resp is None: + handler = match_info.handler + for factory in reversed(self._middlewares): + handler = yield from factory(app, handler) + resp = yield from handler(request) + + assert isinstance(resp, aiohttp.web_reqrep.StreamResponse), \ + ("Handler {!r} should return response instance, " + "got {!r} [middlewares {!r}]").format( + match_info.handler, type(resp), self._middlewares) + except aiohttp.web_exceptions.HTTPException as exc: + resp = exc + + resp_msg = yield from resp.prepare(request) + yield from resp.write_eof() + + # notify server about keep-alive + self.keep_alive(resp_msg.keep_alive()) + + # log access + if self.access_log: + self.log_access(message, None, resp_msg, self._loop.time() - now) + + # for repr + self._meth = 'none' + self._path = 'none' + + +class Http2RequestHandlerFactory(aiohttp.web.RequestHandlerFactory): + def __init__(self, *args, **kwargs): + kwargs['handler'] = Http2RequestHandler + super().__init__(*args, **kwargs) diff --git a/aiohttp/web_reqrep.py b/aiohttp/web_reqrep.py index 68ab7fdf5f2..faee9aa3921 100644 --- a/aiohttp/web_reqrep.py +++ b/aiohttp/web_reqrep.py @@ -100,7 +100,8 @@ class Request(dict, HeadersMixin): hdrs.METH_TRACE, hdrs.METH_DELETE} def __init__(self, app, message, payload, transport, reader, writer, *, - secure_proxy_ssl_header=None): + secure_proxy_ssl_header=None, h2_conn=None, + h2_stream_id=None): self._app = app self._message = message self._transport = transport @@ -119,6 +120,8 @@ def __init__(self, app, message, payload, transport, reader, writer, *, self._has_body = not payload.at_eof() self._secure_proxy_ssl_header = secure_proxy_ssl_header + self._h2_conn = h2_conn + self._h2_stream_id = h2_stream_id @reify def scheme(self): diff --git a/aiohttp/web_reqrep2.py b/aiohttp/web_reqrep2.py new file mode 100644 index 00000000000..e2121c26240 --- /dev/null +++ b/aiohttp/web_reqrep2.py @@ -0,0 +1,37 @@ +"""HTTP/2 equivalent of web_reqrep.py""" + +import aiohttp.web_reqrep + +from aiohttp.protocol2 import Http2Response as ResponseImpl + + +class Http2Response(aiohttp.web_reqrep.Response): + """ + Overrides the basic response object to ensure that HTTP/2 is used. + """ + def _start(self, request): + self._req = request + keep_alive = self._keep_alive + if keep_alive is None: + keep_alive = request.keep_alive + self._keep_alive = keep_alive + + resp_impl = self._resp_impl = ResponseImpl( + request._h2_conn, + request._writer, + self._status, + request._h2_stream_id) + + self._copy_cookies() + + if self._compression: + self._start_compression(request) + + assert not self._chunked, "Chunked not supported in HTTP/2" + + headers = self.headers.items() + for key, val in headers: + resp_impl.add_header(key, val) + + resp_impl.send_headers() + return resp_impl From 28cfe9b4897a96e5338351e4821097cab090dd09 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 20 May 2016 10:15:15 +0100 Subject: [PATCH 2/2] Add h2 dependency. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 27ebc0ae387..63b0fb3b7fd 100644 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ def build_extension(self, ext): raise RuntimeError('Unable to determine version.') -install_requires = ['chardet', 'multidict'] +install_requires = ['chardet', 'multidict', 'h2'] if sys.version_info < (3, 4, 1): raise RuntimeError("aiohttp requires Python 3.4.1+")