Skip to content

Commit

Permalink
fix: event parsing iterator
Browse files Browse the repository at this point in the history
The EventParsingIterator implementation does not parse correctly events when their payload do not have the eventpayload property set. The current behavior is that if said property "eventpayload" is not present then it fallback to extracting the value from the headers, but this is not a reliable method for parsing event streams since seems that not all services uses this pattern to provide the events. One example is the InvokeModelWithResponseStream operation from bedrock-runtime service. To fix this we pretty much followed the specs found in the smithy reference: https://smithy.io/2.0/spec/streaming.html#eventpayload-trait#id6, which explains that when eventpayload and eventheader is not set for any of the members of the shape then, the payload  itself should be considered as the top level structured to be parsed.
  • Loading branch information
yenfryherrerafeliz committed Nov 21, 2023
1 parent 5b8a381 commit 11f4391
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions src/Api/Parser/EventParsingIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,36 +69,43 @@ private function parseEvent(array $event)
if ($event['headers'][':message-type'] === 'error') {
return $this->parseError($event);
}

if ($event['headers'][':message-type'] !== 'event') {
throw new ParserException('Failed to parse unknown message type.');
}
}

if (empty($event['headers'][':event-type'])) {
$eventType = $event['headers'][':event-type'] ?? null;
if (is_null($eventType)) {
throw new ParserException('Failed to parse without event type.');
}
$eventShape = $this->shape->getMember($event['headers'][':event-type']);

$eventShape = $this->shape->getMember($eventType);
$eventPayload = $event['payload'];
$parsedEvent = [];
foreach ($eventShape['members'] as $shape => $details) {
if (!empty($details['eventpayload'])) {
$payloadShape = $eventShape->getMember($shape);
if ($payloadShape['type'] === 'blob') {
$parsedEvent[$shape] = $event['payload'];
foreach ($eventShape['members'] as $eventShapeMemberKey => $eventShapeMemberProps) {
$memberShape = $eventShape->getMember($eventShapeMemberKey);
if (!empty($eventShapeMemberProps['eventheader'])) {
$parsedEvent[$eventShapeMemberKey] = $event['headers'][$eventShapeMemberKey];
} elseif (!empty($eventShapeMemberProps['eventpayload'])) {
if ($memberShape['type'] === 'blob') {
$parsedEvent[$eventShapeMemberKey] = $eventPayload;
} else {
$parsedEvent[$shape] = $this->parser->parseMemberFromStream(
$event['payload'],
$payloadShape,
$parsedEvent[$eventShapeMemberKey] = $this->parser->parseMemberFromStream(
$eventPayload,
$memberShape,
null
);
}
} else {
$parsedEvent[$shape] = $event['headers'][$shape];
}
}

if (empty($parsedEvent) && !empty($eventPayload->getContents())) {
$parsedEvent = $this->parser->parseMemberFromStream($eventPayload, $eventShape, null);
}

return [
$event['headers'][':event-type'] => $parsedEvent
$eventType => $parsedEvent
];
}

Expand All @@ -109,4 +116,4 @@ private function parseError(array $event)
$event['headers'][':error-message']
);
}
}
}

0 comments on commit 11f4391

Please sign in to comment.