Skip to content

Commit

Permalink
Meaningful HTTP Producer responses
Browse files Browse the repository at this point in the history
- Let the HTTP Producer return with 500 Internal Server error if not (all) jobs could be queued
- Allow option to override response code and message for user exceptions
  • Loading branch information
youwe-petervanderwal committed Oct 29, 2020
1 parent 522bde2 commit b9474e5
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 38 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"require": {
"php": "~7.2.0|~7.3.0|~7.4.0",
"ext-pcntl": "*",
"ext-json": "*",
"symfony/dependency-injection": "^3.3",
"symfony/config": "^3.3",
"symfony/yaml": "^3.3",
Expand Down
53 changes: 53 additions & 0 deletions src/Exception/HttpResponseException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

declare(strict_types=1);

namespace Webgriffe\Esb\Exception;

class HttpResponseException extends \Exception
{
/**
* @var int
*/
private $httpResponseCode;

/**
* @var string
*/
private $clientMessage;

/**
* @param int $httpResponseCode The HTTP response code to use
* @param string $clientMessage The message to send to the client
* @param string $internalMessage The message to use internally (e.g. store in error logs), when empty the client message is used
* @param int $code The Exception code.
* @param \Throwable|null $previous The previous throwable used for the exception chaining.
*/
public function __construct(
int $httpResponseCode,
string $clientMessage,
string $internalMessage = "",
int $code = 0,
?\Throwable $previous = null
) {
$this->httpResponseCode = $httpResponseCode;
$this->clientMessage = $clientMessage;
parent::__construct($internalMessage ?: $clientMessage, $code, $previous);
}

/**
* @return int
*/
public function getHttpResponseCode(): int
{
return $this->httpResponseCode;
}

/**
* @return string
*/
public function getClientMessage(): string
{
return $this->clientMessage;
}
}
22 changes: 18 additions & 4 deletions src/ProducerInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ function ($watcherId) {
public function produceAndQueueJobs($data = null): Promise
{
return call(function () use ($data) {
$jobsCount = 0;
$flushedJobsCount = 0;
$queuedJobsCount = 0;
$caughtException = null;
$job = null;
$test = false;
try {
Expand All @@ -173,7 +175,8 @@ public function produceAndQueueJobs($data = null): Promise
/** @var Job $job */
$job = $jobs->getCurrent();
$job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer)));
$jobsCount += yield $this->queueManager->enqueue($job);
$flushedJobsCount += yield $this->queueManager->enqueue($job);
$queuedJobsCount++;
$this->logger->info(
'Successfully produced a new Job',
[
Expand All @@ -184,8 +187,9 @@ public function produceAndQueueJobs($data = null): Promise
);
}

$jobsCount += yield $this->queueManager->flush();
$flushedJobsCount += yield $this->queueManager->flush();
} catch (\Throwable $error) {
$caughtException = $error;
$this->logger->error(
'An error occurred producing/queueing jobs.',
[
Expand All @@ -195,8 +199,18 @@ public function produceAndQueueJobs($data = null): Promise
'test' => $test
]
);

// At least try to flush any previously successfully queued jobs. Don't let an error in parsing job 3
// details also fail jobs 1 and 2.
if ($queuedJobsCount > $flushedJobsCount) {
try {
$flushedJobsCount += yield $this->queueManager->flush();
} catch (\Throwable $nestedError) {
// Ignore any further (duplicated) errors
}
}
}
return $jobsCount;
return new ProducerResult($flushedJobsCount, $caughtException);
});
}

Expand Down
44 changes: 44 additions & 0 deletions src/ProducerResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Webgriffe\Esb;

class ProducerResult
{
/**
* @var int
*/
private $jobsCount;

/**
* @var \Throwable|null
*/
private $exception;

/**
* @param int $jobsCount
* @param \Throwable|null $exception
*/
public function __construct(int $jobsCount, ?\Throwable $exception = null)
{
$this->jobsCount = $jobsCount;
$this->exception = $exception;
}

/**
* @return int
*/
public function getJobsCount(): int
{
return $this->jobsCount;
}

/**
* @return \Throwable|null
*/
public function getException(): ?\Throwable
{
return $this->exception;
}
}
39 changes: 35 additions & 4 deletions src/Service/HttpProducersServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
use Amp\Socket;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Webgriffe\Esb\Exception\HttpResponseException;
use Webgriffe\Esb\HttpRequestProducerInterface;
use Webgriffe\Esb\ProducerInstance;
use Webgriffe\Esb\ProducerResult;

use function Amp\call;

/**
Expand Down Expand Up @@ -59,7 +62,7 @@ public function start(): Promise
Socket\listen("[::]:{$this->port}"),
];

$this->httpServer = new \Amp\Http\Server\Server(
$this->httpServer = new Server(
$sockets,
new CallableRequestHandler($this->callableFromInstanceMethod('requestHandler')),
new NullLogger()
Expand Down Expand Up @@ -105,9 +108,37 @@ private function requestHandler(Request $request)
'request' => sprintf('%s %s', strtoupper($request->getMethod()), $request->getUri())
]
);
$jobsCount = yield $producerInstance->produceAndQueueJobs($request);
$responseMessage = sprintf('Successfully scheduled %s job(s) to be queued.', $jobsCount);
return new Response(Status::OK, [], sprintf('"%s"', $responseMessage));
$producerResult = yield $producerInstance->produceAndQueueJobs($request);
return $this->buildResponse($producerResult);
}

/**
* @param ProducerResult $producerResult
* @return Response
*/
private function buildResponse(ProducerResult $producerResult): Response
{
$producerException = $producerResult->getException();
if ($producerException === null) {
$responseCode = Status::OK;
$responseMessage = sprintf('Successfully scheduled %d job(s) to be queued.', $producerResult->getJobsCount());
} else {
$responseCode = Status::INTERNAL_SERVER_ERROR;
$errorMessage = 'Internal server error';

if ($producerException instanceof HttpResponseException) {
$responseCode = $producerException->getHttpResponseCode();
$errorMessage = $producerException->getClientMessage();
}

if ($producerResult->getJobsCount() === 0) {
$responseMessage = sprintf('%s, could not schedule any jobs.', $errorMessage);
} else {
$responseMessage = sprintf('%s, only scheduled the first %d job(s) to be queued.', $errorMessage, $producerResult->getJobsCount());
}
}

return new Response($responseCode, [], sprintf('"%s"', $responseMessage));
}

/**
Expand Down
14 changes: 13 additions & 1 deletion tests/DummyHttpRequestProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
namespace Webgriffe\Esb;

use Amp\Http\Server\Request;
use Amp\Http\Status;
use Amp\Iterator;
use Amp\Producer;
use Amp\Promise;
use Amp\Success;
use Webgriffe\Esb\Exception\HttpResponseException;
use Webgriffe\Esb\Model\Job;

final class DummyHttpRequestProducer implements HttpRequestProducerInterface
Expand Down Expand Up @@ -45,9 +47,19 @@ public function produce($data = null): Iterator
);
}
$body = json_decode(yield $data->getBody()->read(), true);
if (!is_array($body)) {
throw new HttpResponseException(Status::BAD_REQUEST, 'Request body contains invalid JSON');
}
$jobsData = $body['jobs'];
foreach ($jobsData as $jobData) {
yield $emit(new Job([$jobData]));
switch ($jobData) {
case 'throw http response exception':
throw new HttpResponseException(Status::PRECONDITION_FAILED, 'Some other custom message');
case 'throw other exception':
throw new \Exception('This message shouldn\'t be send to the client');
default:
yield $emit(new Job([$jobData]));
}
}
});
}
Expand Down
Loading

0 comments on commit b9474e5

Please sign in to comment.