Skip to content

Commit

Permalink
fix: event parsing iterator
Browse files Browse the repository at this point in the history
The EventParsingIterator implementation does not parse correctly events when their payload do not have the eventpayload property set. The current behavior is that if said property "eventpayload" is not present then it fallback to extracting the value from the headers, but this is not a reliable method for parsing event streams since seems that not all services uses this pattern to provide the events. One example is the InvokeModelWithResponseStream operation from bedrock-runtime service. To fix this we pretty much followed the specs found in the smithy reference: https://smithy.io/2.0/spec/streaming.html#eventpayload-trait#id6, which explains that when eventpayload and eventheader is not set for any of the members of the shape then, the payload  itself should be considered as the top level structured to be parsed.
  • Loading branch information
yenfryherrerafeliz committed Dec 1, 2023
1 parent 5b8a381 commit 177f2f8
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 85 deletions.
35 changes: 21 additions & 14 deletions src/Api/Parser/EventParsingIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,36 +69,43 @@ private function parseEvent(array $event)
if ($event['headers'][':message-type'] === 'error') {
return $this->parseError($event);
}

if ($event['headers'][':message-type'] !== 'event') {
throw new ParserException('Failed to parse unknown message type.');
}
}

if (empty($event['headers'][':event-type'])) {
$eventType = $event['headers'][':event-type'] ?? null;
if (empty($eventType)) {
throw new ParserException('Failed to parse without event type.');
}
$eventShape = $this->shape->getMember($event['headers'][':event-type']);

$eventShape = $this->shape->getMember($eventType);
$eventPayload = $event['payload'];
$parsedEvent = [];
foreach ($eventShape['members'] as $shape => $details) {
if (!empty($details['eventpayload'])) {
$payloadShape = $eventShape->getMember($shape);
if ($payloadShape['type'] === 'blob') {
$parsedEvent[$shape] = $event['payload'];
foreach ($eventShape->getMembers() as $eventShapeMemberKey => $eventShapeMemberProps) {
$memberShape = $eventShape->getMember($eventShapeMemberKey);
if (!empty($eventShapeMemberProps['eventheader'])) {
$parsedEvent[$eventShapeMemberKey] = $event['headers'][$eventShapeMemberKey];
} elseif (!empty($eventShapeMemberProps['eventpayload'])) {
if ($memberShape->getType() === 'blob') {
$parsedEvent[$eventShapeMemberKey] = $eventPayload;
} else {
$parsedEvent[$shape] = $this->parser->parseMemberFromStream(
$event['payload'],
$payloadShape,
$parsedEvent[$eventShapeMemberKey] = $this->parser->parseMemberFromStream(
$eventPayload,
$memberShape,
null
);
}
} else {
$parsedEvent[$shape] = $event['headers'][$shape];
}
}

if (empty($parsedEvent) && !empty($eventPayload->getContents())) {
$parsedEvent = $this->parser->parseMemberFromStream($eventPayload, $eventShape, null);
}

return [
$event['headers'][':event-type'] => $parsedEvent
$eventType => $parsedEvent
];
}

Expand All @@ -109,4 +116,4 @@ private function parseError(array $event)
$event['headers'][':error-message']
);
}
}
}
Loading

0 comments on commit 177f2f8

Please sign in to comment.