From 71dd23e676d640d7617a0a85c512a7f7834b520e Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Tue, 18 Aug 2020 13:30:54 -0400 Subject: [PATCH] Bug fix: Error after progressive result was being ignored --- src/Observable/CallObservable.php | 40 +++++++------------ .../Observable/CallObservableTest.php | 36 +++++++++++++++++ 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/src/Observable/CallObservable.php b/src/Observable/CallObservable.php index 3ceccc6..033d064 100644 --- a/src/Observable/CallObservable.php +++ b/src/Observable/CallObservable.php @@ -48,11 +48,15 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface $requestId = Utils::getUniqueId(); $callMsg = new CallMessage($requestId, $this->options, $this->uri, $this->args, $this->argskw); - $msg = $this->messages + $result = $this->messages ->filter(function (Message $msg) use ($requestId) { - return $msg instanceof ResultMessage && $msg->getRequestId() === $requestId; + return ($msg instanceof ResultMessage && $msg->getRequestId() === $requestId) + || ($msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId); }) - ->flatMap(function (ResultMessage $msg) { + ->flatMap(function (Message $msg) { + if ($msg instanceof ErrorMessage) { + return Observable::error(new WampErrorException($msg->getErrorURI() . ':' . $this->uri, $msg->getArguments()), $this->scheduler); + } static $i = -1; $i++; @@ -69,10 +73,8 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface ], $this->scheduler); } return Observable::of($msg); - }); - - //Take until we get a result without progress - $resultMsg = $msg + }) + //Take until we get a result without progress ->takeWhile(function (ResultMessage $msg) { $details = $msg->getDetails(); return (bool)($details->progress ?? false); @@ -80,17 +82,11 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface ->finally(function () { $this->completed = true; }) - ->share(); - - $error = $this->messages - ->filter(function (Message $msg) use ($requestId) { - return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId; - }) - ->flatMap(function (ErrorMessage $msg) { - return Observable::error(new WampErrorException($msg->getErrorURI() . ':' . $this->uri, $msg->getArguments()), $this->scheduler); - }) - ->takeUntil($resultMsg) - ->take(1); + ->map(function (ResultMessage $msg) { + $details = $msg->getDetails(); + unset($details->progress); + return new ResultMessage($msg->getRequestId(), $details, $msg->getArguments(), $msg->getArgumentsKw()); + }); try { $this->webSocket->onNext($callMsg); @@ -99,14 +95,6 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface return new EmptyDisposable(); } - $result = $error - ->merge($resultMsg) - ->map(function (ResultMessage $msg) { - $details = $msg->getDetails(); - unset($details->progress); - return new ResultMessage($msg->getRequestId(), $details, $msg->getArguments(), $msg->getArgumentsKw()); - }); - return new CompositeDisposable([ new CallbackDisposable(function () use ($requestId) { if (!$this->completed) { diff --git a/tests/Functional/Observable/CallObservableTest.php b/tests/Functional/Observable/CallObservableTest.php index 3c624da..7179802 100644 --- a/tests/Functional/Observable/CallObservableTest.php +++ b/tests/Functional/Observable/CallObservableTest.php @@ -497,4 +497,40 @@ public function call_one_dispose_after() onCompleted(252) ], $results->getMessages()); } + + /** + * @test + */ + public function progressive_error_after_result() { + $args = ["testing"]; + + $resultMessage = new ResultMessage(null, (object)['progress' => true], $args, null); + $errorMessage = new ErrorMessage(CallMessage::MSG_CALL, 123, (object)[], 'error.something'); + + $webSocket = new Subject(); + $webSocket->subscribe(function (CallMessage $msg) use ($resultMessage, $errorMessage) { + $requestId = $msg->getRequestId(); + $resultMessage->setRequestId($requestId); + $errorMessage->setRequestId($requestId); + }); + + $messages = $this->createHotObservable([ + onNext(150, 1), + onNext(210, new WelcomeMessage(12345, new \stdClass())), + onNext(250, $resultMessage), + onNext(300, $errorMessage), + onCompleted(350) + ]); + + $results = $this->scheduler->startWithDispose(function () use ($messages, $webSocket) { + return new CallObservable('testing.uri', $messages, $webSocket, null, null, ['receive_progress' => true], 3000, $this->scheduler); + }, 355); + + $exception = new WampErrorException($errorMessage->getErrorURI() . ':testing.uri', $errorMessage->getArguments()); + + $this->assertMessages([ + onNext(250, $resultMessage), + onError(301, $exception) + ], $results->getMessages()); + } }