Skip to content

Commit

Permalink
Allow pass-through of connector
Browse files Browse the repository at this point in the history
  • Loading branch information
mbonneau committed Jul 20, 2019
1 parent 0d7b0e4 commit 663b1ce
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
5 changes: 3 additions & 2 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Rx\Thruway;

use React\Socket\ConnectorInterface;
use Rx\Exception\Exception;
use Rx\Disposable\CompositeDisposable;
use Rx\DisposableInterface;
Expand All @@ -25,7 +26,7 @@ final class Client

private $currentRetryCount = 0;

public function __construct(string $url, string $realm, array $options = [], Subject $webSocket = null, Observable $messages = null, Observable $session = null)
public function __construct(string $url, string $realm, array $options = [], Subject $webSocket = null, Observable $messages = null, Observable $session = null, ConnectorInterface $connector)
{
$this->disposable = new CompositeDisposable();
$this->onClose = new Subject();
Expand All @@ -36,7 +37,7 @@ public function __construct(string $url, string $realm, array $options = [], Sub
$open = new Subject();
$close = new Subject();

$webSocket = $webSocket ?: new WebSocketSubject($url, ['wamp.2.json'], $open, $close);
$webSocket = $webSocket ?: new WebSocketSubject($url, ['wamp.2.json'], $open, $close, $connector);
$messages = $messages ?: $webSocket->retryWhen([$this, '_reconnect'])->singleInstance();

//When the connection opens, send a HelloMessage
Expand Down
5 changes: 3 additions & 2 deletions src/Subject/WebSocketSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Rx\Thruway\Subject;

use function EventLoop\getLoop;
use React\Socket\ConnectorInterface;
use Rx\DisposableInterface;
use Rx\Observable;
use Rx\ObserverInterface;
Expand All @@ -22,14 +23,14 @@ final class WebSocketSubject extends Subject
private $closeObserver;
private $serializer;

public function __construct(string $url, array $protocols = [], Subject $openObserver = null, Subject $closeObserver = null)
public function __construct(string $url, array $protocols = [], Subject $openObserver = null, Subject $closeObserver = null, ConnectorInterface $connector)
{
$this->openObserver = $openObserver ?? new Subject();
$this->closeObserver = $closeObserver ?? new Subject();
$this->serializer = new JsonSerializer();
$this->sendSubject = new ReplaySubject();

$this->ws = new Client($url, false, $protocols, getLoop());
$this->ws = new Client($url, false, $protocols, getLoop(), $connector);
}

public function onNext($value)
Expand Down

0 comments on commit 663b1ce

Please sign in to comment.