Skip to content

Commit

Permalink
Read event forward was not completing and giving events in different …
Browse files Browse the repository at this point in the history
…order

fix #16
Vinceveve committed Jul 24, 2018

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 5d9f46c commit fdadd42
Showing 7 changed files with 207 additions and 95 deletions.
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
}
],
"require": {
"php":"7.0.0",
"ramsey/uuid": "^3.5",
"TrafficCophp/ByteBuffer": "^0.3",
"google/protobuf": "^3.2",
@@ -41,6 +42,7 @@
"autoload": {
"psr-4": {
"Rxnet\\EventStore\\": "src/Rxnet/EventStore",
"Rxnet\\Operator\\": "src/Rxnet/Operator",
"GPBMetadata\\": "src/GPBMetadata"
}
}
36 changes: 25 additions & 11 deletions examples/readAll.php
Original file line number Diff line number Diff line change
@@ -2,19 +2,33 @@

use EventLoop\EventLoop;

require __DIR__.'/../vendor/autoload.php';

require __DIR__ . '/../vendor/autoload.php';
ini_set('memory_limit', '500M');

$eventStore = new \Rxnet\EventStore\EventStore();
$eventStore->connect()
->subscribe(function() use ($eventStore) {
echo "connected \n";
$eventStore->readEventsForward('domain-test.fr')
->subscribe(function (\Rxnet\EventStore\EventRecord $record) {
echo "received {$record->getId()} {$record->getNumber()}@{$record->getStreamId()} {$record->getType()} created at {$record->getCreated()->format('c')}\n";
});
}, function (\Exception $e) {
echo $e->getMessage();
});
->subscribe(
function () use ($eventStore) {
echo "connected \n";
$eventStore->readEventsForward('domain-test.fr')
->subscribe(
function (\Rxnet\EventStore\EventRecord $record) {
echo "received {$record->getId()} {$record->getNumber()}@{$record->getStreamId()} {$record->getType()} created at {$record->getCreated()->format('c')}\n";
},
function ( $e) {
echo $e->getMessage();
},
function () {
EventLoop::getLoop()->stop();
}
);
},
function ($e) {
echo $e->getMessage();
},
function () {
EventLoop::getLoop()->stop();
}
);

EventLoop::getLoop()->run();
2 changes: 1 addition & 1 deletion examples/write.php
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@
$eventStore->connect()
->subscribe(function () use ($eventStore) {
echo "connected \n";
\Rx\Observable::interval(100)
\Rx\Observable::interval(10)
->flatMap(
function ($i) use ($eventStore) {
echo 'write : ';
142 changes: 70 additions & 72 deletions src/Rxnet/EventStore/EventStore.php
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
use Rx\ObserverInterface;
use Rx\Subject\ReplaySubject;
use Rx\Subject\Subject;
use Rxnet\Operator\OnBackPressureBuffer;
use Rxnet\Socket;
use Rxnet\EventStore\Data\ConnectToPersistentSubscription;
use Rxnet\EventStore\Data\NewEvent;
@@ -42,6 +43,7 @@
use Rxnet\EventStore\Message\SocketMessage;
use Rxnet\EventStore\NewEvent\NewEventInterface;


class EventStore
{
const POSITION_START = 0;
@@ -534,82 +536,78 @@ public function readEventsBackward($streamId, $fromEvent = self::POSITION_END, $
*/
protected function readEvents(Message $query, $messageType)
{
// TODO backpressure, wait for event's array to be readed completely before asking for more in stream
// OnDemand ? onBackpressureBuffer ?
return Observable::create(function (ObserverInterface $observer) use ($messageType, $query) {
$end = false;
$stop = false;
$maxPossible = 10; //4096
$max = ($query instanceof ReadStreamEvents) ? $query->getMaxCount() : self::POSITION_LATEST;

$asked = $max;
if ($max >= $maxPossible) {
$max = $maxPossible;
$query->setMaxCount($max);
}
$correlationID = $this->writer->createUUIDIfNeeded();
$maxPossible = 100;
$max = ($query instanceof ReadStreamEvents) ? $query->getMaxCount() : self::POSITION_LATEST;

$this->writer->composeAndWrite($messageType, $query, $correlationID)
// When written wait for all responses
->merge($this->readBuffer->waitFor($correlationID, -1))
// Throw if we have an error message
->flatMap(function (ReadStreamEventsCompleted $event) {
if ($error = $event->getError()) {
return Observable::error(new \Exception($error));
}
return Observable::of($event);
})
// If more data is needed do another query
->do(function (ReadStreamEventsCompleted $event) use ($query, $correlationID, &$asked, &$end, $max) {
$asked = $max;
if ($max >= $maxPossible) {
$max = $maxPossible;
$query->setMaxCount($max);
}
$backPressure = new OnBackPressureBuffer();
$correlationID = $this->writer->createUUIDIfNeeded();
// to master the output and transform it
$readUntilEnd = new Subject();

// all the events with my correlation id will be here
$inputObs = $this->readBuffer->waitFor($correlationID, -1)
->flatMap(function (ReadStreamEventsCompleted $event) {
if ($error = $event->getError()) {
return Observable::error(new \Exception($error));
}
return Observable::of($event);
})
->share();

// First slot of data
$this->writer->composeAndWrite($messageType, $query, $correlationID)
// When written wait for all responses
->merge($inputObs)
->subscribe($readUntilEnd);

// Detect the end and ask for more until its done
$inputObs->subscribe(function (ReadStreamEventsCompleted $event) use ($readUntilEnd, $maxPossible, $query, &$asked, &$max, $correlationID, $messageType) {
$records = $event->getEvents();
$asked -= count($records);

if (!$event->getIsEndOfStream() AND !($asked <= 0 && $max != self::POSITION_LATEST)) {
$records = $event->getEvents();
$asked -= count($records);
if ($event->getIsEndOfStream()) {
$end = true;
} elseif ($asked <= 0 && $max != self::POSITION_LATEST) {
$end = true;
}
})
// Format EventRecord for easy reading
->flatMap(function (ReadStreamEventsCompleted $event) use (&$asked, &$end, &$stop, $max, $maxPossible, $messageType, $query, $correlationID) {
/* @var ReadStreamEventsCompleted $event */
$records = [];
/* @var \Rxnet\EventStore\EventRecord[] $records */
$events = $event->getEvents();
foreach ($events as $item) {
/* @var \Rxnet\EventStore\Data\ResolvedIndexedEvent $item */
$records[] = new EventRecord($item->getEvent());
$start = $records[count($records) - 1];
/* @var ResolvedIndexedEvent $start */

if (null === $start->getLink()) {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getEvent()->getEventNumber() + 1 : $start->getEvent()->getEventNumber() - 1;
} else {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getLink()->getEventNumber() + 1 : $start->getLink()->getEventNumber() - 1;
}
// Will emit onNext for each event
return Observable::fromArray($records)
->doOnCompleted(function () use (&$end, &$stop, &$events, &$asked, $max, $maxPossible, $messageType, $query, $correlationID) {
if ($end) $stop = true;
else {
$start = $events[count($events) - 1];
/* @var ResolvedIndexedEvent $start */

if (null === $start->getLink()) {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getEvent()->getEventNumber() + 1 : $start->getEvent()->getEventNumber() - 1;
} else {
$start = ($messageType == MessageType::READ_STREAM_EVENTS_FORWARD) ? $start->getLink()->getEventNumber() + 1 : $start->getLink()->getEventNumber() - 1;
}

$query->setFromEventNumber($start);
$query->setMaxCount($asked > $maxPossible ? $maxPossible : $asked);
$query->setFromEventNumber($start);
$query->setMaxCount($asked > $maxPossible ? $maxPossible : $asked);
$this->writer->composeAndWrite($messageType, $query, $correlationID);
}
else {
$readUntilEnd->onNext($event);
$readUntilEnd->onCompleted();
}
});

//echo "Not end of stream need slice from position {$start} next is {$event->getNextEventNumber()} \n";
$this->writer->composeAndWrite(
$messageType,
$query,
$correlationID
);
}
});
})
// Continue to watch until we have all our results (or end)
->takeWhile(function () use (&$stop) {
return !$stop;
})
->subscribe($observer);
});
// Give back our subject one event per row
return $readUntilEnd
->asObservable()
->lift($backPressure->operator())
// Format EventRecord for easy reading
->flatMap(function (ReadStreamEventsCompleted $event) use ($backPressure, &$asked, &$end) {
/* @var ReadStreamEventsCompleted $event */
$records = [];
/* @var \Rxnet\EventStore\EventRecord[] $records */
$events = $event->getEvents();
foreach ($events as $item) {
/* @var \Rxnet\EventStore\Data\ResolvedIndexedEvent $item */
$records[] = new EventRecord($item->getEvent());
}
// Will emit onNext for each event
return Observable::fromArray($records)
->doOnCompleted([$backPressure, 'request']);
});
}
}
9 changes: 0 additions & 9 deletions src/Rxnet/EventStore/ReadBuffer.php
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
<?php
/**
* Created by PhpStorm.
* User: vincent
* Date: 26/04/2017
* Time: 15:56
*/

namespace Rxnet\EventStore;


use Rx\Subject\Subject;
use Rxnet\EventStore\Communication\CommunicationFactory;
use Rxnet\EventStore\Message\MessageConfiguration;
use Rxnet\EventStore\Message\MessageType;
use Rxnet\EventStore\Message\SocketMessage;
use Rxnet\Stream\StreamEvent;
use TrafficCophp\ByteBuffer\Buffer;

class ReadBuffer extends Subject
2 changes: 0 additions & 2 deletions src/Rxnet/EventStore/Writer.php
Original file line number Diff line number Diff line change
@@ -2,11 +2,9 @@

namespace Rxnet\EventStore;

use EventLoop\EventLoop;
use Google\Protobuf\Internal\Message;
use Ramsey\Uuid\Uuid;
use Rx\Observable;
use Rx\ObserverInterface;
use Rxnet\EventStore\Message\Credentials;
use Rxnet\EventStore\Message\MessageConfiguration;
use Rxnet\EventStore\Message\MessageType;
109 changes: 109 additions & 0 deletions src/Rxnet/Operator/OnbackpressureBuffer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?php
namespace Rxnet\Operator;
use Rx\DisposableInterface;
use Rx\ObservableInterface;
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
use Rx\Operator\OperatorInterface;
use Rx\SchedulerInterface;
use Rx\Subject\Subject;
/**
* Class OnBackPressureBuffer
* Store event stream until the consumer request next
*
* @package Rxnet\Operator
*/
class OnBackPressureBuffer implements OperatorInterface
{
const OVERFLOW_STRATEGY_DROP_OLDEST = "drop_oldest";
const OVERFLOW_STRATEGY_DROP_LATEST = "drop_latest";
const OVERFLOW_STRATEGY_ERROR = "error";
/**
* @var Subject
*/
protected $subject;
protected $queue;
protected $capacity = -1;
protected $overflowStrategy;
protected $onOverflow;
protected $pending = false;
protected $sourceCompleted = false;
public function __construct($capacity = -1, callable $onOverflow = null, $overflowStrategy = self::OVERFLOW_STRATEGY_ERROR)
{
$this->capacity = $capacity;
$this->onOverflow = $onOverflow;
$this->overflowStrategy = $overflowStrategy;
$this->subject = new Subject();
$this->queue = new \SplQueue();
}
/**
* @param \Rx\ObservableInterface $observable
* @param \Rx\ObserverInterface $observer
* @return \Rx\DisposableInterface
*/
public function __invoke(ObservableInterface $observable, ObserverInterface $observer) : DisposableInterface
{
// Send back the subject, that will buffer
$this->subject->subscribe($observer);
// Wait for data on stream
return $observable->subscribe(
new CallbackObserver(
function ($next) {
// Live stream no queue necessary
if (!$this->pending) {
// Wait for next request
$this->pending = true;
$this->subject->onNext($next);
return;
}
if ($this->capacity != -1 && $this->queue->count() >= $this->capacity -1) {
if($this->onOverflow) {
$closure = $this->onOverflow;
$closure($next);
}
switch ($this->overflowStrategy) {
case self::OVERFLOW_STRATEGY_DROP_LATEST:
return;
case self::OVERFLOW_STRATEGY_ERROR:
$this->subject->onError(new \OutOfBoundsException("Buffer is full with {$this->capacity} elements inside"));
break;
case self::OVERFLOW_STRATEGY_DROP_OLDEST:
$this->queue->pop();
break;
}
}
// Add to queue
$this->queue->push($next);
},
[$this->subject, 'onError'],
function() {
if(!$this->pending) {
$this->subject->onCompleted();
}
else {
$this->sourceCompleted = true;
}
}
)
);
}
public function operator()
{
return function () {
return $this;
};
}
public function request()
{
// Queue is finished we can return to live stream
if ($this->queue->isEmpty()) {
$this->pending = false;
if($this->sourceCompleted) {
$this->subject->onCompleted();
}
return;
}
// Take element in order they have been inserted
$this->subject->onNext($this->queue->shift());
}
}

0 comments on commit fdadd42

Please sign in to comment.