diff --git a/src/Rxnet/EventStore/EventStore.php b/src/Rxnet/EventStore/EventStore.php index 2d56000..35ec8d7 100644 --- a/src/Rxnet/EventStore/EventStore.php +++ b/src/Rxnet/EventStore/EventStore.php @@ -124,7 +124,7 @@ public function connect($dsn = 'tcp://admin:changeit@localhost:1113', $connectTi } $this->dsn = $parsedDsn; // What you should observe if you want to auto reconnect - $this->connectionSubject = new ReplaySubject(1, 1); + $this->connectionSubject = new Subject(); $this->connector->setTimeout($connectTimeout); return Observable::create(function (ObserverInterface $observer) { $this->dns @@ -133,7 +133,7 @@ public function connect($dsn = 'tcp://admin:changeit@localhost:1113', $connectTi function ($ip) { return $this->connector->connect($ip, $this->dsn['port']); }) - ->flatMap(function (ConnectorEvent $connectorEvent) { + ->subscribe(new CallbackObserver(function (ConnectorEvent $connectorEvent) { // send all data to our read buffer $this->stream = new BufferedStream($connectorEvent->getStream()->getSocket(), $this->loop); $this->readBufferDisposable = $this->stream->subscribe($this->readBuffer); @@ -146,12 +146,10 @@ function ($ip) { // start heartbeat listener $this->heartBeatDisposable = $this->heartbeat(); - // Replay subject will do the magic $this->connectionSubject->onNext(new Event('/eventstore/connected')); - // Forward internal errors to the connect result - return $this->connectionSubject; - }) - ->subscribe($observer, new EventLoopScheduler($this->loop)); + }), new EventLoopScheduler($this->loop)); + + $this->connectionSubject->subscribe($observer); return new CallbackDisposable(function () { if ($this->readBufferDisposable instanceof DisposableInterface) { @@ -257,7 +255,7 @@ public function write($streamId, $events, $expectedVersion = -2, $requireMaster $readDisposable = $this->readBuffer->waitFor($correlationID, 1) ->subscribe($observer); - return new CallbackDisposable(function() use($writeDisposable, $readDisposable) { + return new CallbackDisposable(function () use ($writeDisposable, $readDisposable) { $readDisposable->dispose(); $writeDisposable->dispose(); }); diff --git a/usage/write.php b/usage/write.php index 276a29a..5ff133a 100644 --- a/usage/write.php +++ b/usage/write.php @@ -6,13 +6,19 @@ use Rxnet\EventStore\NewEvent\JsonEvent; require '../vendor/autoload.php'; +//$loop = new \Rxnet\Loop\LibEvLoop(); +//EventLoop::setLoop($loop); +echo 'connecting'; $eventStore = new \Rxnet\EventStore\EventStore(); \Rxnet\await($eventStore->connect()); +echo "connected \n"; + \Rx\Observable::interval(10) ->flatMap( function ($i) use ($eventStore) { + echo '.'; $event = new JsonEvent('/truc/chose', ['i' => $i]); return $eventStore->write('domain-test-1.fr', [$event]); }