Skip to content

Commit

Permalink
Merge pull request #12 from voryx/http-version
Browse files Browse the repository at this point in the history
Update to newer http and http-client
  • Loading branch information
mbonneau authored Dec 14, 2017
2 parents f48d767 + 4cd5a74 commit 4435389
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 214 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ php:
- 5.6
- 7
- 7.1
- 7.2

before_script: composer install

Expand Down
15 changes: 10 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,22 @@
}
],
"autoload": {
"psr-0": {
"WampPost": "src"
"psr-4": {
"WampPost\\": "src/WampPost"
}
},
"autoload-dev": {
"psr-4": {
"WampPost\\Tests\\": "tests"
}
},
"require": {
"voryx/thruway": "0.5.x-dev",
"react/http": "^0.6.0"
"voryx/thruway": "^0.5.0",
"react/http": "^0.8.0"
},
"require-dev": {
"voryx/event-loop": "^0.2.0",
"react/http-client": "^0.4.17",
"react/http-client": "^0.5.0",
"phpunit/phpunit": "5.7"
}
}
59 changes: 0 additions & 59 deletions src/WampPost/BodySnatcher.php

This file was deleted.

216 changes: 98 additions & 118 deletions src/WampPost/WampPost.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,36 @@

namespace WampPost;

use Psr\Http\Message\ServerRequestInterface;
use React\EventLoop\Factory;
use React\Http\Request;
use React\EventLoop\LoopInterface;
use React\Http\Response;
use React\Http\Server as HttpServer;
use React\Promise\Deferred;
use React\Socket\Server;
use Thruway\CallResult;
use Thruway\Common\Utils;
use Thruway\Message\ErrorMessage;
use Thruway\Peer\Client;

class WampPost extends Client {
private $bindAddress;
private $realmName;

/** @var Server */
class WampPost extends Client
{
private $socket;
private $http;

function __construct($realmName, $loop = null, $bindAddress = 'tcp://127.0.0.1:8181')
public function __construct($realmName, LoopInterface $loop = null, $bindAddress = 'tcp://127.0.0.1:8181')
{
if ($loop === null) {
$loop = Factory::create();
}
$loop = $loop ?: Factory::create();
$http = new HttpServer([$this, 'handleRequest']);

$this->bindAddress = $bindAddress;
$this->realmName = $realmName;
$this->socket = new Server($this->bindAddress, $loop);
$this->http = new \React\Http\Server($this->socket);
$this->socket = new Server($bindAddress, $loop);

$this->http->on('request', [$this, 'handleRequest']);
$http->listen($this->socket);

parent::__construct($realmName, $loop);
}

public function start($startLoop = true) {

parent::start($startLoop);
}

public function onSessionStart($session, $transport) {

public function onSessionStart($session, $transport)
{
}

/**
Expand All @@ -54,110 +44,100 @@ public function onClose($reason)
parent::onClose($reason);
}

/**
* @param Request $request
* @param Response $response
*/
public function handleRequest(Request $request, Response $response) {
if ($request->getPath() == '/pub' && $request->getMethod() == 'POST') {
$this->handlePublishHttpPost($request, $response);
} else if ($request->getPath() == '/call' && $request->getMethod() == 'POST') {
$this->handleCallHttpRequest($request, $response);
} else {
$response->writeHead(404, ['Content-Type' => 'text/plain', 'Connection' => 'close']);
$response->end("Not found");
public function handleRequest(ServerRequestInterface $request, callable $next)
{
if ($request->getMethod() === 'POST' && $request->getUri()->getPath() === '/pub') {
return $this->handlePublishHttpPost($request, $next);
}
}

private function handlePublishHttpPost(Request $request, Response $response) {
$bodySnatcher = new BodySnatcher($request);
$bodySnatcher->promise()->then(function ($body) use ($request, $response) {
try {
//{"topic": "com.myapp.topic1", "args": ["Hello, world"]}
$json = json_decode($body);
if ($request->getMethod() === 'POST' && $request->getUri()->getPath() === '/call') {
return $this->handleCallHttpRequest($request, $next);
}

if ($json === null) {
throw new \Exception("JSON decoding failed: " . json_last_error_msg());
}
return new Response(404, [], 'Not found');
}

if (isset($json->topic)
&& is_scalar($json->topic)
&& isset($json->args)
&& is_array($json->args)
&& ($this->getPublisher() !== null)
) {
$json->topic = strtolower($json->topic);
if (!Utils::uriIsValid($json->topic)) {
throw new \Exception("Invalid URI: " . $json->topic);
}
private function handlePublishHttpPost(ServerRequestInterface $request, callable $next)
{
try {
//{"topic": "com.myapp.topic1", "args": ["Hello, world"]}
$json = json_decode($request->getBody());

$argsKw = isset($json->argsKw) && is_object($json->argsKw) ? $json->argsKw : null;
$options = isset($json->options) && is_object($json->options) ? $json->options : null;
$this->getSession()->publish($json->topic, $json->args, $argsKw, $options);
} else {
throw new \Exception("Invalid request: " . json_encode($json));
if ($json === null) {
throw new \Exception('JSON decoding failed: ' . json_last_error_msg());
}

if (isset($json->topic)
&& is_scalar($json->topic)
&& isset($json->args)
&& is_array($json->args)
&& ($this->getPublisher() !== null)
) {
$json->topic = strtolower($json->topic);
if (!Utils::uriIsValid($json->topic)) {
throw new \Exception('Invalid URI: ' . $json->topic);
}
} catch (\Exception $e) {
// should shut down everything
$response->writeHead(400, ['Content-Type' => 'text/plain', 'Connection' => 'close']);
$response->end("Bad Request: " . $e->getMessage());
return;

$argsKw = isset($json->argsKw) && is_object($json->argsKw) ? $json->argsKw : null;
$options = isset($json->options) && is_object($json->options) ? $json->options : null;
$this->getSession()->publish($json->topic, $json->args, $argsKw, $options);
} else {
throw new \Exception('Invalid request: ' . json_encode($json));
}
$response->writeHead(200, ['Content-Type' => 'text/plain', 'Connection' => 'close']);
$response->end("pub");
});
} catch (\Exception $e) {
return new Response(400, [], 'Bad Request: ' . $e->getMessage());
}

return new Response(200, [], 'pub');
}

private function handleCallHttpRequest($request, $response) {
$bodySnatcher = new BodySnatcher($request);
$bodySnatcher->promise()->then(function ($body) use ($request, $response) {
try {
//{"procedure": "com.myapp.procedure1", "args": ["Hello, world"], "argsKw": {}, "options": {} }
$json = json_decode($body);

if (isset($json->procedure)
&& Utils::uriIsValid($json->procedure)
&& ($this->getCaller() !== null)
) {
$args = isset($json->args) && is_array($json->args) ? $json->args : null;
$argsKw = isset($json->argsKw) && is_object($json->argsKw) ? $json->argsKw : null;
$options = isset($json->options) && is_object($json->options) ? $json->options : null;

$this->getSession()->call($json->procedure, $args, $argsKw, $options)->then(
/** @param CallResult $result */
function (CallResult $result) use ($response) {
$responseObj = new \stdClass();
$responseObj->result = "SUCCESS";
$responseObj->args = $result->getArguments();
$responseObj->argsKw = $result->getArgumentsKw();
$responseObj->details = $result->getDetails();

$response->writeHead(200, ['Content-Type' => 'application/json', 'Connection' => 'close']);
$response->end(json_encode($responseObj));
},
function (ErrorMessage $msg) use ($response) {
$responseObj = new \stdClass();
$responseObj->result = "ERROR";
$responseObj->error_uri = $msg->getErrorURI();
$responseObj->error_args = $msg->getArguments();
$responseObj->error_argskw = $msg->getArgumentsKw();
$responseObj->error_details = $msg->getDetails();

// maybe return an error code here
$response->writeHead(200, ['Content-Type' => 'application/json', 'Connection' => 'close']);
$response->end(json_encode($responseObj));
}
);
} else {
// maybe return an error code here
$response->writeHead(200, ['Content-Type' => 'text/plain', 'Connection' => 'close']);
$response->end("No procedure set");
}
} catch (\Exception $e) {
private function handleCallHttpRequest(ServerRequestInterface $request, callable $next)
{
$deferred = new Deferred();
try {
//{"procedure": "com.myapp.procedure1", "args": ["Hello, world"], "argsKw": {}, "options": {} }
$json = json_decode($request->getBody());

if (isset($json->procedure)
&& Utils::uriIsValid($json->procedure)
&& ($this->getCaller() !== null)
) {
$args = isset($json->args) && is_array($json->args) ? $json->args : null;
$argsKw = isset($json->argsKw) && is_object($json->argsKw) ? $json->argsKw : null;
$options = isset($json->options) && is_object($json->options) ? $json->options : null;

$this->getSession()->call($json->procedure, $args, $argsKw, $options)->then(
/** @param CallResult $result */
function (CallResult $result) use ($deferred) {
$responseObj = new \stdClass();
$responseObj->result = 'SUCCESS';
$responseObj->args = $result->getArguments();
$responseObj->argsKw = $result->getArgumentsKw();
$responseObj->details = $result->getDetails();

$deferred->resolve(new Response(200, ['Content-Type' => 'application/json'], json_encode($responseObj)));
},
function (ErrorMessage $msg) use ($deferred) {
$responseObj = new \stdClass();
$responseObj->result = 'ERROR';
$responseObj->error_uri = $msg->getErrorURI();
$responseObj->error_args = $msg->getArguments();
$responseObj->error_argskw = $msg->getArgumentsKw();
$responseObj->error_details = $msg->getDetails();

// maybe return an error code here
$deferred->resolve(new Response(200, ['Content-Type' => 'application/json'], json_encode($responseObj)));
}
);
} else {
// maybe return an error code here
$response->writeHead(200, ['Content-Type' => 'text/plain', 'Connection' => 'close']);
$response->end("Problem");
$deferred->resolve(new Response(200, [], 'No procedure set'));
}
});
} catch (\Exception $e) {
// maybe return an error code here
$deferred->resolve(new Response(200, [], 'Problem'));
}

return $deferred->promise();
}
}
}
2 changes: 1 addition & 1 deletion src/WampPost/startWampPost.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

require_once __DIR__ . "/../../vendor/autoload.php";

$wp = new \WampPost\WampPost('realm1', null, '127.0.0.1', 8181);
$wp = new \WampPost\WampPost('realm1', null, '127.0.0.1:8181');

$wp->addTransportProvider(new \Thruway\Transport\PawlTransportProvider('ws://127.0.0.1:9090/'));

Expand Down
Loading

0 comments on commit 4435389

Please sign in to comment.