Skip to content

Commit

Permalink
feat: add JSON RPC protocol support
Browse files Browse the repository at this point in the history
Adds JSON RPC protocol support for operations such as StartLiveTail from CloudwatchLogs. This change also modifies the DecodingEventStreamIterator so it can handle responses where its body is streamable.
  • Loading branch information
yenfryherrerafeliz committed Apr 30, 2024
1 parent 2fd8cae commit 927d7fb
Show file tree
Hide file tree
Showing 9 changed files with 610 additions and 17 deletions.
23 changes: 14 additions & 9 deletions src/Api/Parser/DecodingEventStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ class DecodingEventStreamIterator implements Iterator
];

/** @var StreamInterface Stream of eventstream shape to parse. */
private $stream;
protected $stream;

/** @var array Currently parsed event. */
private $currentEvent;
protected $currentEvent;

/** @var int Current in-order event key. */
private $key;
protected $key;

/** @var resource|\HashContext CRC32 hash context for event validation */
private $hashContext;
protected $hashContext;

/** @var int $currentPosition */
private $currentPosition;
protected $currentPosition;

/**
* DecodingEventStreamIterator constructor.
Expand All @@ -77,7 +77,7 @@ public function __construct(StreamInterface $stream)
$this->rewind();
}

private function parseHeaders($headerBytes)
protected function parseHeaders($headerBytes)
{
$headers = [];
$bytesRead = 0;
Expand All @@ -102,7 +102,7 @@ private function parseHeaders($headerBytes)
return [$headers, $bytesRead];
}

private function parsePrelude()
protected function parsePrelude()
{
$prelude = [];
$bytesRead = 0;
Expand All @@ -127,7 +127,12 @@ private function parsePrelude()
return [$prelude, $bytesRead];
}

private function parseEvent()
/**
* This method decodes an event from the stream.
*
* @return array
*/
protected function parseEvent()
{
$event = [];

Expand Down Expand Up @@ -217,7 +222,7 @@ public function valid()

// Decoding Utilities

private function readAndHashBytes($num)
protected function readAndHashBytes($num)
{
$bytes = $this->stream->read($num);
hash_update($this->hashContext, $bytes);
Expand Down
30 changes: 28 additions & 2 deletions src/Api/Parser/EventParsingIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,28 @@ public function __construct(
StructureShape $shape,
AbstractParser $parser
) {
$this->decodingIterator = new DecodingEventStreamIterator($stream);
$this->decodingIterator = $this->chooseDecodingIterator($stream);
$this->shape = $shape;
$this->parser = $parser;
}

/**
* This method choose a decoding iterator implementation based on if the stream
* is seekable or not.
*
* @param $stream
*
* @return Iterator
*/
private function chooseDecodingIterator($stream)
{
if ($stream->isSeekable()) {
return new DecodingEventStreamIterator($stream);
} else {
return new NonSeekableStreamDecodingEventStreamIterator($stream);
}
}

#[\ReturnTypeWillChange]
public function current()
{
Expand Down Expand Up @@ -80,8 +97,12 @@ private function parseEvent(array $event)
throw new ParserException('Failed to parse without event type.');
}

$eventShape = $this->shape->getMember($eventType);
$eventPayload = $event['payload'];
if ($eventType === 'initial-response') {
return $this->parseInitialResponseEvent($eventPayload);
}

$eventShape = $this->shape->getMember($eventType);

return [
$eventType => array_merge(
Expand Down Expand Up @@ -153,4 +174,9 @@ private function parseError(array $event)
$event['headers'][':error-message']
);
}

private function parseInitialResponseEvent($payload): array
{
return ['initial-response' => json_decode($payload, true)];
}
}
39 changes: 35 additions & 4 deletions src/Api/Parser/JsonRpcParser.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Aws\Api\Parser;

use Aws\Api\Operation;
use Aws\Api\StructureShape;
use Aws\Api\Service;
use Aws\Result;
Expand Down Expand Up @@ -30,15 +31,45 @@ public function __invoke(
ResponseInterface $response
) {
$operation = $this->api->getOperation($command->getName());
$result = null === $operation['output']
? null
: $this->parseMemberFromStream(

return $this->parseResponse($response, $operation);
}

/**
* This method parses a response based on JSON RPC protocol.
*
* @param ResponseInterface $response the response to parse.
* @param Operation $operation the operation which holds information for
* parsing the response.
*
* @return Result
*/
private function parseResponse(ResponseInterface $response, Operation $operation)
{
if (null === $operation['output']) {
return new Result([]);
}

$outputShape = $operation->getOutput();
foreach ($outputShape->getMembers() as $memberName => $memberProps) {
if (!empty($memberProps['eventstream'])) {
return new Result([
$memberName => new EventParsingIterator(
$response->getBody(),
$outputShape->getMember($memberName),
$this
)
]);
}
}

$result = $this->parseMemberFromStream(
$response->getBody(),
$operation->getOutput(),
$response
);

return new Result($result ?: []);
return new Result(is_null($result) ? [] : $result);
}

public function parseMemberFromStream(
Expand Down
101 changes: 101 additions & 0 deletions src/Api/Parser/NonSeekableStreamDecodingEventStreamIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?php

namespace Aws\Api\Parser;

use GuzzleHttp\Psr7;
use Psr\Http\Message\StreamInterface;
use Aws\Api\Parser\Exception\ParserException;

/**
* @inheritDoc
*/
class NonSeekableStreamDecodingEventStreamIterator extends DecodingEventStreamIterator
{
/** @var array $tempBuffer */
private $tempBuffer;

/**
* NonSeekableStreamDecodingEventStreamIterator constructor.
*
* @param StreamInterface $stream
*/
public function __construct(StreamInterface $stream)
{
$this->stream = $stream;
if ($this->stream->isSeekable()) {
throw new \InvalidArgumentException('The stream provided must be not seekable.');
}

$this->tempBuffer = [];
}

/**
* @inheritDoc
*
* @return array
*/
protected function parseEvent(): array
{
$event = [];
$this->hashContext = hash_init('crc32b');
$prelude = $this->parsePrelude()[0];
list(
$event[self::HEADERS],
$numBytes
) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);
$event[self::PAYLOAD] = Psr7\Utils::streamFor(
$this->readAndHashBytes(
$prelude[self::LENGTH_TOTAL] - self::BYTES_PRELUDE
- $numBytes - self::BYTES_TRAILING
)
);
$calculatedCrc = hash_final($this->hashContext, true);
$messageCrc = $this->stream->read(4);
if ($calculatedCrc !== $messageCrc) {
throw new ParserException('Message checksum mismatch.');
}

return $event;
}

protected function readAndHashBytes($num): string
{
$bytes = '';
while (!empty($this->tempBuffer) && $num > 0) {
$byte = array_shift($this->tempBuffer);
$bytes .= $byte;
$num = $num - 1;
}

$bytes = $bytes . $this->stream->read($num);
hash_update($this->hashContext, $bytes);

return $bytes;
}

// Iterator Functionality

#[\ReturnTypeWillChange]
public function rewind()
{
$this->currentEvent = $this->parseEvent();
}

public function next()
{
$this->tempBuffer[] = $this->stream->read(1);
if ($this->valid()) {
$this->key++;
$this->currentEvent = $this->parseEvent();
}
}

/**
* @return bool
*/
#[\ReturnTypeWillChange]
public function valid()
{
return !$this->stream->eof();
}
}
64 changes: 63 additions & 1 deletion src/CloudWatchLogs/CloudWatchLogsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
namespace Aws\CloudWatchLogs;

use Aws\AwsClient;
use Aws\CommandInterface;
use Generator;

/**
* This client is used to interact with the **Amazon CloudWatch Logs** service.
Expand Down Expand Up @@ -155,4 +157,64 @@
* @method \Aws\Result updateLogAnomalyDetector(array $args = [])
* @method \GuzzleHttp\Promise\Promise updateLogAnomalyDetectorAsync(array $args = [])
*/
class CloudWatchLogsClient extends AwsClient {}
class CloudWatchLogsClient extends AwsClient {
static $streamingCommands = [
'StartLiveTail' => true
];

public function __construct(array $args)
{
parent::__construct($args);
$this->addStreamingFlagMiddleware();
}

private function addStreamingFlagMiddleware()
{
$this->getHandlerList()
-> appendInit(
$this->getStreamingFlagMiddleware(),
'streaming-flag-middleware'
);
}

private function getStreamingFlagMiddleware(): callable
{
return function (callable $handler) {
return function (CommandInterface $command, $request = null) use ($handler) {
if (!empty(self::$streamingCommands[$command->getName()])) {
$command['@http']['stream'] = true;
}

return $handler($command, $request);
};
};
}

/**
* Helper method for 'startLiveTail' operation that checks for results.
*
* Initiates 'startLiveTail' operation with given arguments, and continuously
* checks response stream for session updates or results, yielding each
* stream chunk when results are not empty. This method abstracts from users
* the need of checking if there are logs entry available to be watched, which means
* that users will always get a next item to be iterated when more log entries are
* available.
*
* @param array $args Command arguments.
*
* @return Generator Yields session update or result stream chunks.
*/
public function startLiveTailCheckingForResults(array $args): Generator
{
$response = $this->startLiveTail($args);
foreach ($response['responseStream'] as $streamChunk) {
if (isset($streamChunk['sessionUpdate'])) {
if (!empty($streamChunk['sessionUpdate']['sessionResults'])) {
yield $streamChunk;
}
} else {
yield $streamChunk;
}
}
}
}
Loading

0 comments on commit 927d7fb

Please sign in to comment.