diff --git a/composer.json b/composer.json index f4721fb..689c228 100644 --- a/composer.json +++ b/composer.json @@ -12,6 +12,7 @@ "require": { "php": "~7.2.0|~7.3.0|~7.4.0", "ext-pcntl": "*", + "ext-json": "*", "symfony/dependency-injection": "^3.3", "symfony/config": "^3.3", "symfony/yaml": "^3.3", diff --git a/src/Exception/HttpResponseException.php b/src/Exception/HttpResponseException.php new file mode 100644 index 0000000..8e88a0a --- /dev/null +++ b/src/Exception/HttpResponseException.php @@ -0,0 +1,53 @@ +httpResponseCode = $httpResponseCode; + $this->clientMessage = $clientMessage; + parent::__construct($internalMessage ?: $clientMessage, $code, $previous); + } + + /** + * @return int + */ + public function getHttpResponseCode(): int + { + return $this->httpResponseCode; + } + + /** + * @return string + */ + public function getClientMessage(): string + { + return $this->clientMessage; + } +} diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index b5a56eb..7f17a55 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -164,7 +164,9 @@ function ($watcherId) { public function produceAndQueueJobs($data = null): Promise { return call(function () use ($data) { - $jobsCount = 0; + $flushedJobsCount = 0; + $queuedJobsCount = 0; + $caughtException = null; $job = null; $test = false; try { @@ -173,7 +175,8 @@ public function produceAndQueueJobs($data = null): Promise /** @var Job $job */ $job = $jobs->getCurrent(); $job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer))); - $jobsCount += yield $this->queueManager->enqueue($job); + $flushedJobsCount += yield $this->queueManager->enqueue($job); + $queuedJobsCount++; $this->logger->info( 'Successfully produced a new Job', [ @@ -184,8 +187,9 @@ public function produceAndQueueJobs($data = null): Promise ); } - $jobsCount += yield $this->queueManager->flush(); + $flushedJobsCount += yield $this->queueManager->flush(); } catch (\Throwable $error) { + $caughtException = $error; $this->logger->error( 'An error occurred producing/queueing jobs.', [ @@ -195,8 +199,18 @@ public function produceAndQueueJobs($data = null): Promise 'test' => $test ] ); + + // At least try to flush any previously successfully queued jobs. Don't let an error in parsing job 3 + // details also fail jobs 1 and 2. + if ($queuedJobsCount > $flushedJobsCount) { + try { + $flushedJobsCount += yield $this->queueManager->flush(); + } catch (\Throwable $nestedError) { + // Ignore any further (duplicated) errors + } + } } - return $jobsCount; + return new ProducerResult($flushedJobsCount, $caughtException); }); } diff --git a/src/ProducerResult.php b/src/ProducerResult.php new file mode 100644 index 0000000..42700f3 --- /dev/null +++ b/src/ProducerResult.php @@ -0,0 +1,44 @@ +jobsCount = $jobsCount; + $this->exception = $exception; + } + + /** + * @return int + */ + public function getJobsCount(): int + { + return $this->jobsCount; + } + + /** + * @return \Throwable|null + */ + public function getException(): ?\Throwable + { + return $this->exception; + } +} diff --git a/src/Service/HttpProducersServer.php b/src/Service/HttpProducersServer.php index 07ba4a3..86de02f 100644 --- a/src/Service/HttpProducersServer.php +++ b/src/Service/HttpProducersServer.php @@ -14,8 +14,11 @@ use Amp\Socket; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; +use Webgriffe\Esb\Exception\HttpResponseException; use Webgriffe\Esb\HttpRequestProducerInterface; use Webgriffe\Esb\ProducerInstance; +use Webgriffe\Esb\ProducerResult; + use function Amp\call; /** @@ -59,7 +62,7 @@ public function start(): Promise Socket\listen("[::]:{$this->port}"), ]; - $this->httpServer = new \Amp\Http\Server\Server( + $this->httpServer = new Server( $sockets, new CallableRequestHandler($this->callableFromInstanceMethod('requestHandler')), new NullLogger() @@ -105,9 +108,37 @@ private function requestHandler(Request $request) 'request' => sprintf('%s %s', strtoupper($request->getMethod()), $request->getUri()) ] ); - $jobsCount = yield $producerInstance->produceAndQueueJobs($request); - $responseMessage = sprintf('Successfully scheduled %s job(s) to be queued.', $jobsCount); - return new Response(Status::OK, [], sprintf('"%s"', $responseMessage)); + $producerResult = yield $producerInstance->produceAndQueueJobs($request); + return $this->buildResponse($producerResult); + } + + /** + * @param ProducerResult $producerResult + * @return Response + */ + private function buildResponse(ProducerResult $producerResult): Response + { + $producerException = $producerResult->getException(); + if ($producerException === null) { + $responseCode = Status::OK; + $responseMessage = sprintf('Successfully scheduled %d job(s) to be queued.', $producerResult->getJobsCount()); + } else { + $responseCode = Status::INTERNAL_SERVER_ERROR; + $errorMessage = 'Internal server error'; + + if ($producerException instanceof HttpResponseException) { + $responseCode = $producerException->getHttpResponseCode(); + $errorMessage = $producerException->getClientMessage(); + } + + if ($producerResult->getJobsCount() === 0) { + $responseMessage = sprintf('%s, could not schedule any jobs.', $errorMessage); + } else { + $responseMessage = sprintf('%s, only scheduled the first %d job(s) to be queued.', $errorMessage, $producerResult->getJobsCount()); + } + } + + return new Response($responseCode, [], sprintf('"%s"', $responseMessage)); } /** diff --git a/tests/DummyHttpRequestProducer.php b/tests/DummyHttpRequestProducer.php index bbd6618..197ab2c 100644 --- a/tests/DummyHttpRequestProducer.php +++ b/tests/DummyHttpRequestProducer.php @@ -3,10 +3,12 @@ namespace Webgriffe\Esb; use Amp\Http\Server\Request; +use Amp\Http\Status; use Amp\Iterator; use Amp\Producer; use Amp\Promise; use Amp\Success; +use Webgriffe\Esb\Exception\HttpResponseException; use Webgriffe\Esb\Model\Job; final class DummyHttpRequestProducer implements HttpRequestProducerInterface @@ -45,9 +47,19 @@ public function produce($data = null): Iterator ); } $body = json_decode(yield $data->getBody()->read(), true); + if (!is_array($body)) { + throw new HttpResponseException(Status::BAD_REQUEST, 'Request body contains invalid JSON'); + } $jobsData = $body['jobs']; foreach ($jobsData as $jobData) { - yield $emit(new Job([$jobData])); + switch ($jobData) { + case 'throw http response exception': + throw new HttpResponseException(Status::PRECONDITION_FAILED, 'Some other custom message'); + case 'throw other exception': + throw new \Exception('This message shouldn\'t be send to the client'); + default: + yield $emit(new Job([$jobData])); + } } }); } diff --git a/tests/Integration/HttpRequestProducerAndWorkerTest.php b/tests/Integration/HttpRequestProducerAndWorkerTest.php index a06544d..eb2f53e 100644 --- a/tests/Integration/HttpRequestProducerAndWorkerTest.php +++ b/tests/Integration/HttpRequestProducerAndWorkerTest.php @@ -5,6 +5,7 @@ use Amp\Artax\DefaultClient; use Amp\Artax\Request; use Amp\Artax\Response; +use Amp\Http\Status; use Amp\Loop; use Amp\Promise; use Amp\Socket\ClientSocket; @@ -16,7 +17,6 @@ use Webgriffe\Esb\KernelTestCase; use Webgriffe\Esb\TestUtils; use function Amp\call; -use function Amp\File\exists; use function Amp\Socket\connect; class HttpRequestProducerAndWorkerTest extends KernelTestCase @@ -50,41 +50,88 @@ public function setUp() $this->httpPort = self::$kernel->getContainer()->getParameter('http_server_port'); } - public function testHttpRequestProducerAndWorker() - { - Loop::delay(100, function () { + /** + * @dataProvider dataProviderHttpRequestProducerAndWorker + * @param string $payload + * @param int $expectedResponseCode + * @param string $expectedResponseMessage + * @param string[] $expectedJobs + * @param bool $expectError + * @throws \Exception + */ + public function testHttpRequestProducerAndWorker( + string $payload, + int $expectedResponseCode, + string $expectedResponseMessage, + array $expectedJobs, + bool $expectError + ) { + Loop::delay(100, function () use ($payload, $expectedResponseCode, $expectedResponseMessage) { yield $this->waitForConnectionAvailable("tcp://127.0.0.1:{$this->httpPort}"); - $payload = json_encode(['jobs' => ['job1', 'job2', 'job3']]); $client = new DefaultClient(); $request = (new Request("http://127.0.0.1:{$this->httpPort}/dummy", 'POST'))->withBody($payload); /** @var Response $response */ $response = yield $client->request($request); - $this->assertContains('"Successfully scheduled 3 job(s) to be queued."', yield $response->getBody()); - }); - $this->stopWhen(function () { - return (yield exists($this->workerFile)) && count($this->getFileLines($this->workerFile)) === 3; + $this->assertSame($expectedResponseCode, $response->getStatus()); + $this->assertSame($expectedResponseMessage, yield $response->getBody()); + + Loop::delay(200, function () { + Loop::stop(); + }); }); self::$kernel->boot(); - $workerFileLines = $this->getFileLines($this->workerFile); - $this->assertCount(3, $workerFileLines); - $this->assertContains('job1', $workerFileLines[0]); - $this->assertContains('job2', $workerFileLines[1]); - $this->assertContains('job3', $workerFileLines[2]); - $this->logHandler()->hasRecordThatMatches( - '/Successfully produced a new Job .*? "payload_data":["job1"]/', - Logger::INFO - ); - $this->logHandler()->hasRecordThatMatches( - '/Successfully produced a new Job .*? "payload_data":["job2"]/', - Logger::INFO - ); - $this->logHandler()->hasRecordThatMatches( - '/Successfully produced a new Job .*? "payload_data":["job3"]/', - Logger::INFO - ); - $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); + $this->assertSame($expectError, $this->logHandler()->hasRecordThatContains( + 'An error occurred producing/queueing jobs.', + Logger::ERROR + )); + $this->assertWorkedJobs($expectedJobs); + } + + /** + * Data provider for testHttpRequestProducerAndWorker + * @return array[] + */ + public function dataProviderHttpRequestProducerAndWorker(): array + { + return [ + 'successfully scheduled' => [ + 'payload' => json_encode(['jobs' => ['job1', 'job2', 'job3']]), + 'expectedResponseCode' => Status::OK, + 'expectedResponseMessage' => '"Successfully scheduled 3 job(s) to be queued."', + 'expectedJobs' => ['job1', 'job2', 'job3'], + 'expectError' => false, + ], + 'complete fail' => [ + 'payload' => json_encode('not an array'), + 'expectedResponseCode' => Status::BAD_REQUEST, + 'expectedResponseMessage' => '"Request body contains invalid JSON, could not schedule any jobs."', + 'expectedJobs' => [], + 'expectError' => true, + ], + 'other custom message' => [ + 'payload' => json_encode(['jobs' => ['throw http response exception']]), + 'expectedResponseCode' => Status::PRECONDITION_FAILED, + 'expectedResponseMessage' => '"Some other custom message, could not schedule any jobs."', + 'expectedJobs' => [], + 'expectError' => true, + ], + 'default message' => [ + 'payload' => json_encode(['jobs' => ['throw other exception']]), + 'expectedResponseCode' => Status::INTERNAL_SERVER_ERROR, + 'expectedResponseMessage' => '"Internal server error, could not schedule any jobs."', + 'expectedJobs' => [], + 'expectError' => true, + ], + 'first two jobs scheduled, third failed, fourth never scheduled' => [ + 'payload' => json_encode(['jobs' => ['job1', 'job2', 'throw http response exception', 'job4']]), + 'expectedResponseCode' => Status::PRECONDITION_FAILED, + 'expectedResponseMessage' => '"Some other custom message, only scheduled the first 2 job(s) to be queued."', + 'expectedJobs' => ['job1', 'job2'], + 'expectError' => true, + ], + ]; } public function testHttpRequestProducerWithWrongUriShouldReturn404() @@ -104,8 +151,7 @@ public function testHttpRequestProducerWithWrongUriShouldReturn404() self::$kernel->boot(); - $this->assertFileNotExists($this->workerFile); - $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); + $this->assertWorkedJobs([]); } private function waitForConnectionAvailable(string $uri): Promise @@ -122,4 +168,30 @@ private function waitForConnectionAvailable(string $uri): Promise $connection->close(); }); } + + /** + * @param string[] $jobs + */ + private function assertWorkedJobs(array $jobs): void + { + if (count($jobs) === 0) { + $this->assertFileNotExists($this->workerFile); + } else { + $workerFileLines = $this->getFileLines($this->workerFile); + $this->assertCount(count($jobs), $workerFileLines); + foreach ($jobs as $index => $jobData) { + $this->assertContains($jobData, $workerFileLines[$index]); + + $this->assertTrue($this->logHandler()->hasRecordThatPasses( + function (array $logEntry) use ($jobData): bool { + return $logEntry['message'] === 'Successfully produced a new Job' + && $logEntry['context']['payload_data'] === [$jobData]; + }, + Logger::INFO + )); + } + } + + $this->assertReadyJobsCountInTube(0, self::FLOW_CODE); + } }