From 3b0e730a9df08902cfc578742ceb0af867751e7b Mon Sep 17 00:00:00 2001 From: Vincent Amstoutz Date: Thu, 19 Dec 2024 15:02:49 +0100 Subject: [PATCH] feat(octane): add support for concurrent tasks with FrankenPHP --- src/Concerns/ProvidesConcurrencySupport.php | 9 + .../FrankenPhpHttpTaskDispatcher.php | 82 ++++++++ src/FrankenPhp/FrankenPhpTaskDispatcher.php | 74 ++++++++ src/FrankenPhp/TaskResult.php | 10 + tests/FrankenPhpHttpTaskDispatcherTest.php | 175 ++++++++++++++++++ tests/FrankenPhpTaskDispatcherTest.php | 120 ++++++++++++ 6 files changed, 470 insertions(+) create mode 100644 src/FrankenPhp/FrankenPhpHttpTaskDispatcher.php create mode 100644 src/FrankenPhp/FrankenPhpTaskDispatcher.php create mode 100644 src/FrankenPhp/TaskResult.php create mode 100644 tests/FrankenPhpHttpTaskDispatcherTest.php create mode 100644 tests/FrankenPhpTaskDispatcherTest.php diff --git a/src/Concerns/ProvidesConcurrencySupport.php b/src/Concerns/ProvidesConcurrencySupport.php index 134f601f6..f518031df 100644 --- a/src/Concerns/ProvidesConcurrencySupport.php +++ b/src/Concerns/ProvidesConcurrencySupport.php @@ -3,6 +3,9 @@ namespace Laravel\Octane\Concerns; use Laravel\Octane\Contracts\DispatchesTasks; +use Laravel\Octane\FrankenPhp\FrankenPhpHttpTaskDispatcher; +use Laravel\Octane\FrankenPhp\FrankenPhpTaskDispatcher; +use Laravel\Octane\FrankenPhp\ServerStateFile as FrankenPhpServerStateFile; use Laravel\Octane\SequentialTaskDispatcher; use Laravel\Octane\Swoole\ServerStateFile; use Laravel\Octane\Swoole\SwooleHttpTaskDispatcher; @@ -35,6 +38,12 @@ public function tasks() { return match (true) { app()->bound(DispatchesTasks::class) => app(DispatchesTasks::class), + app()->bound(FrankenPhpServerStateFile::class) => new FrankenPhpTaskDispatcher(), + class_exists(FrankenPhpServerStateFile::class) => new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher + ), app()->bound(Server::class) => new SwooleTaskDispatcher, class_exists(Server::class) => (fn (array $serverState) => new SwooleHttpTaskDispatcher( $serverState['state']['host'] ?? '127.0.0.1', diff --git a/src/FrankenPhp/FrankenPhpHttpTaskDispatcher.php b/src/FrankenPhp/FrankenPhpHttpTaskDispatcher.php new file mode 100644 index 000000000..8fe610bd7 --- /dev/null +++ b/src/FrankenPhp/FrankenPhpHttpTaskDispatcher.php @@ -0,0 +1,82 @@ +mapWithKeys(function ($task, $key) { + return [ + $key => $task instanceof Closure + ? new SerializableClosure($task) + : $task, + ]; + })->all(); + + try { + $response = Http::timeout(($waitMilliseconds / 1000) + 5)->post("http://{$this->host}:{$this->port}/octane/resolve-tasks", [ + 'tasks' => Crypt::encryptString(serialize($tasks)), + 'wait' => $waitMilliseconds, + ]); + + return match ($response->status()) { + 200 => unserialize($response), + 504 => throw TaskTimeoutException::after($waitMilliseconds), + default => throw TaskExceptionResult::from( + new Exception('Invalid response from task server.'), + )->getOriginal(), + }; + } catch (ConnectionException) { + return $this->fallbackDispatcher->resolve($tasks, $waitMilliseconds); + } + } + + /** + * Concurrently dispatch the given callbacks via background tasks. + */ + public function dispatch(array $tasks): void + { + $tasks = collect($tasks)->mapWithKeys(function ($task, $key) { + return [ + $key => $task instanceof Closure + ? new SerializableClosure($task) + : $task, + ]; + })->all(); + + try { + Http::post("http://{$this->host}:{$this->port}/octane/dispatch-tasks", [ + 'tasks' => Crypt::encryptString(serialize($tasks)), + ]); + } catch (ConnectionException) { + $this->fallbackDispatcher->dispatch($tasks); + } + } +} diff --git a/src/FrankenPhp/FrankenPhpTaskDispatcher.php b/src/FrankenPhp/FrankenPhpTaskDispatcher.php new file mode 100644 index 000000000..5df5f59ae --- /dev/null +++ b/src/FrankenPhp/FrankenPhpTaskDispatcher.php @@ -0,0 +1,74 @@ +bound(ServerStateFile::class)) { + throw new InvalidArgumentException('Tasks can only be dispatched within a FrankenPHP server context.'); + } + + $results = app(ServerStateFile::class)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) { + return [$key => $task instanceof Closure + ? new SerializableClosure($task) + : $task, ]; + })->all(), $waitMilliseconds / 1000); + + if ($results === false) { + throw TaskTimeoutException::after($waitMilliseconds); + } + + $i = 0; + + foreach ($tasks as $key => $task) { + if (isset($results[$i])) { + if ($results[$i] instanceof TaskExceptionResult) { + throw $results[$i]->getOriginal(); + } + + $tasks[$key] = $results[$i]->result; + } else { + $tasks[$key] = false; + } + + $i++; + } + + return $tasks; + } + + /** + * Concurrently dispatch the given callbacks via background tasks. + */ + public function dispatch(array $tasks): void + { + if (! app()->bound(ServerStateFile::class)) { + throw new InvalidArgumentException('Tasks can only be dispatched within a FrankenPHP server context.'); + } + + $server = app(ServerStateFile::class); + + collect($tasks)->each(function ($task) use ($server) { + $server->task($task instanceof Closure ? new SerializableClosure($task) : $task); + }); + } +} diff --git a/src/FrankenPhp/TaskResult.php b/src/FrankenPhp/TaskResult.php new file mode 100644 index 000000000..21a08c3e0 --- /dev/null +++ b/src/FrankenPhp/TaskResult.php @@ -0,0 +1,10 @@ + Http::response(serialize(['first' => 1, 'second' => 2, 'third' => null])), + ]); + + $this->assertEquals([ + 'first' => 1, + 'second' => 2, + 'third' => null, + ], $dispatcher->resolve([ + 'first' => fn () => 1, + 'second' => fn () => 2, + 'third' => function () { + }, + ])); + } + + /** @doesNotPerformAssertions @test */ + public function test_tasks_can_be_dispatched_via_http(): void + { + $dispatcher = new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher, + ); + + Http::fake([ + '127.0.0.1:8000/octane/dispatch-tasks' => Http::response(serialize(['first' => 1, 'second' => 2])), + ]); + + $dispatcher->dispatch([ + 'first' => fn () => 1, + 'second' => fn () => 2, + ]); + } + + public function test_tasks_can_be_resolved_via_fallback_dispatcher(): void + { + $dispatcher = new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher, + ); + + $this->assertEquals([ + 'first' => 1, + 'second' => 2, + ], $dispatcher->resolve([ + 'first' => fn () => 1, + 'second' => fn () => 2, + ])); + } + + /** @doesNotPerformAssertions @test */ + public function test_tasks_can_be_dispatched_via_fallback_dispatcher(): void + { + $dispatcher = new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher, + ); + + $dispatcher->dispatch([ + 'first' => fn () => 1, + 'second' => fn () => 2, + ]); + } + + /** + * @throws TaskTimeoutException + */ + public function test_resolving_tasks_propagate_exceptions(): void + { + $dispatcher = new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher, + ); + + Http::fake([ + '127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 500), + ]); + + $this->expectException(TaskException::class); + $this->expectExceptionMessage('Invalid response from task server.'); + + $dispatcher->resolve(['first' => fn () => throw new Exception('Something went wrong.')]); + } + + /** + * @throws TaskTimeoutException + */ + public function test_resolving_tasks_propagate_dd_calls(): void + { + $dispatcher = new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher, + ); + + Http::fake([ + '127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 500), + ]); + + $this->expectException(TaskException::class); + $this->expectExceptionMessage('Invalid response from task server.'); + + $dispatcher->resolve(['first' => fn () => throw new DdException(['foo' => 'bar'])]); + } + + /** @doesNotPerformAssertions @test */ + public function test_dispatching_tasks_do_not_propagate_exceptions(): void + { + $dispatcher = new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher, + ); + + Http::fake([ + '127.0.0.1:8000/octane/dispatch-tasks' => Http::response(null, 500), + ]); + + $dispatcher->dispatch(['first' => fn () => throw new Exception('Something went wrong.')]); + } + + public function test_resolving_tasks_may_timeout(): void + { + $dispatcher = new FrankenPhpHttpTaskDispatcher( + '127.0.0.1', + '8000', + new SequentialTaskDispatcher, + ); + + Http::fake([ + '127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 504), + ]); + + $this->expectException(TaskTimeoutException::class); + $this->expectExceptionMessage('Task timed out after 2000 milliseconds.'); + + $dispatcher->resolve(['first' => fn () => 1], 2000); + } + + protected function getPackageProviders($app): array + { + return ['Laravel\Octane\OctaneServiceProvider']; + } +} diff --git a/tests/FrankenPhpTaskDispatcherTest.php b/tests/FrankenPhpTaskDispatcherTest.php new file mode 100644 index 000000000..319ab6aee --- /dev/null +++ b/tests/FrankenPhpTaskDispatcherTest.php @@ -0,0 +1,120 @@ +expectException(InvalidArgumentException::class); + + $dispatcher->resolve(['first' => fn () => 1]); + } + + public function test_tasks_can_only_be_dispatched_via_server_context(): void + { + $dispatcher = new FrankenPhpTaskDispatcher(); + + $this->expectException(InvalidArgumentException::class); + + $dispatcher->dispatch(['first' => fn () => 1]); + } + + public function test_resolving_tasks_may_timeout(): void + { + $dispatcher = new FrankenPhpTaskDispatcher(); + + $this->instance(ServerStateFile::class, Mockery::mock(ServerStateFile::class, function ($mock) { + $mock->shouldReceive('taskWaitMulti') + ->with(\Mockery::any(), 2) + ->once() + ->andReturn(false); + })); + + $this->expectException(TaskTimeoutException::class); + $this->expectExceptionMessage('Task timed out after 2000 milliseconds.'); + + $dispatcher->resolve(['first' => fn () => 1], 2000); + } + + public function test_resolving_tasks_propagate_exceptions(): void + { + $dispatcher = new FrankenPhpTaskDispatcher(); + + $this->instance(ServerStateFile::class, Mockery::mock(ServerStateFile::class, function ($mock) { + $mock->shouldReceive('taskWaitMulti') + ->once() + ->andReturn([TaskExceptionResult::from(new Exception('Something went wrong'))]); + })); + + $this->expectException(TaskException::class); + $this->expectExceptionMessage('Something went wrong'); + + $dispatcher->resolve(['first' => fn () => 1]); + } + + public function test_resolving_tasks_propagate_dd_calls(): void + { + $dispatcher = new FrankenPhpTaskDispatcher(); + + $this->instance(ServerStateFile::class, Mockery::mock(ServerStateFile::class, function ($mock) { + $mock->shouldReceive('taskWaitMulti') + ->once() + ->andReturn([TaskExceptionResult::from( + new DdException(['foo' => 'bar']) + )]); + })); + + $this->expectException(DdException::class); + $this->expectExceptionMessage(json_encode(['foo' => 'bar'])); + + $dispatcher->resolve([ + 'first' => fn () => throw new DdException(['foo' => 'bar']), + ]); + } + + public function test_dispatching_tasks_do_not_propagate_exceptions(): void + { + $dispatcher = new FrankenPhpTaskDispatcher(); + + $this->instance(ServerStateFile::class, Mockery::mock(ServerStateFile::class, function ($mock) { + $mock->shouldReceive('task') + ->once(); + })); + + $dispatcher->dispatch(['first' => fn () => throw new Exception('Something went wrong.')]); + } + + public function test_tasks_can_be_resolved(): void + { + $dispatcher = new FrankenPhpTaskDispatcher(); + + $this->instance(ServerStateFile::class, Mockery::mock(ServerStateFile::class, function ($mock) { + $mock->shouldReceive('taskWaitMulti') + ->once() + ->andReturn([new TaskResult(1), new TaskResult(2)]); + })); + + $this->assertEquals([ + 'first' => 1, + 'second' => 2, + ], $dispatcher->resolve([ + 'first' => fn () => 1, + 'second' => fn () => 2, + ])); + } +}