Skip to content

Commit

Permalink
Bug fix: Error after progressive result was being ignored
Browse files Browse the repository at this point in the history
  • Loading branch information
mbonneau committed Aug 18, 2020
1 parent 5e028f0 commit 71dd23e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
40 changes: 14 additions & 26 deletions src/Observable/CallObservable.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
$requestId = Utils::getUniqueId();
$callMsg = new CallMessage($requestId, $this->options, $this->uri, $this->args, $this->argskw);

$msg = $this->messages
$result = $this->messages
->filter(function (Message $msg) use ($requestId) {
return $msg instanceof ResultMessage && $msg->getRequestId() === $requestId;
return ($msg instanceof ResultMessage && $msg->getRequestId() === $requestId)
|| ($msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId);
})
->flatMap(function (ResultMessage $msg) {
->flatMap(function (Message $msg) {
if ($msg instanceof ErrorMessage) {
return Observable::error(new WampErrorException($msg->getErrorURI() . ':' . $this->uri, $msg->getArguments()), $this->scheduler);
}

static $i = -1;
$i++;
Expand All @@ -69,28 +73,20 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
], $this->scheduler);
}
return Observable::of($msg);
});

//Take until we get a result without progress
$resultMsg = $msg
})
//Take until we get a result without progress
->takeWhile(function (ResultMessage $msg) {
$details = $msg->getDetails();
return (bool)($details->progress ?? false);
})
->finally(function () {
$this->completed = true;
})
->share();

$error = $this->messages
->filter(function (Message $msg) use ($requestId) {
return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId;
})
->flatMap(function (ErrorMessage $msg) {
return Observable::error(new WampErrorException($msg->getErrorURI() . ':' . $this->uri, $msg->getArguments()), $this->scheduler);
})
->takeUntil($resultMsg)
->take(1);
->map(function (ResultMessage $msg) {
$details = $msg->getDetails();
unset($details->progress);
return new ResultMessage($msg->getRequestId(), $details, $msg->getArguments(), $msg->getArgumentsKw());
});

try {
$this->webSocket->onNext($callMsg);
Expand All @@ -99,14 +95,6 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
return new EmptyDisposable();
}

$result = $error
->merge($resultMsg)
->map(function (ResultMessage $msg) {
$details = $msg->getDetails();
unset($details->progress);
return new ResultMessage($msg->getRequestId(), $details, $msg->getArguments(), $msg->getArgumentsKw());
});

return new CompositeDisposable([
new CallbackDisposable(function () use ($requestId) {
if (!$this->completed) {
Expand Down
36 changes: 36 additions & 0 deletions tests/Functional/Observable/CallObservableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -497,4 +497,40 @@ public function call_one_dispose_after()
onCompleted(252)
], $results->getMessages());
}

/**
* @test
*/
public function progressive_error_after_result() {
$args = ["testing"];

$resultMessage = new ResultMessage(null, (object)['progress' => true], $args, null);
$errorMessage = new ErrorMessage(CallMessage::MSG_CALL, 123, (object)[], 'error.something');

$webSocket = new Subject();
$webSocket->subscribe(function (CallMessage $msg) use ($resultMessage, $errorMessage) {
$requestId = $msg->getRequestId();
$resultMessage->setRequestId($requestId);
$errorMessage->setRequestId($requestId);
});

$messages = $this->createHotObservable([
onNext(150, 1),
onNext(210, new WelcomeMessage(12345, new \stdClass())),
onNext(250, $resultMessage),
onNext(300, $errorMessage),
onCompleted(350)
]);

$results = $this->scheduler->startWithDispose(function () use ($messages, $webSocket) {
return new CallObservable('testing.uri', $messages, $webSocket, null, null, ['receive_progress' => true], 3000, $this->scheduler);
}, 355);

$exception = new WampErrorException($errorMessage->getErrorURI() . ':testing.uri', $errorMessage->getArguments());

$this->assertMessages([
onNext(250, $resultMessage),
onError(301, $exception)
], $results->getMessages());
}
}

0 comments on commit 71dd23e

Please sign in to comment.