From eb70ad666a5a460b3de9da2ba941c6963056788b Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Fri, 12 Jan 2024 19:42:04 +0100 Subject: [PATCH] Handle stopping --- src/Driver/DefaultHttpDriverFactory.php | 1 - src/Driver/Http3Driver.php | 132 +++++++++++++++------- src/Driver/Internal/Http3/Http3Parser.php | 2 +- src/Driver/Internal/Http3/Http3Writer.php | 11 +- 4 files changed, 103 insertions(+), 43 deletions(-) diff --git a/src/Driver/DefaultHttpDriverFactory.php b/src/Driver/DefaultHttpDriverFactory.php index 2d551d9b..46dc8233 100644 --- a/src/Driver/DefaultHttpDriverFactory.php +++ b/src/Driver/DefaultHttpDriverFactory.php @@ -37,7 +37,6 @@ public function createHttpDriver( streamTimeout: $this->streamTimeout, headerSizeLimit: $this->headerSizeLimit, bodySizeLimit: $this->bodySizeLimit, - pushEnabled: $this->pushEnabled, ); } diff --git a/src/Driver/Http3Driver.php b/src/Driver/Http3Driver.php index b3cccf3f..db9a76b4 100644 --- a/src/Driver/Http3Driver.php +++ b/src/Driver/Http3Driver.php @@ -2,15 +2,15 @@ namespace Amp\Http\Server\Driver; +use Amp\ByteStream\ClosedException; use Amp\ByteStream\ReadableIterableStream; +use Amp\CancelledException; +use Amp\DeferredCancellation; use Amp\DeferredFuture; -use Amp\Http\Http2\Http2ConnectionException; use Amp\Http\Http2\Http2Parser; -use Amp\Http\Http2\Http2StreamException; use Amp\Http\InvalidHeaderException; use Amp\Http\Server\ClientException; use Amp\Http\Server\Driver\Internal\ConnectionHttpDriver; -use Amp\Http\Server\Driver\Internal\Http2Stream; use Amp\Http\Server\Driver\Internal\Http3\Http3ConnectionException; use Amp\Http\Server\Driver\Internal\Http3\Http3Error; use Amp\Http\Server\Driver\Internal\Http3\Http3Frame; @@ -25,7 +25,6 @@ use Amp\Http\Server\RequestHandler; use Amp\Http\Server\Response; use Amp\Http\Server\Trailers; -use Amp\NullCancellation; use Amp\Pipeline\Queue; use Amp\Quic\QuicConnection; use Amp\Quic\QuicSocket; @@ -39,16 +38,16 @@ class Http3Driver extends ConnectionHttpDriver { - private bool $allowsPush; - private Client $client; - private QuicConnection $connection; /** @var \WeakMap */ private \WeakMap $requestStreams; private Http3Writer $writer; private QPack $qpack; + private int $highestStreamId = 0; + private bool $stopping = false; + private DeferredCancellation $closeCancellation; public function __construct( RequestHandler $requestHandler, @@ -57,15 +56,13 @@ public function __construct( private readonly int $streamTimeout = Http2Driver::DEFAULT_STREAM_TIMEOUT, private readonly int $headerSizeLimit = Http2Driver::DEFAULT_HEADER_SIZE_LIMIT, private readonly int $bodySizeLimit = Http2Driver::DEFAULT_BODY_SIZE_LIMIT, - private bool $pushEnabled = true, private readonly ?string $settings = null, ) { parent::__construct($requestHandler, $errorHandler, $logger); - $this->allowsPush = $pushEnabled; - $this->qpack = new QPack; $this->requestStreams = new \WeakMap; + $this->closeCancellation = new DeferredCancellation; } // TODO copied from Http2Driver... @@ -88,7 +85,6 @@ protected function write(Request $request, Response $response): void { /** @var QuicSocket $stream */ $stream = $this->requestStreams[$request]; - unset($this->requestStreams[$request]); $status = $response->getStatus(); $headers = [ @@ -108,9 +104,6 @@ protected function write(Request $request, Response $response): void foreach ($response->getPushes() as $push) { $headers["link"][] = "<{$push->getUri()}>; rel=preload"; - if ($this->allowsPush) { - // TODO $this->sendPushPromise($request, $id, $push); - } } $this->writer->sendHeaderFrame($stream, $this->encodeHeaders($headers)); @@ -119,23 +112,31 @@ protected function write(Request $request, Response $response): void return; } - $cancellation = new NullCancellation; // TODO just dummy + try { + $cancellation = $this->closeCancellation->getCancellation(); - $body = $response->getBody(); - $chunk = $body->read($cancellation); + $body = $response->getBody(); + $chunk = $body->read($cancellation); - while ($chunk !== null) { - $this->writer->sendData($stream, $chunk); + while ($chunk !== null) { + $this->writer->sendData($stream, $chunk); - $chunk = $body->read($cancellation); - } + $chunk = $body->read($cancellation); + } - if ($trailers !== null) { - $trailers = $trailers->await($cancellation); - $this->writer->sendHeaderFrame($stream, $this->encodeHeaders($trailers->getHeaders())); + if ($trailers !== null) { + $trailers = $trailers->await($cancellation); + $this->writer->sendHeaderFrame($stream, $this->encodeHeaders($trailers->getHeaders())); + } + + $stream->end(); + if (!$stream->isClosed()) { + $stream->endReceiving(); + } + } catch (CancelledException) { } - $stream->end(); + unset($this->requestStreams[$request]); } public function getApplicationLayerProtocols(): array @@ -149,11 +150,12 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti \assert(!isset($this->client), "The driver has already been setup"); $this->client = $client; - $this->connection = $connection; $this->writer = new Http3Writer($connection, [[Http3Settings::MAX_FIELD_SECTION_SIZE, $this->headerSizeLimit]]); $largestPushId = (1 << 62) - 1; $maxAllowedPushId = 0; + $connection->onClose($this->closeCancellation->cancel(...)); + $parser = new Http3Parser($connection, $this->headerSizeLimit, $this->qpack); try { foreach ($parser->process() as $frame) { @@ -164,11 +166,16 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti break; case Http3Frame::HEADERS: - EventLoop::queue(function () use ($parser, $frame) { + /** @var QuicSocket $stream */ + [, $stream, $generator] = $frame; + if ($this->stopping) { + [, $stream] = $frame; + $stream->close(Http3Error::H3_NO_ERROR->value); + break; + } + EventLoop::queue(function () use ($parser, $stream, $generator) { try { - /** @var QuicSocket $stream */ - $stream = $frame[1]; - $generator = $frame[2]; + $streamId = $stream->getId(); [$headers, $pseudo] = $generator->current(); foreach ($pseudo as $name => $value) { @@ -208,7 +215,7 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti $host = $matches[1]; $port = isset($matches[2]) - ? (int)$matches[2] + ? (int) $matches[2] : ($address instanceof InternetAddress ? $address->getPort() : null); if ($position = \strpos($target, "#")) { @@ -284,7 +291,7 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti ); } - $expectedLength = (int)$contentLength; + $expectedLength = (int) $contentLength; } else { $expectedLength = null; } @@ -314,6 +321,11 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { $trailers ); $this->requestStreams[$request] = $stream; + + if ($this->highestStreamId < $streamId) { + $this->highestStreamId = $streamId; + } + async($this->handleRequest(...), $request); $generator->next(); @@ -400,17 +412,16 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { break; case Http3Frame::GOAWAY: - $maxPushId = $frame[1]; + [, $maxPushId] = $frame; if ($maxPushId > $largestPushId) { $parser->abort(new Http3ConnectionException("A GOAWAY id must not be larger than a prior one", Http3Error::H3_ID_ERROR)); break; } - $this->pushEnabled = false; - // TODO abort pending server pushes + // Nothing to do here, we don't support pushes. break; case Http3Frame::MAX_PUSH_ID: - $maxPushId = $frame[1]; + [, $maxPushId] = $frame; if ($maxPushId < $maxAllowedPushId) { $parser->abort(new Http3ConnectionException("A MAX_PUSH_ID id must not be smaller than a prior one", Http3Error::H3_ID_ERROR)); break; @@ -419,8 +430,13 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { break; case Http3Frame::CANCEL_PUSH: - $pushId = $frame[1]; - // TODO stop push + [, $pushId] = $frame; + // Without pushes sent, this frame is always invalid + $parser->abort(new Http3ConnectionException("An CANCEL_PUSH for a not promised $pushId was received", Http3Error::H3_ID_ERROR)); + break; + + case Http3Frame::PUSH_PROMISE: + $parser->abort(new Http3ConnectionException("A push stream must not be initiated by the client", Http3Error::H3_STREAM_CREATION_ERROR)); break; default: @@ -437,11 +453,47 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { public function getPendingRequestCount(): int { - return 0; + return $this->requestStreams->count(); } public function stop(): void { - // TODO emit goaway frames + if ($this->stopping) { + return; + } + + $this->stopping = true; + $this->writer->sendGoaway($this->highestStreamId); + + /** @psalm-suppress RedundantCondition */ + \assert($this->logger->debug(\sprintf( + "Gracefully shutting down HTTP/3 client @ %s #%d; last-id: %d", + $this->client->getRemoteAddress()->toString(), + $this->client->getId(), + $this->highestStreamId, + )) || true); + + + $outstanding = $this->requestStreams->count(); + if ($outstanding === 0) { + $this->writer->close(); + return; + } + + $deferred = new DeferredFuture; + foreach ($this->requestStreams as $stream) { + $stream->onClose(function () use (&$outstanding, $deferred) { + if (--$outstanding === 0) { + $deferred->complete(); + } + }); + } + + try { + $deferred->getFuture()->await($this->closeCancellation->getCancellation()); + } catch (CancelledException) { + } finally { + $this->writer->close(); + } } } diff --git a/src/Driver/Internal/Http3/Http3Parser.php b/src/Driver/Internal/Http3/Http3Parser.php index 20e6581e..51665323 100644 --- a/src/Driver/Internal/Http3/Http3Parser.php +++ b/src/Driver/Internal/Http3/Http3Parser.php @@ -332,7 +332,7 @@ public function process(): ConcurrentIterator throw new Http3ConnectionException("The push stream was closed too early", Http3Error::H3_FRAME_ERROR); } } - $this->queue->push([Http3StreamType::Push, $pushId, fn () => $this->readHttpMessage($stream, $buf, $off)]); + $this->queue->push([Http3Frame::PUSH_PROMISE, $pushId, fn () => $this->readHttpMessage($stream, $buf, $off)]); break; // We don't do anything with these streams yet, but we must not close them according to RFC 9204 Section 4.2 diff --git a/src/Driver/Internal/Http3/Http3Writer.php b/src/Driver/Internal/Http3/Http3Writer.php index 790eb952..e3199063 100644 --- a/src/Driver/Internal/Http3/Http3Writer.php +++ b/src/Driver/Internal/Http3/Http3Writer.php @@ -7,7 +7,7 @@ class Http3Writer { - private $controlStream; + private QuicSocket $controlStream; public function __construct(private QuicConnection $connection, private array $settings) { @@ -43,6 +43,11 @@ public function sendData(QuicSocket $stream, string $payload) self::sendFrame($stream, Http3Frame::DATA, $payload); } + public function sendGoaway(int $highestStreamId) + { + self::sendFrame($this->controlStream, Http3Frame::GOAWAY, self::encodeVarint($highestStreamId)); + } + private function startControlStream() { $this->controlStream = $this->connection->openStream(); @@ -56,4 +61,8 @@ private function startControlStream() self::sendFrame($this->controlStream, Http3Frame::SETTINGS, \implode($ints)); } + public function close() + { + $this->connection->close(Http3Error::H3_NO_ERROR->value); + } }