Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Discussion PR for HTTP/2 support (DO NOT MERGE). #882

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions aiohttp/protocol2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Http2 related parsers and protocol."""

from wsgiref.handlers import format_date_time

import aiohttp.hdrs
import aiohttp.protocol

HttpVersion20 = aiohttp.protocol.HttpVersion(2, 0)


class HTTP2Parser:
def __init__(self, conn):
self._conn = conn

def __call__(self, out, buf):
"""
Receives data from the buffer, parses it, and then sends any events
out.

This only terminates when the connection is terminated by the remote
peer.
"""
while True:
# XXX: 65kb is totally arbitrary here: consider tuning.
data = yield from buf.readsome(size=65535)

if not data:
out.feed_eof()
break

events = self._conn.receive_data(data)
out.feed_data(events, len(data))


class Http2Message(aiohttp.protocol.HttpMessage):
"""
A HTTP/2-specific version of the ``HttpMessage`` ABC.
"""
HOP_HEADERS = []

def __init__(self, conn, transport, stream_id):
self._conn = conn
self._stream_id = stream_id
super().__init__(transport, version=HttpVersion20, close=False)

def keep_alive(self):
return True

def add_header(self, name, value):
# HTTP/2 doesn't do chunked.
if name == aiohttp.hdrs.TRANSFER_ENCODING:
return

# Nor does it do Connection.
if name == aiohttp.hdrs.CONNECTION:
return

return super().add_header(name, value)

def send_headers(self, *args, **kwargs):
"""
A complete override of the equivalent method from the ABC.
"""
assert not self.headers_sent, 'headers have been sent already'
self.headers_sent = True

# We always use either the EOF payload writer or the length payload
# writer.
self.writer = self._write_h2_payload()
next(self.writer)
self._add_default_headers()

# Send the headers.
headers = [(':status', str(self.status))]
headers.extend(self.headers.items())
self._conn.send_headers(stream_id=self._stream_id, headers=headers)
headers_data = self._conn.data_to_send()

self.output_length += len(headers_data)
self.headers_length = len(headers_data)
self.transport.write(headers_data)

def write(self, chunk, *,
drain=False, EOF_MARKER=aiohttp.protocol.EOF_MARKER,
EOL_MARKER=aiohttp.protocol.EOL_MARKER):
"""Writes chunk of data to a stream by using different writers.

writer uses filter to modify chunk of data.
write_eof() indicates end of stream.
writer can't be used after write_eof() method being called.
write() return drain future.
"""
assert (isinstance(chunk, (bytes, bytearray)) or
chunk is EOF_MARKER), chunk

size = self.output_length

if self._send_headers and not self.headers_sent:
self.send_headers()

assert self.writer is not None, 'send_headers() is not called.'

if self.filter:
chunk = self.filter.send(chunk)
while chunk not in (EOF_MARKER, EOL_MARKER):
if chunk:
self.writer.send(chunk)
chunk = next(self.filter)
else:
if chunk is not EOF_MARKER:
self.writer.send(chunk)

self._output_size += self.output_length - size

if self._output_size > 64 * 1024:
if drain:
self._output_size = 0
return self.transport.drain()

return ()

def _write_h2_payload(self):
while True:
try:
chunk = yield
except aiohttp.EofStream:
break

self._conn.send_data(stream_id=self._stream_id, data=chunk)
sent_data = self._conn.data_to_send()
self.transport.write(sent_data)
self.output_length += len(sent_data)

self._conn.end_stream(stream_id=self._stream_id)
sent_data = self._conn.data_to_send()
self.transport.write(sent_data)

def _add_default_headers(self):
# This is a no-op for HTTP/2, we don't want to add Connection headers.
return


class Http2Response(Http2Message):
"""
A HTTP/2-equivalent of aiohttp.protocol.HttpResponse
"""
def __init__(self, conn, transport, status, stream_id):
self._status = status
super().__init__(conn, transport, stream_id)

def status_line(self):
return ""

@staticmethod
def calc_reason(*args, **kwargs):
return ""

@property
def status(self):
return self._status

@property
def reason(self):
return ""

def autochunked(self):
return False

def _add_default_headers(self):
super()._add_default_headers()

if aiohttp.hdrs.DATE not in self.headers:
# format_date_time(None) is quite expensive
self.headers.setdefault(aiohttp.hdrs.DATE, format_date_time(None))
self.headers.setdefault(aiohttp.hdrs.SERVER, self.SERVER_SOFTWARE)
116 changes: 116 additions & 0 deletions aiohttp/server2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""HTTP/2 Server"""

import asyncio

import h2.connection
import h2.events

import aiohttp.errors
import aiohttp.hdrs
import aiohttp.helpers
import aiohttp.protocol
import aiohttp.protocol2
import aiohttp.server
import aiohttp.streams

from multidict import CIMultiDict


class ServerHTTP2Protocol(aiohttp.server.ServerHttpProtocol):
"""
A class that implements ServerHTTPProtocol for HTTP/2 servers.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self._streams = {}
self._conn = None

@asyncio.coroutine
def start(self):
"""
Start processing incoming requests.

Unlike in the case of ServerHTTPProtocol, this loop can repeatedly
handle many requests in parallel.
"""
conn = self._conn = h2.connection.H2Connection(
client_side=False, header_encoding=None
)
conn.initiate_connection()

eventstream = self.reader.set_parser(
aiohttp.protocol2.HTTP2Parser(conn)
)

while True:
try:
events = yield from eventstream.read()
except aiohttp.errors.ClientDisconnectedError:
# Unclean termination, but only a problem if streams are open.
# TODO: check that there are none.
break

# TODO: Refactor the event handlers to be multiple methods.
# TODO: Flow control.
# TODO: Priority.
for event in events:
if isinstance(event, h2.events.RequestReceived):
hdrs = CIMultiDict(
(
k.decode('utf-8', 'surrogateescape'),
v.decode('utf-8', 'surrogateescape'),
)
for k, v in event.headers
)
# These pops need to be capitalised, because of
# https://github.com/aio-libs/multidict/issues/1
host = hdrs.pop(':AUTHORITY')
hdrs.pop(':SCHEME')
hdrs[aiohttp.hdrs.HOST] = host

# TODO: This isn't quite right, but it's close enough
# for now.
encoding = hdrs.get('content-encoding', None)
content_length = hdrs.get('content-length', 0)

msg = aiohttp.protocol.RawRequestMessage(
method=hdrs.pop(':METHOD'),
path=hdrs.pop(':PATH'),
version=aiohttp.protocol2.HttpVersion20,
headers=hdrs,
raw_headers=event.headers,
should_close=False,
compression=encoding,
)
if content_length:
reader = aiohttp.streams.StreamReader(loop=self._loop)
else:
reader = aiohttp.server.EMPTY_PAYLOAD

self._streams[event.stream_id] = reader

# Dispatch the handler
aiohttp.helpers.ensure_future(
self.handle_request(msg, reader, event.stream_id)
)
elif isinstance(event, h2.events.DataReceived):
reader = self._streams[event.stream_id]
reader.feed_data(event.data)
elif isinstance(event, h2.events.StreamEnded):
reader = self._streams.pop(event.stream_id)
reader.feed_eof()
elif isinstance(event, h2.events.StreamReset):
try:
reader = self._streams.pop(event.stream_id)
except KeyError:
pass
else:
reader.feed_eof()
elif isinstance(event, h2.events.ConnectionTerminated):
# TODO: Logging of errors is good.
break

data = conn.data_to_send()
if data:
self.transport.write(data)
100 changes: 100 additions & 0 deletions aiohttp/web2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""HTTP/2 version of classes from web.py"""

import asyncio

import aiohttp.abc
import aiohttp.hdrs
import aiohttp.web
import aiohttp.web_exceptions
import aiohttp.web_reqrep
import aiohttp.server2


class Http2RequestHandler(aiohttp.server2.ServerHTTP2Protocol):

_meth = 'none'
_path = 'none'

def __init__(self, manager, app, router, *,
secure_proxy_ssl_header=None, **kwargs):
super().__init__(**kwargs)

self._manager = manager
self._app = app
self._router = router
self._middlewares = app.middlewares
self._secure_proxy_ssl_header = secure_proxy_ssl_header

def __repr__(self):
return "<{} {}:{} {}>".format(
self.__class__.__name__, self._meth, self._path,
'connected' if self.transport is not None else 'disconnected')

def connection_made(self, transport):
super().connection_made(transport)

self._manager.connection_made(self, transport)

def connection_lost(self, exc):
self._manager.connection_lost(self, exc)

super().connection_lost(exc)

@asyncio.coroutine
def handle_request(self, message, payload, stream_id):
if self.access_log:
now = self._loop.time()

app = self._app
request = aiohttp.web_reqrep.Request(
app, message, payload,
self.transport, self.reader, self.writer,
secure_proxy_ssl_header=self._secure_proxy_ssl_header,
h2_conn=self._conn, h2_stream_id=stream_id)
self._meth = request.method
self._path = request.path
try:
match_info = yield from self._router.resolve(request)

assert isinstance(match_info, aiohttp.abc.AbstractMatchInfo), \
match_info

resp = None
request._match_info = match_info
expect = request.headers.get(aiohttp.hdrs.EXPECT)
if expect:
resp = (
yield from match_info.expect_handler(request))

if resp is None:
handler = match_info.handler
for factory in reversed(self._middlewares):
handler = yield from factory(app, handler)
resp = yield from handler(request)

assert isinstance(resp, aiohttp.web_reqrep.StreamResponse), \
("Handler {!r} should return response instance, "
"got {!r} [middlewares {!r}]").format(
match_info.handler, type(resp), self._middlewares)
except aiohttp.web_exceptions.HTTPException as exc:
resp = exc

resp_msg = yield from resp.prepare(request)
yield from resp.write_eof()

# notify server about keep-alive
self.keep_alive(resp_msg.keep_alive())

# log access
if self.access_log:
self.log_access(message, None, resp_msg, self._loop.time() - now)

# for repr
self._meth = 'none'
self._path = 'none'


class Http2RequestHandlerFactory(aiohttp.web.RequestHandlerFactory):
def __init__(self, *args, **kwargs):
kwargs['handler'] = Http2RequestHandler
super().__init__(*args, **kwargs)
Loading