Skip to content

Commit

Permalink
Read event forward was not displaying last page
Browse files Browse the repository at this point in the history
fix #16
  • Loading branch information
Vinceveve committed Jul 19, 2018
1 parent 758e811 commit 5d9f46c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 27 deletions.
20 changes: 20 additions & 0 deletions examples/readAll.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

use EventLoop\EventLoop;

require __DIR__.'/../vendor/autoload.php';


$eventStore = new \Rxnet\EventStore\EventStore();
$eventStore->connect()
->subscribe(function() use ($eventStore) {
echo "connected \n";
$eventStore->readEventsForward('domain-test.fr')
->subscribe(function (\Rxnet\EventStore\EventRecord $record) {
echo "received {$record->getId()} {$record->getNumber()}@{$record->getStreamId()} {$record->getType()} created at {$record->getCreated()->format('c')}\n";
});
}, function (\Exception $e) {
echo $e->getMessage();
});

EventLoop::getLoop()->run();
58 changes: 31 additions & 27 deletions src/Rxnet/EventStore/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ protected function readEvents(Message $query, $messageType)
// OnDemand ? onBackpressureBuffer ?
return Observable::create(function (ObserverInterface $observer) use ($messageType, $query) {
$end = false;
$stop = false;
$maxPossible = 10; //4096
$max = ($query instanceof ReadStreamEvents) ? $query->getMaxCount() : self::POSITION_LATEST;

Expand All @@ -559,41 +560,17 @@ protected function readEvents(Message $query, $messageType)
return Observable::of($event);
})
// If more data is needed do another query
->doOnNext(function (ReadStreamEventsCompleted $event) use ($query, $correlationID, &$end, &$asked, $max, $maxPossible, $messageType) {
->do(function (ReadStreamEventsCompleted $event) use ($query, $correlationID, &$asked, &$end, $max) {
$records = $event->getEvents();
$asked -= count($records);
if ($event->getIsEndOfStream()) {
$end = true;
} elseif ($asked <= 0 && $max != self::POSITION_LATEST) {
$end = true;
}
if (!$end) {
$start = $records[count($records) - 1];
/* @var ResolvedIndexedEvent $start */

if (null === $start->getLink()) {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getEvent()->getEventNumber() + 1 : $start->getEvent()->getEventNumber() - 1;
} else {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getLink()->getEventNumber() + 1 : $start->getLink()->getEventNumber() - 1;
}

$query->setFromEventNumber($start);
$query->setMaxCount($asked > $maxPossible ? $maxPossible : $asked);

//echo "Not end of stream need slice from position {$start} next is {$event->getNextEventNumber()} \n";
$this->writer->composeAndWrite(
$messageType,
$query,
$correlationID
);
}
})
// Continue to watch until we have all our results (or end)
->takeWhile(function () use (&$end) {
return !$end;
})
// Format EventRecord for easy reading
->flatMap(function (ReadStreamEventsCompleted $event) use (&$asked, &$end) {
->flatMap(function (ReadStreamEventsCompleted $event) use (&$asked, &$end, &$stop, $max, $maxPossible, $messageType, $query, $correlationID) {
/* @var ReadStreamEventsCompleted $event */
$records = [];
/* @var \Rxnet\EventStore\EventRecord[] $records */
Expand All @@ -603,7 +580,34 @@ protected function readEvents(Message $query, $messageType)
$records[] = new EventRecord($item->getEvent());
}
// Will emit onNext for each event
return Observable::fromArray($records);
return Observable::fromArray($records)
->doOnCompleted(function () use (&$end, &$stop, &$events, &$asked, $max, $maxPossible, $messageType, $query, $correlationID) {
if ($end) $stop = true;
else {
$start = $events[count($events) - 1];
/* @var ResolvedIndexedEvent $start */

if (null === $start->getLink()) {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getEvent()->getEventNumber() + 1 : $start->getEvent()->getEventNumber() - 1;
} else {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getLink()->getEventNumber() + 1 : $start->getLink()->getEventNumber() - 1;
}

$query->setFromEventNumber($start);
$query->setMaxCount($asked > $maxPossible ? $maxPossible : $asked);

//echo "Not end of stream need slice from position {$start} next is {$event->getNextEventNumber()} \n";
$this->writer->composeAndWrite(
$messageType,
$query,
$correlationID
);
}
});
})
// Continue to watch until we have all our results (or end)
->takeWhile(function () use (&$stop) {
return !$stop;
})
->subscribe($observer);
});
Expand Down

0 comments on commit 5d9f46c

Please sign in to comment.