Skip to content

Commit

Permalink
Fix http json event double encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinceveve committed Aug 16, 2017
1 parent 761da2c commit ac1aa73
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/Rxnet/EventStore/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public function __construct(LoopInterface $loop = null, Dns $dns = null, Tcp $tc
* @param int $heartBeatRate in milliseconds
* @return Observable\AnonymousObservable
*/
public function connect($dsn = 'tcp://admin:changeit@localhost:1113', $connectTimeout = 1000, $heartBeatRate = 5000)
public function connect($dsn = 'tcp://admin:changeit@127.0.0.1:1113', $connectTimeout = 1000, $heartBeatRate = 5000)
{
// connector compatibility
$connectTimeout = ($connectTimeout > 0) ? $connectTimeout / 1000 : 0;
Expand Down Expand Up @@ -636,4 +636,4 @@ protected function readEvents(Message $query, $messageType)
});

}
}
}
1 change: 0 additions & 1 deletion src/Rxnet/EventStore/HttpEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public function write($streamId, $events, $expectedVersion = null)
$headers['ES-ExpectedVersion'] = $expectedVersion;
}


$body = json_encode($json);
$headers['Length'] = strlen($body);
$options = compact('headers', 'body');
Expand Down
9 changes: 9 additions & 0 deletions src/Rxnet/EventStore/NewEvent/JsonEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,13 @@ public function getMessage()
{
return $this->message;
}
public function toArray()
{
return [
'eventId' => $this->getId(),
'eventType' => $this->getType(),
'data' => json_decode($this->getData(), true),
'metadata' => json_decode($this->getMetaData(), true)
];
}
}
15 changes: 9 additions & 6 deletions usage/http-write.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@

$eventStore = new \Rxnet\EventStore\HttpEventStore();

\Rx\Observable::interval(10)
\Rx\Observable::interval(1000)
->flatMap(
function ($i) use ($eventStore) {
$uid = \Ramsey\Uuid\Uuid::uuid3(\Ramsey\Uuid\Uuid::NAMESPACE_DNS, 'test.fr');
$event = new JsonEvent('/truc/chose', ['i' => $i], 'test.fr');
$event = new JsonEvent('/truc/chose', [
"crypto" => "btc",
"user_id" => "21433R4G523TO",
"wallet_id" => "PIH21B4T93VB5T9G7V"
]);

return $eventStore->write('domain-test3.fr', $event);
return $eventStore->write('crypto-exchange-rate', $event);
}
)
->subscribe(
Expand All @@ -24,11 +27,11 @@ function ($eventsCompleted) {

gc_collect_cycles();
$memory = memory_get_usage(true) / 1024 / 1024;
echo $eventsCompleted." {$memory}Mb \n";
echo $eventsCompleted . " {$memory}Mb \n";
//echo "Last event number {$eventsCompleted->getLastEventNumber()} on commit position {$eventsCompleted->getCommitPosition()} {$memory}Mb \n";
}
),
new EventLoopScheduler(EventLoop::getLoop())
);

EventLoop::getLoop()->run();
EventLoop::getLoop()->run();
7 changes: 4 additions & 3 deletions usage/persistent.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
$eventStore = new \Rxnet\EventStore\EventStore();
\Rxnet\await($eventStore->connect());

$eventStore->persistentSubscription('dropcatch', 'test')
$eventStore->persistentSubscription('crypto', 'test')
->flatMap(function (AcknowledgeableEventRecord $record) {
echo "received {$record->getId()} {$record->getNumber()}@{$record->getStreamId()} {$record->getType()}\n";
//return $record->ack();
return $record->nack($record::NACK_ACTION_PARK, 'oops');
var_dump($record->getData());
return $record->ack();
//return $record->nack($record::NACK_ACTION_PARK, 'oops');
})
->subscribeCallback(null, function (\Exception $e) {
echo $e->getMessage();
Expand Down
8 changes: 5 additions & 3 deletions usage/write.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
->flatMap(
function ($i) use ($eventStore) {
echo '.';
$event = new JsonEvent('/truc/chose', ['i' => $i]);
return $eventStore->write('domain-test-1.fr', [$event]);
$event = new JsonEvent('/truc/chose', [ "crypto"=> "btc",
"user_id"=> "21433R4G523TO",
"wallet_id"=> "PIH21B4T93VB5T9G7V"]);
return $eventStore->write('test-test-1.fr', [$event]);
}
)
->subscribe(
Expand All @@ -34,4 +36,4 @@ function (WriteEventsCompleted $eventsCompleted) {
new EventLoopScheduler(EventLoop::getLoop())
);

EventLoop::getLoop()->run();
EventLoop::getLoop()->run();

0 comments on commit ac1aa73

Please sign in to comment.