Skip to content

Commit

Permalink
Fixed progressive RPC issue where multiple where a bad yeild message …
Browse files Browse the repository at this point in the history
…was being sent as the last message.

Fixed some error handling issues
  • Loading branch information
davidwdan committed Jun 27, 2019
1 parent 043d3b1 commit 0d7b0e4
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 74 deletions.
70 changes: 10 additions & 60 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public function __construct(string $url, string $realm, array $options = [], Sub

$goodByeMsg = $goodByeMsg->do([$this->onClose, 'onNext']);

$abortMsg = $abortMsg->do([$this->onClose, 'onNext']);

$remainingMsgs = $remainingMsgs->merge($goodByeMsg);

$abortMsg = $abortMsg->do([$this->onClose, 'onNext']);

$challenge = $this->challenge($challengeMsg)->do([$webSocket, 'onNext']);

$abortError = $abortMsg->map(function (AbortMessage $msg) {
Expand All @@ -90,13 +90,6 @@ public function __construct(string $url, string $realm, array $options = [], Sub
$this->disposable->add($webSocket);
}

/**
* @param string $uri
* @param array $args
* @param array $argskw
* @param array $options
* @return Observable
*/
public function call(string $uri, array $args = [], array $argskw = [], array $options = null): Observable
{
return $this->session
Expand All @@ -107,12 +100,6 @@ public function call(string $uri, array $args = [], array $argskw = [], array $o
->take(1);
}

/**
* @param string $uri
* @param callable $callback
* @param array $options
* @return Observable
*/
public function register(string $uri, callable $callback, array $options = []): Observable
{
return $this->registerExtended($uri, $callback, $options, false);
Expand All @@ -122,37 +109,19 @@ public function register(string $uri, callable $callback, array $options = []):
* This is a variant of call, that expects the far end to emit more than one result. It will also repeat the call,
* if the websocket connection resets and the observable has not completed or errored.
*
* @param string $uri
* @param array $args
* @param array $argskw
* @param array $options
* @return Observable
*/
public function progressiveCall(string $uri, array $args = [], array $argskw = [], array $options = null): Observable
{
$options['receive_progress'] = true;

return Observable::defer(function () use ($uri, $args, $argskw, $options) {
$completed = new Subject();

return $this->session
->takeUntil($completed)
->flatMapLatest(function ($res) use ($completed, $uri, $args, $argskw, $options) {
[$messages, $webSocket] = $res;
return (new CallObservable($uri, $messages, $webSocket, $args, $argskw, $options))
->doOnCompleted(function () use ($completed) {
$completed->onNext(0);
});
});
});
return $this->session
->flatMapLatest(function ($res) use ($uri, $args, $argskw, $options) {
[$messages, $webSocket] = $res;
return new CallObservable($uri, $messages, $webSocket, $args, $argskw, $options);
})
->retryWhen([$this, '_reconnect']);
}

/**
* @param string $uri
* @param callable $callback
* @param array $options
* @return Observable
*/
public function progressiveRegister(string $uri, callable $callback, array $options = []): Observable
{
$options['progress'] = true;
Expand All @@ -162,13 +131,6 @@ public function progressiveRegister(string $uri, callable $callback, array $opti
return $this->registerExtended($uri, $callback, $options);
}

/**
* @param string $uri
* @param callable $callback
* @param array $options
* @param bool $extended
* @return Observable
*/
public function registerExtended(string $uri, callable $callback, array $options = [], bool $extended = true): Observable
{
return $this->session->flatMapLatest(function ($res) use ($uri, $callback, $options, $extended) {
Expand All @@ -177,11 +139,6 @@ public function registerExtended(string $uri, callable $callback, array $options
});
}

/**
* @param string $uri
* @param array $options
* @return Observable
*/
public function topic(string $uri, array $options = []): Observable
{
return $this->session->flatMapLatest(function ($res) use ($uri, $options) {
Expand All @@ -190,13 +147,6 @@ public function topic(string $uri, array $options = []): Observable
});
}

/**
* @param string $uri
* @param mixed | Observable $obs
* @param array $options
* @return DisposableInterface
* @throws \InvalidArgumentException
*/
public function publish(string $uri, $obs, array $options = []): DisposableInterface
{
$obs = $obs instanceof Observable ? $obs : Observable::of($obs);
Expand All @@ -206,7 +156,7 @@ public function publish(string $uri, $obs, array $options = []): DisposableInter
return $this->session
->takeUntil($completed->delay(1))
->pluck(1)
->map(function ( $webSocket) use ($obs, $completed, $uri, $options) {
->map(function ($webSocket) use ($obs, $completed, $uri, $options) {
return $obs
->finally(function () use ($completed) {
$completed->onNext(0);
Expand Down Expand Up @@ -250,7 +200,7 @@ public function _reconnect(Observable $attempts)
return $attempts
->flatMap(function (\Exception $ex) use ($maxRetryDelay, $retryDelayGrowth, $initialRetryDelay) {
$delay = min($maxRetryDelay, pow($retryDelayGrowth, ++$this->currentRetryCount) + $initialRetryDelay);
$seconds = number_format((float)$delay / 1000, 3, '.', '');;
$seconds = number_format((float)$delay / 1000, 3, '.', '');
echo 'Error: ', $ex->getMessage(), PHP_EOL, "Reconnecting in ${seconds} seconds...", PHP_EOL;
return Observable::timer((int)$delay);
})
Expand Down
5 changes: 3 additions & 2 deletions src/Observable/CallObservable.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public function __construct(
array $options = null,
int $timeout = 300000,
SchedulerInterface $scheduler = null
) {
)
{
$this->uri = $uri;
$this->args = $args;
$this->argskw = $argskw;
Expand Down Expand Up @@ -86,7 +87,7 @@ public function _subscribe(ObserverInterface $observer): DisposableInterface
return $msg instanceof ErrorMessage && $msg->getErrorRequestId() === $requestId;
})
->flatMap(function (ErrorMessage $msg) {
return Observable::error(new WampErrorException($msg->getErrorURI(), $msg->getArguments()), $this->scheduler);
return Observable::error(new WampErrorException($msg->getErrorURI() . ':' . $this->uri, $msg->getArguments()), $this->scheduler);
})
->takeUntil($resultMsg)
->take(1);
Expand Down
23 changes: 12 additions & 11 deletions src/Observable/RegisterObservable.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ function () use (&$completed, $observer, $unregister) {

try {
if ($this->extended) {
$result = call_user_func_array($this->callback, [$msg->getArguments(), $msg->getArgumentsKw(), $msg->getDetails(), $msg]);
$result = \call_user_func($this->callback, $msg->getArguments(), $msg->getArgumentsKw(), $msg->getDetails(), $msg);
} else {
$result = call_user_func_array($this->callback, $msg->getArguments());
$result = \call_user_func_array($this->callback, $msg->getArguments());
}
} catch (\Throwable $e) {
$result = Observable::error($e);
Expand All @@ -118,15 +118,15 @@ function () use (&$completed, $observer, $unregister) {
$returnObs = $resultObs
->take(1)
->map(function ($value) use ($msg) {
return [$value, $msg, $this->options];
return new YieldMessage($msg->getRequestId(), new \stdClass(), [$value]);
});
} else {

$returnObs = $resultObs
->map(function ($value) use ($msg) {
return [$value, $msg, $this->options];
return new YieldMessage($msg->getRequestId(), (object)['progress' => true], [$value]);
})
->concat(Observable::of([null, $msg, ["progress" => false]], $this->scheduler));
->concat(Observable::of(new YieldMessage($msg->getRequestId(), new \stdClass()), $this->scheduler));
}

$interruptMsg = $this->messages
Expand All @@ -135,8 +135,15 @@ function () use (&$completed, $observer, $unregister) {
})
->take(1);

$errorMsg = $this->messages
->filter(function (Message $m) use ($msg) {
return $m instanceof ErrorMessage && $m->getRequestId() === $msg->getRequestId();
})
->take(1);

return $returnObs
->takeUntil($interruptMsg)
->takeUntil($errorMsg)
->catch(function (\Throwable $ex) use ($msg) {
$invocationError = $ex instanceof WampErrorException
? WampInvocationException::withInvocationMessageAndWampErrorException($msg, $ex)
Expand All @@ -146,12 +153,6 @@ function () use (&$completed, $observer, $unregister) {
return Observable::empty($this->scheduler);
});
})
->map(function ($args) {
/* @var $invocationMsg InvocationMessage */
list($value, $invocationMsg, $options) = $args;

return new YieldMessage($invocationMsg->getRequestId(), $options, [$value]);
})
->subscribe($this->ws);

$invocationErrors = $this->invocationErrors
Expand Down
2 changes: 1 addition & 1 deletion src/Subject/WebSocketSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface

$this->openObserver->onNext($ms);
})
->switch()
->finally(function () {
// The connection has closed, so start buffering messages util it reconnects.
$this->sendSubject = new ReplaySubject();
Expand All @@ -67,7 +68,6 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface
echo "Error {$e->getMessage()}, Reconnecting\n";
})->delay(1000);
})
->switch()
->map([$this->serializer, 'deserialize'])
->subscribe($observer);
}
Expand Down

0 comments on commit 0d7b0e4

Please sign in to comment.