From 52874eb1880bc853f3ef394a48d6bee7c7a10912 Mon Sep 17 00:00:00 2001 From: Yenfry Herrera Feliz Date: Fri, 17 Nov 2023 09:23:41 -0800 Subject: [PATCH] fix: event parsing iterator The EventParsingIterator implementation does not parse correctly events when their payload do not have the eventpayload property set. The current behavior is that if said property "eventpayload" is not present then it fallback to extracting the value from the headers, but this is not a reliable method for parsing event streams since seems that not all services uses this pattern to provide the events. One example is the InvokeModelWithResponseStream operation from bedrock-runtime service. To fix this we pretty much followed the specs found in the smithy reference: https://smithy.io/2.0/spec/streaming.html#eventpayload-trait#id6, which explains that when eventpayload and eventheader is not set for any of the members of the shape then, the payload itself should be considered as the top level shape structure to be parsed. The test implementation for eventstream have also been refactored as follow: - We now have to specify the rest protocol that the input test data will be using, and based on that protocol we have a factory method to create either a rest xml or rest JSON parser. - Instead of validating if a specific member is present to validate if the parsed data type is the correct one, we now validate each parsed member to make sure they are all parsed into the expected data type. --- src/Api/Parser/EventParsingIterator.php | 80 +++-- tests/Api/Parser/EventParsingIteratorTest.php | 274 +++++++++++++----- .../bedrock_invoke_model_shape.json | 14 + .../headers_payload_shape.json | 26 ++ .../input/bedrock_invoke_model_event | 1 + .../input/headers_payload_event | 1 + .../input/lambda_invoke_event | 1 + .../lambda_invoke_shape.json | 28 ++ .../output/bedrock_invoke_model_event.json | 12 + .../output/headers_payload_event.json | 8 + .../output/lambda_invoke_event.json | 14 + 11 files changed, 373 insertions(+), 86 deletions(-) create mode 100644 tests/Api/eventstream_fixtures/bedrock_invoke_model_shape.json create mode 100644 tests/Api/eventstream_fixtures/headers_payload_shape.json create mode 100644 tests/Api/eventstream_fixtures/input/bedrock_invoke_model_event create mode 100644 tests/Api/eventstream_fixtures/input/headers_payload_event create mode 100644 tests/Api/eventstream_fixtures/input/lambda_invoke_event create mode 100644 tests/Api/eventstream_fixtures/lambda_invoke_shape.json create mode 100644 tests/Api/eventstream_fixtures/output/bedrock_invoke_model_event.json create mode 100644 tests/Api/eventstream_fixtures/output/headers_payload_event.json create mode 100644 tests/Api/eventstream_fixtures/output/lambda_invoke_event.json diff --git a/src/Api/Parser/EventParsingIterator.php b/src/Api/Parser/EventParsingIterator.php index 7ee35fb0d8..20d75a9e88 100644 --- a/src/Api/Parser/EventParsingIterator.php +++ b/src/Api/Parser/EventParsingIterator.php @@ -69,37 +69,81 @@ private function parseEvent(array $event) if ($event['headers'][':message-type'] === 'error') { return $this->parseError($event); } + if ($event['headers'][':message-type'] !== 'event') { throw new ParserException('Failed to parse unknown message type.'); } } - if (empty($event['headers'][':event-type'])) { + $eventType = $event['headers'][':event-type'] ?? null; + if (empty($eventType)) { throw new ParserException('Failed to parse without event type.'); } - $eventShape = $this->shape->getMember($event['headers'][':event-type']); - - $parsedEvent = []; - foreach ($eventShape['members'] as $shape => $details) { - if (!empty($details['eventpayload'])) { - $payloadShape = $eventShape->getMember($shape); - if ($payloadShape['type'] === 'blob') { - $parsedEvent[$shape] = $event['payload']; + + $eventShape = $this->shape->getMember($eventType); + $eventPayload = $event['payload']; + + return [ + $eventType => array_merge( + $this->parseEventHeaders($event['headers'], $eventShape), + $this->parseEventPayload($eventPayload, $eventShape) + ) + ]; + } + + /** + * @param $headers + * @param $eventShape + * + * @return array + */ + private function parseEventHeaders($headers, $eventShape): array + { + $parsedHeaders = []; + foreach ($eventShape->getMembers() as $memberName => $memberProps) { + if (isset($memberProps['eventheader'])) { + $parsedHeaders[$memberName] = $headers[$memberName]; + } + } + + return $parsedHeaders; + } + + /** + * @param $payload + * @param $eventShape + * + * @return array + */ + private function parseEventPayload($payload, $eventShape): array + { + $parsedPayload = []; + foreach ($eventShape->getMembers() as $memberName => $memberProps) { + $memberShape = $eventShape->getMember($memberName); + if (isset($memberProps['eventpayload'])) { + if ($memberShape->getType() === 'blob') { + $parsedPayload[$memberName] = $payload; } else { - $parsedEvent[$shape] = $this->parser->parseMemberFromStream( - $event['payload'], - $payloadShape, + $parsedPayload[$memberName] = $this->parser->parseMemberFromStream( + $payload, + $memberShape, null ); } - } else { - $parsedEvent[$shape] = $event['headers'][$shape]; + + break; } } - return [ - $event['headers'][':event-type'] => $parsedEvent - ]; + if (empty($parsedPayload) && !empty($payload->getContents())) { + /** + * If we did not find a member with an eventpayload trait, then we should deserialize the payload + * using the event's shape. + */ + $parsedPayload = $this->parser->parseMemberFromStream($payload, $eventShape, null); + } + + return $parsedPayload; } private function parseError(array $event) @@ -109,4 +153,4 @@ private function parseError(array $event) $event['headers'][':error-message'] ); } -} \ No newline at end of file +} diff --git a/tests/Api/Parser/EventParsingIteratorTest.php b/tests/Api/Parser/EventParsingIteratorTest.php index be55616f82..0dab7987a6 100644 --- a/tests/Api/Parser/EventParsingIteratorTest.php +++ b/tests/Api/Parser/EventParsingIteratorTest.php @@ -2,8 +2,11 @@ namespace Aws\Test\Api\Parser; +use Aws\Api\DateTimeResult; +use Aws\Api\Parser\AbstractRestParser; use Aws\Api\Parser\EventParsingIterator; use Aws\Api\Parser\Exception\ParserException; +use Aws\Api\Parser\RestJsonParser; use Aws\Api\Parser\RestXmlParser; use Aws\Api\Service; use Aws\Api\ShapeMap; @@ -18,92 +21,175 @@ */ class EventParsingIteratorTest extends TestCase { + const PROTOCOL_XML = 'XML'; + const PROTOCOL_JSON = 'JSON'; + const EVENT_STREAM_SHAPE = __DIR__ . '/../eventstream_fixtures/eventstream_shape.json'; /** @var array */ - private static $successEventNames = [ - 'end_event', - 'headers_event', - 'records_event', - 'stats_event' + private static $eventCases = [ + [ + 'shape' => self::EVENT_STREAM_SHAPE, + 'protocol' => self::PROTOCOL_XML, + 'eventNames' => [ + 'end_event', + 'headers_event', + 'records_event', + 'stats_event', + ] + ], + [ + 'shape' => __DIR__ . '/../eventstream_fixtures/lambda_invoke_shape.json', + 'protocol' => self::PROTOCOL_JSON, + 'eventNames' => [ + 'lambda_invoke_event' + ] + ], + [ + 'shape' => __DIR__ . '/../eventstream_fixtures/bedrock_invoke_model_shape.json', + 'protocol' => self::PROTOCOL_JSON, + 'eventNames' => [ + 'bedrock_invoke_model_event' + ] + ], + [ + 'shape' => __DIR__ . '/../eventstream_fixtures/headers_payload_shape.json', + 'protocol' => self::PROTOCOL_JSON, + 'eventNames' => [ + 'headers_payload_event' + ] + ] ]; - /** @var StructureShape */ - private $eventstreamShape; - public function set_up() + /** + * This method is used to generate the event parsing iterator and the expected output + * for a provided input from a test case. + * + * @return \Generator + */ + public function iteratorDataProvider() { - $shape = json_decode( - file_get_contents( - __DIR__ . '/../eventstream_fixtures/eventstream_shape.json' - ), - true - ); - $this->eventstreamShape = new StructureShape( - $shape, - new ShapeMap(['EventStream' => $shape]) - ); + foreach (self::$eventCases as $eventCase) { + $shape = $this->loadEventStreamShapeFromJson($eventCase['shape']); + $restParser = $this->createRestParser($eventCase['protocol']); + foreach ($eventCase['eventNames'] as $eventName) { + $input = base64_decode(file_get_contents( + __DIR__ . '/../eventstream_fixtures/input/' . $eventName + )); + $output = json_decode( + file_get_contents( + __DIR__ . '/../eventstream_fixtures/output/' . $eventName . '.json' + ), + true + ); + $iterator = new EventParsingIterator(Psr7\Utils::streamFor($input), $shape, $restParser); + + yield $eventName => [$iterator, $output]; + } + } } - public function getEventData() + /** + * This test checks for whether the parsed event matches the expected output. + * When the parsed message is an array with just one element in it then, we evaluate + * this unique element against the expected output, otherwise we evaluate the whole array + * against the expected output. The reason for this is to test parsing either single or multiple + * events. + * + * @dataProvider iteratorDataProvider + */ + public function testParsedEventsMatchExpectedOutput($iterator, $expectedOutput) { - $events = []; - foreach (self::$successEventNames as $name) { - $event = []; - $event['input'] = base64_decode(file_get_contents( - __DIR__ . '/../eventstream_fixtures/input/' . $name - )); - $event['output'] = [json_decode( - file_get_contents( - __DIR__ . '/../eventstream_fixtures/output/' . $name . '.json' - ), - true - )]; - $event['count'] = 1; - $events []= $event; + $parsedMessage = []; + foreach ($iterator as $event) { + $parsedMessage[] = $event; } - $combinedInput = ''; - $combinedOutput = []; - foreach ($events as $event) { - $combinedInput .= $event['input']; - $combinedOutput []= $event['output'][0]; + if (count($parsedMessage) == 1) { + $this->assertEquals($expectedOutput, $parsedMessage[0]); + } else { + $this->assertEquals($expectedOutput, $parsedMessage); } - $events []= [ - 'input' => $combinedInput, - 'output' => $combinedOutput, - 'count' => count($events), - ]; - - return $events; } /** - * @dataProvider getEventData + * This method tests for whether the deserialized event members match the equivalent + * shape member types. + * + * @dataProvider iteratorDataProvider */ - public function testEmitsEvents($input, $output, $expectedCount) + public function testParsedEventsMatchExpectedType($iterator) { - $stream = Psr7\Utils::streamFor($input); - $iterator = new EventParsingIterator( - $stream, - $this->eventstreamShape, - new RestXmlParser( - new Service([], function () { return []; }) - ) - ); - - $count = 0; + $reflectedIteratorClass = new \ReflectionClass(get_class($iterator)); + $shapeProperty = $reflectedIteratorClass->getProperty('shape'); + $shapeProperty->setAccessible(true); + $shape = $shapeProperty->getValue($iterator); foreach ($iterator as $event) { - if (isset($event['Records'])) { - $this->assertInstanceOf( - StreamInterface::class, - $event['Records']['Payload'] - ); + $this->testParsedEventMatchExpectedType($shape, $event); + } + } + + /** + * This method is a helper of testParsedEventsMatchExpectedOutput for testing for whether + * the deserialized event members match the equivalent shape member types. + * + * @param $shape + * @param $event + * + * @return void + */ + private function testParsedEventMatchExpectedType($shape, $event) + { + foreach ($event as $key => $value) { + $this->assertTrue($shape->hasMember($key), "Shape has not member with name $key"); + $shapeMember = $shape->getMember($key); + $this->assertTrue( + $this->shapeTypeMatchesValue($shapeMember->getType(), $value), + 'Shape type "'. $shapeMember->getType(). '" does not match parsed value type "' . gettype($value) . '"' + ); + if (is_array($value)) { + $this->testParsedEventMatchExpectedType($shapeMember, $value); } - $this->assertEquals($output[$count], $event); - $count++; } - $this->assertEquals($expectedCount, $count); } + /** + * This method checks for whether the type for the provided value matches the equivalent + * to the shape type as native type. + * + * @param string $shapeType + * @param mixed $value + * + * @return bool true if matches type otherwise false. + */ + private function shapeTypeMatchesValue($shapeType, $value) + { + switch ($shapeType) { + case 'boolean': + return is_bool($value); + case 'blob': + return $value instanceof StreamInterface || is_string($value); + case 'byte': + case 'integer': + case 'long': + case 'float': + return is_numeric($value); + case 'string': + return is_string($value); + case 'structure': + case 'map': + case 'list': + return is_array($value) || is_object($value); + case 'timestamp': + return $value instanceof DateTimeResult || empty($value); + } + + return false; + } + + /** + * This test checks for if an exception is thrown when an error is returned as an event. + * In such case the header ':message-type' should be set to 'error' + */ public function testThrowsOnErrorEvent() { $stream = Psr7\Utils::streamFor( @@ -111,9 +197,10 @@ public function testThrowsOnErrorEvent() __DIR__ . '/../eventstream_fixtures/input/error_event' )) ); + $shape = $this->loadEventStreamShapeFromJson(self::EVENT_STREAM_SHAPE); $iterator = new EventParsingIterator( $stream, - $this->eventstreamShape, + $shape, new RestXmlParser( new Service([], function () { return []; }) ) @@ -131,10 +218,15 @@ public function testThrowsOnErrorEvent() } } + /** + * This test checks for if an exception is thrown when the header ':message-type' + * is not set or has an unknown type. + */ public function testThrowsOnUnknownMessageType() { $this->expectExceptionMessage("Failed to parse unknown message type."); $this->expectException(\Aws\Api\Parser\Exception\ParserException::class); + $shape = $this->loadEventStreamShapeFromJson(self::EVENT_STREAM_SHAPE); $stream = Psr7\Utils::streamFor( base64_decode(file_get_contents( __DIR__ . '/../eventstream_fixtures/input/unknown_message_type' @@ -142,7 +234,7 @@ public function testThrowsOnUnknownMessageType() ); $iterator = new EventParsingIterator( $stream, - $this->eventstreamShape, + $shape, new RestXmlParser( new Service([], function () { return []; }) ) @@ -151,10 +243,15 @@ public function testThrowsOnUnknownMessageType() $iterator->current(); } + /** + * This test checks for if an exception is thrown when the header ':event-type' + * is not set or has an unknown type. + */ public function testThrowsOnUnknownEventType() { $this->expectExceptionMessage("Failed to parse without event type."); $this->expectException(\Aws\Api\Parser\Exception\ParserException::class); + $shape = $this->loadEventStreamShapeFromJson(self::EVENT_STREAM_SHAPE); $stream = Psr7\Utils::streamFor( base64_decode(file_get_contents( __DIR__ . '/../eventstream_fixtures/input/unknown_event_type' @@ -162,7 +259,7 @@ public function testThrowsOnUnknownEventType() ); $iterator = new EventParsingIterator( $stream, - $this->eventstreamShape, + $shape, new RestXmlParser( new Service([], function () { return []; @@ -172,4 +269,45 @@ public function testThrowsOnUnknownEventType() $iterator->current(); } + + /** + * This method loads a shape defined in JSON format, from a specified path. + * + * @param string $jsonFilePath + * + * @return StructureShape + */ + private function loadEventStreamShapeFromJson($jsonFilePath) + { + $shape = json_decode( + file_get_contents($jsonFilePath), + true + ); + + return new StructureShape( + $shape, + new ShapeMap(['EventStream' => $shape]) + ); + } + + /** + * This method creates an instance of a RestParser class based on the protocol provided. + * + * @return AbstractRestParser + */ + private function createRestParser($protocol) + { + switch ($protocol) { + case self::PROTOCOL_XML: + return new RestXmlParser(new Service([], function () { + return []; + })); + case self::PROTOCOL_JSON: + return new RestJsonParser(new Service([], function () { + return []; + })); + default: + throw new ParserException('Unknown parser protocol "' . $protocol . '"'); + } + } } diff --git a/tests/Api/eventstream_fixtures/bedrock_invoke_model_shape.json b/tests/Api/eventstream_fixtures/bedrock_invoke_model_shape.json new file mode 100644 index 0000000000..441cba7ca8 --- /dev/null +++ b/tests/Api/eventstream_fixtures/bedrock_invoke_model_shape.json @@ -0,0 +1,14 @@ +{ + "type": "structure", + "eventstream": true, + "members": { + "chunk": { + "type": "structure", + "members": { + "bytes": { + "type": "blob" + } + } + } + } +} diff --git a/tests/Api/eventstream_fixtures/headers_payload_shape.json b/tests/Api/eventstream_fixtures/headers_payload_shape.json new file mode 100644 index 0000000000..06ac641813 --- /dev/null +++ b/tests/Api/eventstream_fixtures/headers_payload_shape.json @@ -0,0 +1,26 @@ +{ + "type": "structure", + "eventstream": true, + "members": { + "Service": { + "type": "structure", + "members": { + "Region": { + "type": "string", + "eventheader": true + }, + "Availability": { + "type": "string", + "eventheader": true + }, + "ServiceName": { + "type": "string", + "eventheader": true + }, + "Logs": { + "type": "blob" + } + } + } + } +} diff --git a/tests/Api/eventstream_fixtures/input/bedrock_invoke_model_event b/tests/Api/eventstream_fixtures/input/bedrock_invoke_model_event new file mode 100644 index 0000000000..7595e5d951 --- /dev/null +++ b/tests/Api/eventstream_fixtures/input/bedrock_invoke_model_event @@ -0,0 +1 @@ +AAAAewAAAEsKLpavDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcABWNodW5rDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29ueyJieXRlcyI6ImV5SkdiMjhpT2lKR2IyOGlmUT09In3300ppAAAAewAAAEsKLpavDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcABWNodW5rDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29ueyJieXRlcyI6ImV5SkNkWHA2SWpvaVFuVjZlaUo5In0meAOc diff --git a/tests/Api/eventstream_fixtures/input/headers_payload_event b/tests/Api/eventstream_fixtures/input/headers_payload_event new file mode 100644 index 0000000000..efe5757bf1 --- /dev/null +++ b/tests/Api/eventstream_fixtures/input/headers_payload_event @@ -0,0 +1 @@ +AAAA2AAAAI1PrhRsDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcAB1NlcnZpY2UNOmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0aW9uL2pzb24GUmVnaW9uBwAJVVNfRUFTVF8xDEF2YWlsYWJpbGl0eQcAC0FWXzFfWk9ORV8wC1NlcnZpY2VOYW1lBwADRm9veyJMb2dzIjoiVTJWeWRtbGpaU0JvWVhNZ2MzUmhjblJsWkNCcGRITWdiM0JsY21GMGFXOXVjeUU9In0mrNEB diff --git a/tests/Api/eventstream_fixtures/input/lambda_invoke_event b/tests/Api/eventstream_fixtures/input/lambda_invoke_event new file mode 100644 index 0000000000..7a25be735c --- /dev/null +++ b/tests/Api/eventstream_fixtures/input/lambda_invoke_event @@ -0,0 +1 @@ +AAAAhAAAAFKkIfnjDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcADFBheWxvYWRDaHVuaw06Y29udGVudC10eXBlBwAQYXBwbGljYXRpb24vanNvbnsiUGF5bG9hZCI6ImV5SkdiMjhpT2lKR2IyOGlmUT09In2B7WUJAAAAoAAAAFR5A9USDTptZXNzYWdlLXR5cGUHAAVldmVudAs6ZXZlbnQtdHlwZQcADkludm9rZUNvbXBsZXRlDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29ueyJFcnJvckNvZGUiOiJCdXp6IiwiRXJyb3JEZXRhaWxzIjoiRm9vIiwiTG9nUmVzdWx0IjoiUm05diJ9pNO2IA== diff --git a/tests/Api/eventstream_fixtures/lambda_invoke_shape.json b/tests/Api/eventstream_fixtures/lambda_invoke_shape.json new file mode 100644 index 0000000000..3d40825e51 --- /dev/null +++ b/tests/Api/eventstream_fixtures/lambda_invoke_shape.json @@ -0,0 +1,28 @@ +{ + "type": "union", + "eventstream": true, + "members": { + "PayloadChunk": { + "type": "structure", + "members": { + "Payload": { + "type": "blob" + } + } + }, + "InvokeComplete": { + "type": "structure", + "members": { + "ErrorCode": { + "type": "string" + }, + "ErrorDetails": { + "type": "string" + }, + "LogResult": { + "type": "string" + } + } + } + } +} diff --git a/tests/Api/eventstream_fixtures/output/bedrock_invoke_model_event.json b/tests/Api/eventstream_fixtures/output/bedrock_invoke_model_event.json new file mode 100644 index 0000000000..61a917aba4 --- /dev/null +++ b/tests/Api/eventstream_fixtures/output/bedrock_invoke_model_event.json @@ -0,0 +1,12 @@ +[ + { + "chunk": { + "bytes": "{\"Foo\":\"Foo\"}" + } + }, + { + "chunk": { + "bytes": "{\"Buzz\":\"Buzz\"}" + } + } +] diff --git a/tests/Api/eventstream_fixtures/output/headers_payload_event.json b/tests/Api/eventstream_fixtures/output/headers_payload_event.json new file mode 100644 index 0000000000..541993729a --- /dev/null +++ b/tests/Api/eventstream_fixtures/output/headers_payload_event.json @@ -0,0 +1,8 @@ +{ + "Service": { + "Region": "US_EAST_1", + "Availability": "AV_1_ZONE_0", + "ServiceName": "Foo", + "Logs": "Service has started its operations!" + } +} diff --git a/tests/Api/eventstream_fixtures/output/lambda_invoke_event.json b/tests/Api/eventstream_fixtures/output/lambda_invoke_event.json new file mode 100644 index 0000000000..3d8f491d94 --- /dev/null +++ b/tests/Api/eventstream_fixtures/output/lambda_invoke_event.json @@ -0,0 +1,14 @@ +[ + { + "PayloadChunk": { + "Payload": "{\"Foo\":\"Foo\"}" + } + }, + { + "InvokeComplete": { + "ErrorCode": "Buzz", + "ErrorDetails": "Foo", + "LogResult": "Rm9v" + } + } +]