Skip to content

Commit

Permalink
fix: event parsing iterator
Browse files Browse the repository at this point in the history
The EventParsingIterator implementation does not parse correctly events when their payload do not have the eventpayload property set. The current behavior is that if said property "eventpayload" is not present then it fallback to extracting the value from the headers, but this is not a reliable method for parsing event streams since seems that not all services uses this pattern to provide the events. One example is the InvokeModelWithResponseStream operation from bedrock-runtime service. To fix this we pretty much followed the specs found in the smithy reference: https://smithy.io/2.0/spec/streaming.html#eventpayload-trait#id6, which explains that when eventpayload and eventheader is not set for any of the members of the shape then, the payload  itself should be considered as the top level shape structure to be parsed.
The test implementation for eventstream  have also been refactored as follow:
 - We now have to specify the rest protocol that the input test data will be using, and based on that protocol we have a factory method to create either a rest xml or rest JSON parser.
 - Instead of validating if a specific member is present to validate if the parsed data type is the correct one, we now validate each parsed member to make sure they are all parsed into the expected data type.
  • Loading branch information
yenfryherrerafeliz committed Dec 11, 2023
1 parent 5b8a381 commit 08e961d
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 86 deletions.
80 changes: 62 additions & 18 deletions src/Api/Parser/EventParsingIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,81 @@ private function parseEvent(array $event)
if ($event['headers'][':message-type'] === 'error') {
return $this->parseError($event);
}

if ($event['headers'][':message-type'] !== 'event') {
throw new ParserException('Failed to parse unknown message type.');
}
}

if (empty($event['headers'][':event-type'])) {
$eventType = $event['headers'][':event-type'] ?? null;
if (empty($eventType)) {
throw new ParserException('Failed to parse without event type.');
}
$eventShape = $this->shape->getMember($event['headers'][':event-type']);

$parsedEvent = [];
foreach ($eventShape['members'] as $shape => $details) {
if (!empty($details['eventpayload'])) {
$payloadShape = $eventShape->getMember($shape);
if ($payloadShape['type'] === 'blob') {
$parsedEvent[$shape] = $event['payload'];

$eventShape = $this->shape->getMember($eventType);
$eventPayload = $event['payload'];

return [
$eventType => array_merge(
$this->parseEventHeaders($event['headers'], $eventShape),
$this->parseEventPayload($eventPayload, $eventShape)
)
];
}

/**
* @param $headers
* @param $eventShape
*
* @return array
*/
private function parseEventHeaders($headers, $eventShape): array
{
$parsedHeaders = [];
foreach ($eventShape->getMembers() as $memberName => $memberProps) {
if (isset($memberProps['eventheader'])) {
$parsedHeaders[$memberName] = $headers[$memberName];
}
}

return $parsedHeaders;
}

/**
* @param $payload
* @param $eventShape
*
* @return array
*/
private function parseEventPayload($payload, $eventShape): array
{
$parsedPayload = [];
foreach ($eventShape->getMembers() as $memberName => $memberProps) {
$memberShape = $eventShape->getMember($memberName);
if (isset($memberProps['eventpayload'])) {
if ($memberShape->getType() === 'blob') {
$parsedPayload[$memberName] = $payload;
} else {
$parsedEvent[$shape] = $this->parser->parseMemberFromStream(
$event['payload'],
$payloadShape,
$parsedPayload[$memberName] = $this->parser->parseMemberFromStream(
$payload,
$memberShape,
null
);
}
} else {
$parsedEvent[$shape] = $event['headers'][$shape];

break;
}
}

return [
$event['headers'][':event-type'] => $parsedEvent
];
if (empty($parsedPayload) && !empty($payload->getContents())) {
/**
* If we did not find a member with an eventpayload trait, then we should deserialize the payload
* using the event's shape.
*/
$parsedPayload = $this->parser->parseMemberFromStream($payload, $eventShape, null);
}

return $parsedPayload;
}

private function parseError(array $event)
Expand All @@ -109,4 +153,4 @@ private function parseError(array $event)
$event['headers'][':error-message']
);
}
}
}
Loading

0 comments on commit 08e961d

Please sign in to comment.