From 5d9f46c1f765569ba3b80ff561da4fe3543c3b00 Mon Sep 17 00:00:00 2001 From: Vince ve Date: Thu, 19 Jul 2018 17:30:04 +0200 Subject: [PATCH] Read event forward was not displaying last page fix #16 --- examples/readAll.php | 20 ++++++++++ src/Rxnet/EventStore/EventStore.php | 58 +++++++++++++++-------------- 2 files changed, 51 insertions(+), 27 deletions(-) create mode 100644 examples/readAll.php diff --git a/examples/readAll.php b/examples/readAll.php new file mode 100644 index 0000000..6c216b8 --- /dev/null +++ b/examples/readAll.php @@ -0,0 +1,20 @@ +connect() + ->subscribe(function() use ($eventStore) { + echo "connected \n"; + $eventStore->readEventsForward('domain-test.fr') + ->subscribe(function (\Rxnet\EventStore\EventRecord $record) { + echo "received {$record->getId()} {$record->getNumber()}@{$record->getStreamId()} {$record->getType()} created at {$record->getCreated()->format('c')}\n"; + }); + }, function (\Exception $e) { + echo $e->getMessage(); + }); + +EventLoop::getLoop()->run(); \ No newline at end of file diff --git a/src/Rxnet/EventStore/EventStore.php b/src/Rxnet/EventStore/EventStore.php index 48f0a89..bf3d1bb 100644 --- a/src/Rxnet/EventStore/EventStore.php +++ b/src/Rxnet/EventStore/EventStore.php @@ -538,6 +538,7 @@ protected function readEvents(Message $query, $messageType) // OnDemand ? onBackpressureBuffer ? return Observable::create(function (ObserverInterface $observer) use ($messageType, $query) { $end = false; + $stop = false; $maxPossible = 10; //4096 $max = ($query instanceof ReadStreamEvents) ? $query->getMaxCount() : self::POSITION_LATEST; @@ -559,7 +560,7 @@ protected function readEvents(Message $query, $messageType) return Observable::of($event); }) // If more data is needed do another query - ->doOnNext(function (ReadStreamEventsCompleted $event) use ($query, $correlationID, &$end, &$asked, $max, $maxPossible, $messageType) { + ->do(function (ReadStreamEventsCompleted $event) use ($query, $correlationID, &$asked, &$end, $max) { $records = $event->getEvents(); $asked -= count($records); if ($event->getIsEndOfStream()) { @@ -567,33 +568,9 @@ protected function readEvents(Message $query, $messageType) } elseif ($asked <= 0 && $max != self::POSITION_LATEST) { $end = true; } - if (!$end) { - $start = $records[count($records) - 1]; - /* @var ResolvedIndexedEvent $start */ - - if (null === $start->getLink()) { - $start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getEvent()->getEventNumber() + 1 : $start->getEvent()->getEventNumber() - 1; - } else { - $start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getLink()->getEventNumber() + 1 : $start->getLink()->getEventNumber() - 1; - } - - $query->setFromEventNumber($start); - $query->setMaxCount($asked > $maxPossible ? $maxPossible : $asked); - - //echo "Not end of stream need slice from position {$start} next is {$event->getNextEventNumber()} \n"; - $this->writer->composeAndWrite( - $messageType, - $query, - $correlationID - ); - } - }) - // Continue to watch until we have all our results (or end) - ->takeWhile(function () use (&$end) { - return !$end; }) // Format EventRecord for easy reading - ->flatMap(function (ReadStreamEventsCompleted $event) use (&$asked, &$end) { + ->flatMap(function (ReadStreamEventsCompleted $event) use (&$asked, &$end, &$stop, $max, $maxPossible, $messageType, $query, $correlationID) { /* @var ReadStreamEventsCompleted $event */ $records = []; /* @var \Rxnet\EventStore\EventRecord[] $records */ @@ -603,7 +580,34 @@ protected function readEvents(Message $query, $messageType) $records[] = new EventRecord($item->getEvent()); } // Will emit onNext for each event - return Observable::fromArray($records); + return Observable::fromArray($records) + ->doOnCompleted(function () use (&$end, &$stop, &$events, &$asked, $max, $maxPossible, $messageType, $query, $correlationID) { + if ($end) $stop = true; + else { + $start = $events[count($events) - 1]; + /* @var ResolvedIndexedEvent $start */ + + if (null === $start->getLink()) { + $start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getEvent()->getEventNumber() + 1 : $start->getEvent()->getEventNumber() - 1; + } else { + $start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getLink()->getEventNumber() + 1 : $start->getLink()->getEventNumber() - 1; + } + + $query->setFromEventNumber($start); + $query->setMaxCount($asked > $maxPossible ? $maxPossible : $asked); + + //echo "Not end of stream need slice from position {$start} next is {$event->getNextEventNumber()} \n"; + $this->writer->composeAndWrite( + $messageType, + $query, + $correlationID + ); + } + }); + }) + // Continue to watch until we have all our results (or end) + ->takeWhile(function () use (&$stop) { + return !$stop; }) ->subscribe($observer); });