From 0d7b0e420eb48fae036ffd6bf09adcb51cdf9498 Mon Sep 17 00:00:00 2001 From: David Dan Date: Thu, 27 Jun 2019 13:27:17 -0400 Subject: [PATCH] Fixed progressive RPC issue where multiple where a bad yeild message was being sent as the last message. Fixed some error handling issues --- src/Client.php | 70 ++++----------------------- src/Observable/CallObservable.php | 5 +- src/Observable/RegisterObservable.php | 23 ++++----- src/Subject/WebSocketSubject.php | 2 +- 4 files changed, 26 insertions(+), 74 deletions(-) diff --git a/src/Client.php b/src/Client.php index a247427..7e59f0e 100644 --- a/src/Client.php +++ b/src/Client.php @@ -64,10 +64,10 @@ public function __construct(string $url, string $realm, array $options = [], Sub $goodByeMsg = $goodByeMsg->do([$this->onClose, 'onNext']); - $abortMsg = $abortMsg->do([$this->onClose, 'onNext']); - $remainingMsgs = $remainingMsgs->merge($goodByeMsg); + $abortMsg = $abortMsg->do([$this->onClose, 'onNext']); + $challenge = $this->challenge($challengeMsg)->do([$webSocket, 'onNext']); $abortError = $abortMsg->map(function (AbortMessage $msg) { @@ -90,13 +90,6 @@ public function __construct(string $url, string $realm, array $options = [], Sub $this->disposable->add($webSocket); } - /** - * @param string $uri - * @param array $args - * @param array $argskw - * @param array $options - * @return Observable - */ public function call(string $uri, array $args = [], array $argskw = [], array $options = null): Observable { return $this->session @@ -107,12 +100,6 @@ public function call(string $uri, array $args = [], array $argskw = [], array $o ->take(1); } - /** - * @param string $uri - * @param callable $callback - * @param array $options - * @return Observable - */ public function register(string $uri, callable $callback, array $options = []): Observable { return $this->registerExtended($uri, $callback, $options, false); @@ -122,37 +109,19 @@ public function register(string $uri, callable $callback, array $options = []): * This is a variant of call, that expects the far end to emit more than one result. It will also repeat the call, * if the websocket connection resets and the observable has not completed or errored. * - * @param string $uri - * @param array $args - * @param array $argskw - * @param array $options - * @return Observable */ public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null): Observable { $options['receive_progress'] = true; - return Observable::defer(function () use ($uri, $args, $argskw, $options) { - $completed = new Subject(); - - return $this->session - ->takeUntil($completed) - ->flatMapLatest(function ($res) use ($completed, $uri, $args, $argskw, $options) { - [$messages, $webSocket] = $res; - return (new CallObservable($uri, $messages, $webSocket, $args, $argskw, $options)) - ->doOnCompleted(function () use ($completed) { - $completed->onNext(0); - }); - }); - }); + return $this->session + ->flatMapLatest(function ($res) use ($uri, $args, $argskw, $options) { + [$messages, $webSocket] = $res; + return new CallObservable($uri, $messages, $webSocket, $args, $argskw, $options); + }) + ->retryWhen([$this, '_reconnect']); } - /** - * @param string $uri - * @param callable $callback - * @param array $options - * @return Observable - */ public function progressiveRegister(string $uri, callable $callback, array $options = []): Observable { $options['progress'] = true; @@ -162,13 +131,6 @@ public function progressiveRegister(string $uri, callable $callback, array $opti return $this->registerExtended($uri, $callback, $options); } - /** - * @param string $uri - * @param callable $callback - * @param array $options - * @param bool $extended - * @return Observable - */ public function registerExtended(string $uri, callable $callback, array $options = [], bool $extended = true): Observable { return $this->session->flatMapLatest(function ($res) use ($uri, $callback, $options, $extended) { @@ -177,11 +139,6 @@ public function registerExtended(string $uri, callable $callback, array $options }); } - /** - * @param string $uri - * @param array $options - * @return Observable - */ public function topic(string $uri, array $options = []): Observable { return $this->session->flatMapLatest(function ($res) use ($uri, $options) { @@ -190,13 +147,6 @@ public function topic(string $uri, array $options = []): Observable }); } - /** - * @param string $uri - * @param mixed | Observable $obs - * @param array $options - * @return DisposableInterface - * @throws \InvalidArgumentException - */ public function publish(string $uri, $obs, array $options = []): DisposableInterface { $obs = $obs instanceof Observable ? $obs : Observable::of($obs); @@ -206,7 +156,7 @@ public function publish(string $uri, $obs, array $options = []): DisposableInter return $this->session ->takeUntil($completed->delay(1)) ->pluck(1) - ->map(function ( $webSocket) use ($obs, $completed, $uri, $options) { + ->map(function ($webSocket) use ($obs, $completed, $uri, $options) { return $obs ->finally(function () use ($completed) { $completed->onNext(0); @@ -250,7 +200,7 @@ public function _reconnect(Observable $attempts) return $attempts ->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, $initialRetryDelay) { $delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$this->currentRetryCount) + $initialRetryDelay); - $seconds = number_format((float)$delay / 1000, 3, '.', '');; + $seconds = number_format((float)$delay / 1000, 3, '.', ''); echo 'Error: ', $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL; return Observable::timer((int)$delay); }) diff --git a/src/Observable/CallObservable.php b/src/Observable/CallObservable.php index 99d1511..3ceccc6 100644 --- a/src/Observable/CallObservable.php +++ b/src/Observable/CallObservable.php @@ -30,7 +30,8 @@ public function __construct( array $options = null, int $timeout = 300000, SchedulerInterface $scheduler = null - ) { + ) + { $this->uri = $uri; $this->args = $args; $this->argskw = $argskw; @@ -86,7 +87,7 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId; }) ->flatMap(function (ErrorMessage $msg) { - return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments()), $this->scheduler); + return Observable::error(new WampErrorException($msg->getErrorURI() . ':' . $this->uri, $msg->getArguments()), $this->scheduler); }) ->takeUntil($resultMsg) ->take(1); diff --git a/src/Observable/RegisterObservable.php b/src/Observable/RegisterObservable.php index 3910231..a4eaec0 100644 --- a/src/Observable/RegisterObservable.php +++ b/src/Observable/RegisterObservable.php @@ -102,9 +102,9 @@ function () use (&$completed, $observer, $unregister) { try { if ($this->extended) { - $result = call_user_func_array($this->callback, [$msg->getArguments(), $msg->getArgumentsKw(), $msg->getDetails(), $msg]); + $result = \call_user_func($this->callback, $msg->getArguments(), $msg->getArgumentsKw(), $msg->getDetails(), $msg); } else { - $result = call_user_func_array($this->callback, $msg->getArguments()); + $result = \call_user_func_array($this->callback, $msg->getArguments()); } } catch (\Throwable $e) { $result = Observable::error($e); @@ -118,15 +118,15 @@ function () use (&$completed, $observer, $unregister) { $returnObs = $resultObs ->take(1) ->map(function ($value) use ($msg) { - return [$value, $msg, $this->options]; + return new YieldMessage($msg->getRequestId(), new \stdClass(), [$value]); }); } else { $returnObs = $resultObs ->map(function ($value) use ($msg) { - return [$value, $msg, $this->options]; + return new YieldMessage($msg->getRequestId(), (object)['progress' => true], [$value]); }) - ->concat(Observable::of([null, $msg, ["progress" => false]], $this->scheduler)); + ->concat(Observable::of(new YieldMessage($msg->getRequestId(), new \stdClass()), $this->scheduler)); } $interruptMsg = $this->messages @@ -135,8 +135,15 @@ function () use (&$completed, $observer, $unregister) { }) ->take(1); + $errorMsg = $this->messages + ->filter(function (Message $m) use ($msg) { + return $m instanceof ErrorMessage && $m->getRequestId() === $msg->getRequestId(); + }) + ->take(1); + return $returnObs ->takeUntil($interruptMsg) + ->takeUntil($errorMsg) ->catch(function (\Throwable $ex) use ($msg) { $invocationError = $ex instanceof WampErrorException ? WampInvocationException::withInvocationMessageAndWampErrorException($msg, $ex) @@ -146,12 +153,6 @@ function () use (&$completed, $observer, $unregister) { return Observable::empty($this->scheduler); }); }) - ->map(function ($args) { - /* @var $invocationMsg InvocationMessage */ - list($value, $invocationMsg, $options) = $args; - - return new YieldMessage($invocationMsg->getRequestId(), $options, [$value]); - }) ->subscribe($this->ws); $invocationErrors = $this->invocationErrors diff --git a/src/Subject/WebSocketSubject.php b/src/Subject/WebSocketSubject.php index f11e285..a3dc9dc 100644 --- a/src/Subject/WebSocketSubject.php +++ b/src/Subject/WebSocketSubject.php @@ -52,6 +52,7 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface $this->openObserver->onNext($ms); }) + ->switch() ->finally(function () { // The connection has closed, so start buffering messages util it reconnects. $this->sendSubject = new ReplaySubject(); @@ -67,7 +68,6 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface echo "Error {$e->getMessage()}, Reconnecting\n"; })->delay(1000); }) - ->switch() ->map([$this->serializer, 'deserialize']) ->subscribe($observer); }