Skip to content

Commit

Permalink
fix Unexpected wire type error caused when message spans across multi…
Browse files Browse the repository at this point in the history
…ple requests
  • Loading branch information
eithed authored and Th3Mouk committed Mar 1, 2019
1 parent f29e988 commit b27511d
Showing 1 changed file with 17 additions and 19 deletions.
36 changes: 17 additions & 19 deletions src/Rxnet/EventStore/ReadBuffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,23 @@ public function onNext($value)
$value = $this->currentMessage . $value;
}

do {
$buffer = new Buffer($value);
$dataLength = strlen($value);
$messageLength = $buffer->readInt32LE(0) + MessageConfiguration::INT_32_LENGTH;

if ($dataLength == $messageLength) {
$socketMessages[] = $this->decomposeMessage($value);
$this->currentMessage = null;
} elseif ($dataLength > $messageLength) {
$message = substr($value, 0, $messageLength);
$socketMessages[] = $this->decomposeMessage($message);

// reset data to next message
$value = substr($value, $messageLength, $dataLength);
$this->currentMessage = null;
} else {
$this->currentMessage .= $value;
}
} while ($dataLength > $messageLength);
$buffer = new Buffer($value);
$dataLength = strlen($value);
$messageLength = $buffer->readInt32LE(0) + MessageConfiguration::INT_32_LENGTH;

if ($dataLength == $messageLength) {
$socketMessages[] = $this->decomposeMessage($value);
$this->currentMessage = null;
} elseif ($dataLength > $messageLength) {
$message = substr($value, 0, $messageLength);
$socketMessages[] = $this->decomposeMessage($message);

// reset data to next message
$value = substr($value, $messageLength);
$this->currentMessage = $value;
} else {
$this->currentMessage = $value;
}

//echo "messages : ".count($socketMessages) ." for ".count($this->observers)." observers \n";
foreach ($socketMessages as $message) {
Expand Down

0 comments on commit b27511d

Please sign in to comment.