Skip to content

Commit

Permalink
Fully test Http3Parser
Browse files Browse the repository at this point in the history
  • Loading branch information
bwoebi committed Feb 10, 2024
1 parent 6409275 commit 12aecf8
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 33 deletions.
54 changes: 33 additions & 21 deletions src/Driver/Internal/Http3/Http3Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Amp\ByteStream\ReadableStream;
use Amp\Cancellation;
use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\Http\StructuredFields\Boolean;
use Amp\Http\StructuredFields\Number;
Expand All @@ -19,6 +20,7 @@ class Http3Parser
{
private ?QuicSocket $qpackDecodeStream = null;
private ?QuicSocket $qpackEncodeStream = null;
private bool $receivedControlStream = false;
private Queue $queue;
/** @var array<int, EventLoop\Suspension> */
private array $datagramReceivers = [];
Expand All @@ -42,13 +44,13 @@ public static function decodeVarint(string $string, int &$off): int
--$off;
return -1;
}
return ($int << 8) + \ord($string[$off++]);
return (($int & 0x3F) << 8) + \ord($string[$off++]);
case 0x80:
if (\strlen($string) < $off + 3) {
--$off;
return -1;
}
return ($int << 24) + (\ord($string[$off++]) << 16) + (\ord($string[$off++]) << 8) + \ord($string[$off++]);
return (($int & 0x3F) << 24) + (\ord($string[$off++]) << 16) + (\ord($string[$off++]) << 8) + \ord($string[$off++]);
default:
if (\strlen($string) < $off-- + 7) {
return -1;
Expand Down Expand Up @@ -297,8 +299,8 @@ private static function processHeaders(array $decoded): ?array
return [$headers, $pseudo];
}

// I'm unable to suppress https://github.com/vimeo/psalm/issues/10669
/* @return ConcurrentIterator<list{Http3Frame::HEADERS, QuicSocket, \Generator}|list{Http3Frame::GOAWAY|Http3Frame::MAX_PUSH_ID|Http3Frame::CANCEL_PUSH, int}|list{Http3Frame::PRIORITY_UPDATE_Push|Http3Frame::PRIORITY_UPDATE_Request, int, string}|list{Http3Frame::PUSH_PROMISE, int, callable(): \Generator}|list{int, string, QuicSocket}> */
// Omitting it due to https://github.com/vimeo/psalm/issues/10002
/* @return ConcurrentIterator<list{(Http3Frame::HEADERS), QuicSocket, \Generator}|list{(Http3Frame::GOAWAY|Http3Frame::MAX_PUSH_ID|Http3Frame::CANCEL_PUSH), int}|list{(Http3Frame::PRIORITY_UPDATE_Push|Http3Frame::PRIORITY_UPDATE_Request), int, string}|list{(Http3Frame::PUSH_PROMISE), int, callable(): \Generator}|list{int, string, QuicSocket}> */
public function process(): ConcurrentIterator
{
EventLoop::queue(function () {
Expand Down Expand Up @@ -332,6 +334,11 @@ public function process(): ConcurrentIterator
// unidirectional stream
switch (Http3StreamType::tryFrom($type)) {
case Http3StreamType::Control:
if ($this->receivedControlStream) {
throw new Http3ConnectionException("There must be only one control stream", Http3Error::H3_STREAM_CREATION_ERROR);
}
$this->receivedControlStream = true;

if (![$frame, $contents] = $this->readFullFrame($stream, $buf, $off, 0x1000)) {
if (!$stream->getConnection()->isClosed()) {
throw new Http3ConnectionException("The control stream was closed", Http3Error::H3_CLOSED_CRITICAL_STREAM);
Expand All @@ -345,7 +352,7 @@ public function process(): ConcurrentIterator
$this->parseSettings($contents);

while (true) {
if (![$frame, $contents] = $this->readFullFrame($stream, $buf, $off, 0x100)) {
if (![$frame, $contents] = $this->readFullFrame($stream, $buf, $off, 0x1000)) {
if (!$stream->getConnection()->isClosed()) {
throw new Http3ConnectionException("The control stream was closed", Http3Error::H3_CLOSED_CRITICAL_STREAM);
}
Expand Down Expand Up @@ -385,14 +392,14 @@ public function process(): ConcurrentIterator
// We don't do anything with these streams yet, but we must not close them according to RFC 9204 Section 4.2
case Http3StreamType::QPackEncode:
if ($this->qpackEncodeStream) {
return;
throw new Http3ConnectionException("There must be only one QPACK encoding stream", Http3Error::H3_STREAM_CREATION_ERROR);
}
$this->qpackEncodeStream = $stream;
break;

case Http3StreamType::QPackDecode:
if ($this->qpackDecodeStream) {
return;
throw new Http3ConnectionException("There must be only one QPACK decoding stream", Http3Error::H3_STREAM_CREATION_ERROR);
}
$this->qpackDecodeStream = $stream;
break;
Expand Down Expand Up @@ -455,22 +462,25 @@ private function datagramReceiver(): void
$this->datagramReceiveEmpty = new DeferredCancellation;
$cancellation = $this->datagramReceiveEmpty->getCancellation();
EventLoop::queue(function () use ($cancellation) {
while (null !== $buf = $this->connection->receive($cancellation)) {
$off = 0;
$quarterStreamId = self::decodeVarint($buf, $off);
if (isset($this->datagramReceivers[$quarterStreamId])) {
$this->datagramReceivers[$quarterStreamId]->resume(\substr($buf, $off));
unset($this->datagramReceivers[$quarterStreamId]);
try {
while (null !== $buf = $this->connection->receive($cancellation)) {
$off = 0;
$quarterStreamId = self::decodeVarint($buf, $off);
if (isset($this->datagramReceivers[$quarterStreamId])) {
$this->datagramReceivers[$quarterStreamId]->resume(\substr($buf, $off));
unset($this->datagramReceivers[$quarterStreamId]);

if (!$this->datagramReceivers) {
return;
}

if (!$this->datagramReceivers) {
return;
// We need to await a tick to allow datagram receivers to request a new datagram to avoid needlessly discarding datagram frames
$suspension = EventLoop::getSuspension();
EventLoop::queue($suspension->resume(...));
$suspension->suspend();
}

// We need to await a tick to allow datagram receivers to request a new datagram to avoid needlessly discarding datagram frames
$suspension = EventLoop::getSuspension();
EventLoop::queue($suspension->resume(...));
$suspension->suspend();
}
} catch (CancelledException) {
}
});
}
Expand All @@ -489,7 +499,9 @@ public function receiveDatagram(QuicSocket $stream, ?Cancellation $cancellation
if (!isset($this->datagramCloseHandlerInstalled[$quarterStreamId])) {
$this->datagramCloseHandlerInstalled[$quarterStreamId] = true;
$stream->onClose(function () use ($quarterStreamId) {
$this->datagramReceivers[$quarterStreamId]->resume();
if (isset($this->datagramReceivers[$quarterStreamId])) {
$this->datagramReceivers[$quarterStreamId]->resume();
}
unset($this->datagramReceivers[$quarterStreamId], $this->datagramCloseHandlerInstalled[$quarterStreamId]);
if (!$this->datagramReceivers) {
$this->datagramReceiveEmpty->cancel();
Expand Down
20 changes: 20 additions & 0 deletions src/Driver/Internal/Http3/Http3Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,26 @@ public function sendData(QuicSocket $stream, string $payload): void
self::sendKnownFrame($stream, Http3Frame::DATA, $payload);
}

public function sendPriorityRequest(int $streamId, string $structuredPriorityData): void
{
self::sendKnownFrame($this->controlStream, Http3Frame::PRIORITY_UPDATE_Request, self::encodeVarint($streamId) . $structuredPriorityData);
}

public function sendPriorityPush(int $streamId, string $structuredPriorityData): void
{
self::sendKnownFrame($this->controlStream, Http3Frame::PRIORITY_UPDATE_Push, self::encodeVarint($streamId) . $structuredPriorityData);
}

public function sendMaxPushId(int $pushId): void
{
self::sendKnownFrame($this->controlStream, Http3Frame::MAX_PUSH_ID, self::encodeVarint($pushId));
}

public function sendCancelPush(int $pushId): void
{
self::sendKnownFrame($this->controlStream, Http3Frame::CANCEL_PUSH, self::encodeVarint($pushId));
}

public function sendGoaway(int $highestStreamId): void
{
self::sendKnownFrame($this->controlStream, Http3Frame::GOAWAY, self::encodeVarint($highestStreamId));
Expand Down
2 changes: 1 addition & 1 deletion src/Driver/Internal/Http3/QPack.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private static function decodeDynamicInteger(string $input, int $maxBits, int &$
$int += ($c & 0x7f) << $bitshift;
$bitshift += 7;

if ($int > 2147483647) {
if ($int > 0x7FFFFFFF) {
throw new QPackException(Http3Error::QPACK_DECOMPRESSION_FAILED, 'Invalid integer, too large');
}
} while ($c & 0x80);
Expand Down
Loading

0 comments on commit 12aecf8

Please sign in to comment.