Skip to content

Commit

Permalink
feat: add JSON RPC protocol support
Browse files Browse the repository at this point in the history
Adds JSON RPC protocol support for operations such as StartLiveTail from CloudwatchLogs. This change also modifies the DecodingEventStreamIterator so it can handle responses where its body is streamable.
  • Loading branch information
yenfryherrerafeliz committed Apr 1, 2024
1 parent 2fd8cae commit 1058abc
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 10 deletions.
66 changes: 58 additions & 8 deletions src/Api/Parser/DecodingEventStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class DecodingEventStreamIterator implements Iterator

/** @var int $currentPosition */
private $currentPosition;
/** @var array $tempBytesHolder */
private $tempBytesHolder;

/**
* DecodingEventStreamIterator constructor.
Expand All @@ -75,6 +77,7 @@ public function __construct(StreamInterface $stream)
{
$this->stream = $stream;
$this->rewind();
$this->tempBytesHolder = [];
}

private function parseHeaders($headerBytes)
Expand Down Expand Up @@ -130,25 +133,24 @@ private function parsePrelude()
private function parseEvent()
{
$event = [];

if ($this->stream->tell() < $this->stream->getSize()) {
if (($this->stream->isSeekable() && $this->stream->tell() < $this->stream->getSize()) || !$this->stream->isSeekable()) {
$this->hashContext = hash_init('crc32b');

$bytesLeft = $this->stream->getSize() - $this->stream->tell();
list($prelude, $numBytes) = $this->parsePrelude();
if ($prelude[self::LENGTH_TOTAL] > $bytesLeft) {
if ($this->stream->isSeekable() && $prelude[self::LENGTH_TOTAL] > $bytesLeft) {
throw new ParserException('Message length too long.');
}
$bytesLeft -= $numBytes;

if ($prelude[self::LENGTH_HEADERS] > $bytesLeft) {
if ($this->stream->isSeekable() && $prelude[self::LENGTH_HEADERS] > $bytesLeft) {
throw new ParserException('Headers length too long.');
}

list(
$event[self::HEADERS],
$numBytes
) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);
) = $this->parseHeaders($prelude[self::LENGTH_HEADERS]);

$event[self::PAYLOAD] = Psr7\Utils::streamFor(
$this->readAndHashBytes(
Expand Down Expand Up @@ -200,26 +202,74 @@ public function next()
#[\ReturnTypeWillChange]
public function rewind()
{
$this->stream->rewind();
if ($this->stream->isSeekable()) {
$this->stream->rewind();
}

$this->key = 0;
$this->currentPosition = 0;
$this->currentEvent = $this->parseEvent();
}

/**
* This method decides whether at next item is available, which
* means if a next event should be parsed, and we have two scenarios:
* - The first one is for when the stream is seekable, and in this case
* we validate if the position of the pointer within the stream is
* before the size of the stream, which means we have not reached the
* end of the stream.
* - The other scenario is when dealing with no-seekable stream where
* we may do not know the size of the stream, we read a byte from the
* stream, and if not more bytes ara available then `eof` method will
* be forced to return true.
*
* @return bool
*/
#[\ReturnTypeWillChange]
public function valid()
{
return $this->currentPosition < $this->stream->getSize();
if ($this->stream->isSeekable()) {
return $this->currentPosition < $this->stream->getSize();
}

// We do this to make sure the return statement is accurate.
// Reading a byte when the stream has not more bytes available
// will force `eof` to return true.
$this->tempBytesHolder[] = $this->stream->read(1);

return !$this->stream->eof();
}

// Decoding Utilities

/**
* This method read bytes from the stream being decoded,
* and hash them. When dealing with no-seekable streams,
* in the valid method we put bytes into a temporary
* variable holder, due to the validation there, therefore
* here we need to handle the read by looking for available
* bytes first into that temp variable holder. This is just
* when the stream is not seekable, and a stream will most of
* the time be not seekable when the flag `stream` is passed
* as true when performing the guzzle request.
*
* @param int $num is the number of bytes to be read.
*
* @return string
*/
private function readAndHashBytes($num)
{
$bytes = $this->stream->read($num);
$bytes = '';
if (!$this->stream->isSeekable() && !empty($this->tempBytesHolder)) {
while (!empty($this->tempBytesHolder) && $num > 0) {
$byte = array_shift($this->tempBytesHolder);
$bytes .= $byte;
$num = $num - 1;
}
}

$bytes = $bytes . $this->stream->read($num);

hash_update($this->hashContext, $bytes);
return $bytes;
}
Expand Down
25 changes: 24 additions & 1 deletion src/Api/Parser/JsonRpcParser.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Aws\Api\Parser;

use Aws\Api\Operation;
use Aws\Api\StructureShape;
use Aws\Api\Service;
use Aws\Result;
Expand Down Expand Up @@ -30,6 +31,28 @@ public function __invoke(
ResponseInterface $response
) {
$operation = $this->api->getOperation($command->getName());

return $this->parseResponse($response, $operation);
}

/**
* This method parses a response for a json rpc protocol
* @param ResponseInterface $response
* @param Operation $operation
*
* @return Result
*/
private function parseResponse(ResponseInterface $response, Operation $operation)
{
$outputShape = $operation->getOutput();
foreach ($outputShape->getMembers() as $memberName => $memberProps) {
if (!empty($memberProps['eventstream'])) {
return new Result( [
$memberName => new EventParsingIterator($response->getBody(), $outputShape->getMember($memberName), $this)
]);
}
}

$result = null === $operation['output']
? null
: $this->parseMemberFromStream(
Expand All @@ -38,7 +61,7 @@ public function __invoke(
$response
);

return new Result($result ?: []);
return new Result($result ?? []);
}

public function parseMemberFromStream(
Expand Down
45 changes: 44 additions & 1 deletion src/CloudWatchLogs/CloudWatchLogsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
namespace Aws\CloudWatchLogs;

use Aws\AwsClient;
use Aws\CommandInterface;

/**
* This client is used to interact with the **Amazon CloudWatch Logs** service.
Expand Down Expand Up @@ -155,4 +156,46 @@
* @method \Aws\Result updateLogAnomalyDetector(array $args = [])
* @method \GuzzleHttp\Promise\Promise updateLogAnomalyDetectorAsync(array $args = [])
*/
class CloudWatchLogsClient extends AwsClient {}
class CloudWatchLogsClient extends AwsClient {
static $streamingCommands = [
'StartLiveTail' => true
];

public function __construct(array $args)
{
parent::__construct($args);
$this->appendClientCustomMiddlewares();
}

private function appendClientCustomMiddlewares()
{
$this->getHandlerList()->appendInit($this->getSetStreamingFlagMiddleware(), 'streaming-flag-middleware');
}

private function getSetStreamingFlagMiddleware(): callable
{
return function (callable $handler) {
return function (CommandInterface $command, $request = null) use ($handler) {
if (!empty(self::$streamingCommands[$command->getName()])) {
$command['@http']['stream'] = true;
}

return $handler($command, $request);
};
};
}

public function startLiveTailCheckingForResults($args): \Generator
{
$response = $this->startLiveTail($args);
foreach ($response['responseStream'] as $streamChunk) {
if (isset($streamChunk['sessionUpdate'])) {
if (!empty($streamChunk['sessionUpdate']['sessionResults'])) {
yield $streamChunk;
}
} else {
yield $streamChunk;
}
}
}
}
38 changes: 38 additions & 0 deletions tests/CloudWatchLogs/CloudWatchLogsClientTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Aws\Test\CloudWatchLogs;

use Aws\CloudWatchLogs\CloudWatchLogsClient;
use Aws\CommandInterface;
use GuzzleHttp\Psr7\Response;
use Psr\Http\Message\RequestInterface;
use Yoast\PHPUnitPolyfills\TestCases\TestCase;

class CloudWatchLogsClientTest extends TestCase
{
public function testSetStreamingFlagMiddleware()
{
$client = new CloudWatchLogsClient([
'region' => 'us-east-2',
'http_handler' => function (RequestInterface $request) {
return new Response(200);
}
]);
$client->getHandlerList()->appendInit(function ($handler) {
return function (CommandInterface $command, $request=null) use ($handler) {
self::assertNotEmpty($command['@http']['stream']);
self::assertTrue($command['@http']['stream']);

return $handler($command, $request);
};
});
$client->startLiveTail([
'logGroupIdentifiers' => [
'arn:aws:logs:us-east-1:1234567890123:log-group:TestLogGroup'
],
'logStreamNames' => [
'TestLogStream'
]
]);
}
}

0 comments on commit 1058abc

Please sign in to comment.