From dfa15eb01e2b1beda774a9afd4f6d785d00d44b5 Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Wed, 1 Mar 2023 12:30:06 -0300 Subject: [PATCH 01/12] Add symfony/console as suggested package --- composer.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 37b935f..1abd786 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,8 @@ } ], "suggest": { - "ext-parallel": "Allows to run multi-threaded processes" + "ext-parallel": "Allows to run multi-threaded processes", + "symfony/console": "Allows usage of a shared ProgressBar between the Workers" }, "require": { "php": ">=8.0" @@ -25,7 +26,8 @@ }, "require-dev": { "phpunit/phpunit": "^9.6", - "roave/security-advisories": "dev-latest" + "roave/security-advisories": "dev-latest", + "symfony/console": "^6.0" }, "autoload-dev": { "psr-4": { From 619ccbcd8cf528064af5fc2924556c6aab7f544a Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Wed, 1 Mar 2023 14:19:47 -0300 Subject: [PATCH 02/12] Updated README --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 3afa5e8..f16b2c5 100644 --- a/README.md +++ b/README.md @@ -115,3 +115,12 @@ Scheduler::disconnect(); ### References 1. [Parallel\Runtime::run() Task Characteristics](https://www.php.net/manual/en/parallel-runtime.run.php#refsect1-parallel-runtime.run-closure-characteristics) + +# Security Vulnerabilities +If you encounter any security related issue, feel free to raise a ticket on the issue traker. + +# Contributors +- [Hermann D. Schimpf](https://hds-solutions.net) + +# Licence +GPL-3.0 Please see [License File](LICENSE) for more information. From 3899d109ec0453610bac7c828b8a91041ead13fa Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Thu, 2 Mar 2023 15:45:05 -0300 Subject: [PATCH 03/12] Minor changes, big internal changes * Renamed Scheduler::with() method to using() * Renamed ParallelWorker::processTask() to process() * Redesigned internal data manipulation * Create worker instance inside thread to allow passing data to constructor --- README.md | 13 +-- src/Parallel/Contracts/ParallelWorker.php | 55 ++++++++++ src/Parallel/Internals/PendingTask.php | 30 ++++++ src/Parallel/Internals/RegisteredWorker.php | 22 ++++ src/Parallel/Internals/Worker.php | 15 +++ src/Parallel/ParallelWorker.php | 50 ++++++--- src/Parallel/PendingTask.php | 30 ------ src/Parallel/ProcessedTask.php | 14 +-- src/Parallel/Scheduler.php | 107 ++++++++++---------- src/Parallel/Worker.php | 18 ---- tests/ParallelTest.php | 60 ++++++++--- tests/{ => Workers}/AnotherWorker.php | 4 +- tests/{ => Workers}/TestWorker.php | 4 +- 13 files changed, 277 insertions(+), 145 deletions(-) create mode 100644 src/Parallel/Contracts/ParallelWorker.php create mode 100644 src/Parallel/Internals/PendingTask.php create mode 100644 src/Parallel/Internals/RegisteredWorker.php create mode 100644 src/Parallel/Internals/Worker.php delete mode 100644 src/Parallel/PendingTask.php delete mode 100644 src/Parallel/Worker.php rename tests/{ => Workers}/AnotherWorker.php (77%) rename tests/{ => Workers}/TestWorker.php (77%) diff --git a/README.md b/README.md index f16b2c5..eaed211 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ composer require hds-solutions/parallel-sdk ## Usage You need to define a `Worker` that will process the tasks. There are two options: 1. Using an anonymous function as a `Worker`. -2. Creating a class that extends from `ParallelWorker` and implements the `processTask()` method. +2. Creating a class that extends from `ParallelWorker` and implements the `process()` method. Then you can schedule tasks to run in parallel using `Scheduler::runTask()` method. @@ -31,7 +31,7 @@ Defining an anonymous function as a `Worker` to process the tasks. ```php use HDSSolutions\Console\Parallel\Scheduler; -Scheduler::with(static function(int $number): int { +Scheduler::using(static function(int $number): int { // here you do some work with the received data // this portion of code will run on a separated thread @@ -55,7 +55,7 @@ use HDSSolutions\Console\Parallel\ParallelWorker; final class ExampleWorker extends ParallelWorker { - protected function processTask(int $number = 0): int { + protected function process(int $number = 0): int { // example process $microseconds = random_int(100, 500); echo sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); @@ -72,7 +72,7 @@ final class ExampleWorker extends ParallelWorker { use HDSSolutions\Console\Parallel\Scheduler; $worker = new ExampleWorker(); -Scheduler::with($worker); +Scheduler::using($worker); ``` ### Schedule tasks @@ -93,13 +93,14 @@ foreach (range(1, 100) as $task) { ``` ### Get processed tasks result + ```php use HDSSolutions\Console\Parallel\Scheduler; use HDSSolutions\Console\Parallel\ProcessedTask; foreach (Scheduler::getProcessedTasks() as $processed_task) { - // you have access to Worker that processed the task - $worker = $processed_task->getWorker(); + // you have access to the Worker class that was used to processed the task + $worker = $processed_task->getWorkerClass(); // and the result of the task processed $result = $processed_task->getResult(); } diff --git a/src/Parallel/Contracts/ParallelWorker.php b/src/Parallel/Contracts/ParallelWorker.php new file mode 100644 index 0000000..f9184b5 --- /dev/null +++ b/src/Parallel/Contracts/ParallelWorker.php @@ -0,0 +1,55 @@ +registered_worker; + } + + /** + * @return mixed Data of the Task + */ + public function getData(): mixed { + return $this->data; + } + +} diff --git a/src/Parallel/Internals/RegisteredWorker.php b/src/Parallel/Internals/RegisteredWorker.php new file mode 100644 index 0000000..8196a1b --- /dev/null +++ b/src/Parallel/Internals/RegisteredWorker.php @@ -0,0 +1,22 @@ +worker_class; + } + + public function getClosure(): ?Closure { + return $this->closure; + } + +} diff --git a/src/Parallel/Internals/Worker.php b/src/Parallel/Internals/Worker.php new file mode 100644 index 0000000..9972215 --- /dev/null +++ b/src/Parallel/Internals/Worker.php @@ -0,0 +1,15 @@ +processTask( ...func_get_args() ); + final public function __construct() {} + + final public function getState(): int { + return $this->state; } - final public function onTaskFinished(Closure $callback): self { - // register callback - $this->taskFinishedCallback = $callback; + final public function start(...$args): void { + if ($this->state !== self::STATE_New) { + throw new RuntimeException('This Worker has been already started'); + } - return $this; + $this->state = self::STATE_Running; + $this->result = $this->process(...$args); + $this->state = self::STATE_Finished; } - final public function dispatchTaskFinished(...$result): void { - // pass task result to callback - ($this->taskFinishedCallback)(...$result); + /** + * Processes task data and returns the result + * + * @return mixed Task processing result + */ + abstract protected function process(): mixed; + + final public function getProcessedTask(): ProcessedTask { + if ($this->state !== self::STATE_Finished) { + throw new RuntimeException('This Worker hasn\'t been started'); + } + + return new ProcessedTask(get_class($this), $this->result); } } diff --git a/src/Parallel/PendingTask.php b/src/Parallel/PendingTask.php deleted file mode 100644 index 427e308..0000000 --- a/src/Parallel/PendingTask.php +++ /dev/null @@ -1,30 +0,0 @@ -worker_id; - } - - /** - * @return mixed Data of the Task - */ - public function getData(): mixed { - return $this->data; - } - -} diff --git a/src/Parallel/ProcessedTask.php b/src/Parallel/ProcessedTask.php index 3198665..ee5865a 100644 --- a/src/Parallel/ProcessedTask.php +++ b/src/Parallel/ProcessedTask.php @@ -5,23 +5,23 @@ final class ProcessedTask { /** - * @param ParallelWorker $worker Worker that processed the task - * @param mixed $result Result of the task + * @param string $worker_class Worker class used to process the Task + * @param mixed $result Result of the Task */ public function __construct( - private ParallelWorker $worker, + private string $worker_class, private mixed $result, ) {} /** - * @return ParallelWorker + * @return string Worker class that processed the Task */ - public function getWorker(): ParallelWorker { - return $this->worker; + public function getWorkerClass(): string { + return $this->worker_class; } /** - * @return mixed + * @return mixed Result of the processed Task */ public function getResult(): mixed { return $this->result; diff --git a/src/Parallel/Scheduler.php b/src/Parallel/Scheduler.php index 7bedc7c..11958b6 100644 --- a/src/Parallel/Scheduler.php +++ b/src/Parallel/Scheduler.php @@ -5,6 +5,9 @@ use Closure; use Exception; use Generator; +use HDSSolutions\Console\Parallel\Internals\PendingTask; +use HDSSolutions\Console\Parallel\Internals\RegisteredWorker; +use HDSSolutions\Console\Parallel\Internals\Worker; use parallel\Channel; use parallel\Future; use parallel\Runtime; @@ -17,8 +20,8 @@ final class Scheduler { /** @var Scheduler Singleton instance */ private static self $instance; - /** @var ParallelWorker[] Registered workers */ - private array $workers; + /** @var RegisteredWorker[] Registered workers */ + private array $registered_workers = []; /** @var ?Channel Channel to wait threads start */ private ?Channel $starter = null; @@ -47,17 +50,20 @@ private static function instance(): self { } /** - * Register the worker to process tasks + * Register a worker class to process tasks * - * @param Closure|ParallelWorker $worker Worker to process tasks + * @param string | Closure $worker Worker class to be used for processing tasks * - * @return ParallelWorker + * @return RegisteredWorker */ - public static function with(Closure | ParallelWorker $worker): ParallelWorker { + public static function using(string | Closure $worker): RegisteredWorker { // convert Closure to ParallelWorker instance - self::instance()->workers[] = $worker instanceof ParallelWorker ? $worker : new Worker($worker); + self::instance()->registered_workers[] = $registered_worker = new RegisteredWorker( + worker_class: is_string($worker) ? $worker : Worker::class, + closure: $worker instanceof Closure ? $worker : null, + ); - return $worker; + return $registered_worker; } /** @@ -76,13 +82,15 @@ public static function with(Closure | ParallelWorker $worker): ParallelWorker { */ public static function runTask(mixed ...$data): void { // check if a worker was defined - if (($worker_id = count(self::instance()->workers) - 1) < 0) { + if (($worker_id = count(self::instance()->registered_workers) - 1) < 0) { // reject task scheduling, no worker is defined throw new RuntimeException('No worker is defined'); } - // save data to pending tasks - self::instance()->pendingTasks[] = new PendingTask($worker_id, $data); + // get registered worker + $registered_worker = self::instance()->registered_workers[$worker_id]; + // register a pending task linked with the registered Worker + self::instance()->pendingTasks[] = new PendingTask($registered_worker, $data); do { // remove finished tasks @@ -143,8 +151,8 @@ public static function stop(bool $force = true): void { // wait for all tasks to finish while ( !empty(array_filter(self::instance()->futures, static fn(Future $future): bool => !$future->done()))) usleep(10_000); // close channels - self::instance()->__starter?->close(); - self::instance()->__starter = null; + self::instance()->starter?->close(); + self::instance()->starter = null; } /** @@ -164,45 +172,48 @@ private function runNextTask(): void { if ( !empty($this->pendingTasks) && $this->hasCpuAvailable()) { // get next available pending task $pending_task = array_shift($this->pendingTasks); - // get worker ID from pending task - $worker_id = $pending_task->getWorkerId(); // create starter channel to wait threads start event $this->starter ??= extension_loaded('parallel') ? Channel::make('starter') : null; // process task inside a thread (if parallel extension is available) - $this->futures[] = ( !extension_loaded('parallel') - // normal execution (non-threaded) - ? [ $worker_id, $this->workers[$worker_id](...$pending_task->getData()) ] - + if (extension_loaded('parallel')) { // parallel available, process task inside a thread - : run(static function(...$data): array { - /** @var int $worker_id */ - $worker_id = array_shift($data); - /** @var ParallelWorker $worker */ - $worker = array_shift($data); - + $this->futures[] = run(static function(PendingTask $pending_task): ProcessedTask { // notify that thread started Channel::open('starter')->send(true); - // process task using specified worker - $result = $worker(...$data); - - // execute finished event - try { $worker->dispatchTaskFinished($result); - } catch (Exception) {} - - // return worker ID and result - return [ $worker_id, $result ]; + // get Worker class to instantiate + $worker_class = $pending_task->getRegisteredWorker()->getWorkerClass(); + /** @var ParallelWorker $worker Instance of the Worker */ + $worker = new $worker_class(); + // build task params + $params = $worker instanceof Worker + // process task using local Worker + ? [ $pending_task->getRegisteredWorker()->getClosure(), ...$pending_task->getData() ] + // process task using user Worker + : [ ...$pending_task->getData() ]; + // process task + $worker->start(...$params); + + // return Worker result + return $worker->getProcessedTask(); }, [ - // send worker ID for returning value - $worker_id, - // worker to process task - $this->workers[$worker_id], - // task data to pass to the worker - ...$pending_task->getData(), - ]) - ); + // send pending task to process + $pending_task, + ]); + + } else { + // get Worker class to instantiate + $worker_class = $pending_task->getRegisteredWorker()->getWorkerClass(); + /** @var ParallelWorker $worker Instance of the Worker */ + $worker = new $worker_class(); + // process task using worker + $worker->start(...$pending_task->getData()); + + // store Worker result + $this->futures[] = $worker->getProcessedTask(); + } // wait for thread to start $this->starter?->recv(); @@ -225,16 +236,10 @@ private function cleanFinishedTasks(): void { foreach ($this->futures as $idx => $future) { // check if future is already done working if ( !extension_loaded('parallel') || $future->done()) { - try { - // get result to release thread - $result = extension_loaded('parallel') ? $future->value() : $future; - // get worker identifier - $worker_id = array_shift($result); - // get process result - $result = array_shift($result); - // store Task result - $this->results[] = new ProcessedTask($this->workers[$worker_id], $result); + // store the ProcessedTask + try { $this->results[] = extension_loaded('parallel') ? $future->value() : $future; } catch (Throwable) {} + // add future idx to finished tasks list $finished_tasks[] = $idx; } diff --git a/src/Parallel/Worker.php b/src/Parallel/Worker.php deleted file mode 100644 index fbcfe18..0000000 --- a/src/Parallel/Worker.php +++ /dev/null @@ -1,18 +0,0 @@ -processor)($broadcast, ...$data); - } - -} diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index 5c920eb..fde0558 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -2,8 +2,11 @@ namespace HDSSolutions\Console\Tests; +use HDSSolutions\Console\Parallel\Internals\Worker; use HDSSolutions\Console\Parallel\Scheduler; +use HDSSolutions\Console\Tests\Workers; use PHPUnit\Framework\TestCase; +use RuntimeException; use Throwable; use function parallel\bootstrap; @@ -20,27 +23,56 @@ public function testThatParallelExtensionIsAvailable(): void { } } + /** + * @depends testThatParallelExtensionIsAvailable + */ + public function testThatWorkerMustBeDefinedValidates(): void { + $this->expectException(RuntimeException::class); + Scheduler::runTask(123); + } + + /** + * @depends testThatParallelExtensionIsAvailable + */ + public function testThatClosureCanBeUsedAsWorker(): void { + Scheduler::using(static function($input) { + usleep(random_int(100, 500) * 1000); + return $input * 2; + }); + + foreach ($tasks = range(1, 10) as $task) { + try { Scheduler::runTask($task); + } catch (Throwable) { + Scheduler::stop(); + } + } + + $results = []; + foreach (Scheduler::getProcessedTasks() as $processed_task) { + $this->assertEquals(Worker::class, $processed_task->getWorkerClass()); + $results[] = $processed_task->getResult(); + } + // tasks results must be the same count + $this->assertCount(count($tasks), $results); + } + /** * @depends testThatParallelExtensionIsAvailable */ public function testParallel(): void { $workers = [ - new TestWorker(), - new AnotherWorker(), + Workers\TestWorker::class, + Workers\AnotherWorker::class, ]; $tasks = []; foreach ($workers as $idx => $worker) { // register worker - Scheduler::with($worker) - // register task finished callback - ->onTaskFinished(static function($task_no) { - echo sprintf("%s finished\n", $task_no); - }); + Scheduler::using($worker); // build example "tasks" - $tasks[get_class($worker)] = range(($idx + 1) * 100, ($idx + 1) * 100 + 25); + $tasks[$worker] = range(($idx + 1) * 100, ($idx + 1) * 100 + 25); // run example tasks - foreach ($tasks[get_class($worker)] as $task) { + foreach ($tasks[$worker] as $task) { try { Scheduler::runTask($task); } catch (Throwable) { Scheduler::stop(); @@ -50,10 +82,10 @@ public function testParallel(): void { $results = []; // fetch processed tasks and store their results - foreach (Scheduler::getProcessedTasks() as $task_result) { + foreach (Scheduler::getProcessedTasks() as $processed_task) { echo sprintf("Task result from #%s => %u\n", - $worker_class = get_class($task_result->getWorker()), - $result = $task_result->getResult()); + $worker_class = $processed_task->getWorkerClass(), + $result = $processed_task->getResult()); $results[$worker_class][] = $result; } @@ -62,9 +94,9 @@ public function testParallel(): void { // check results foreach ($workers as $worker) { // get original tasks - $worker_tasks = $tasks[get_class($worker)]; + $worker_tasks = $tasks[$worker]; // get tasks results - $worker_results = $results[get_class($worker)]; + $worker_results = $results[$worker]; // tasks results must be the same count $this->assertCount(count($worker_tasks), $worker_results); diff --git a/tests/AnotherWorker.php b/tests/Workers/AnotherWorker.php similarity index 77% rename from tests/AnotherWorker.php rename to tests/Workers/AnotherWorker.php index 3ada416..82e9925 100644 --- a/tests/AnotherWorker.php +++ b/tests/Workers/AnotherWorker.php @@ -1,12 +1,12 @@ > Hello from task #%u, I'll wait %sms\n", $number, $microseconds); usleep($microseconds * 1000); diff --git a/tests/TestWorker.php b/tests/Workers/TestWorker.php similarity index 77% rename from tests/TestWorker.php rename to tests/Workers/TestWorker.php index 74842cf..12372be 100644 --- a/tests/TestWorker.php +++ b/tests/Workers/TestWorker.php @@ -1,12 +1,12 @@ > Hello from task #%u, I'll wait %sms\n", $number, $microseconds); usleep($microseconds * 1000); From d3078846af3e7fe490b6320c088c710f60850365 Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Thu, 2 Mar 2023 15:54:23 -0300 Subject: [PATCH 04/12] Process start and end time --- src/Parallel/Contracts/ParallelWorker.php | 10 +++++++++ src/Parallel/ParallelWorker.php | 26 ++++++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/Parallel/Contracts/ParallelWorker.php b/src/Parallel/Contracts/ParallelWorker.php index f9184b5..c394b6b 100644 --- a/src/Parallel/Contracts/ParallelWorker.php +++ b/src/Parallel/Contracts/ParallelWorker.php @@ -45,6 +45,16 @@ public function getState(): int; */ public function start(...$args): void; + /** + * @return ?float Time when Worker started processing the Task, null if Worker didn't start yet + */ + public function getStartedAt(): ?float; + + /** + * @return ?float Time when Worker finished processing the Task, null if Worker didn't finish yet + */ + public function getFinishedAt(): ?float; + /** * Returns the processed task * diff --git a/src/Parallel/ParallelWorker.php b/src/Parallel/ParallelWorker.php index 99f780f..36b369f 100644 --- a/src/Parallel/ParallelWorker.php +++ b/src/Parallel/ParallelWorker.php @@ -3,6 +3,7 @@ namespace HDSSolutions\Console\Parallel; use RuntimeException; +use Throwable; abstract class ParallelWorker implements Contracts\ParallelWorker { @@ -12,6 +13,16 @@ abstract class ParallelWorker implements Contracts\ParallelWorker { */ private int $state = self::STATE_New; + /** + * @var float Time when process started + */ + private float $started_at; + + /** + * @var float Time when process finished + */ + private float $finished_at; + /** * @var mixed Worker execution result */ @@ -29,7 +40,12 @@ final public function start(...$args): void { } $this->state = self::STATE_Running; - $this->result = $this->process(...$args); + $this->started_at = microtime(true); + + try { $this->result = $this->process(...$args); + } catch (Throwable) {} + + $this->finished_at = microtime(true); $this->state = self::STATE_Finished; } @@ -40,6 +56,14 @@ final public function start(...$args): void { */ abstract protected function process(): mixed; + final public function getStartedAt(): ?float { + return $this->started_at ?? null; + } + + final public function getFinishedAt(): ?float { + return $this->finished_at ?? null; + } + final public function getProcessedTask(): ProcessedTask { if ($this->state !== self::STATE_Finished) { throw new RuntimeException('This Worker hasn\'t been started'); From c61959ada496de5516af677642152bc4415dab9d Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Thu, 2 Mar 2023 16:08:09 -0300 Subject: [PATCH 05/12] We are just using composer's default autoloader * Also, update README to tell about bootstrapping parallel --- README.md | 14 ++++++++++++-- phpunit.xml | 2 +- tests/ParallelTest.php | 4 ++-- tests/config/bootstrap.php | 3 --- 4 files changed, 15 insertions(+), 8 deletions(-) delete mode 100644 tests/config/bootstrap.php diff --git a/README.md b/README.md index eaed211..c227638 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,15 @@ composer require hds-solutions/parallel-sdk ``` ## Usage +Firstly, you need to set the bootstrap file for parallel. Setting the composer's autoloader is enough. See reference [#1](#references) for more info. +```php +// check if extension is loaded to allow deploying even in envorinments where parallel isn't installed +if (extension_loaded('parallel')) { + // set the path to composer's autoloader + parallel\bootstrap(__DIR__.'/vendor/autoload.php'); +} +``` + You need to define a `Worker` that will process the tasks. There are two options: 1. Using an anonymous function as a `Worker`. 2. Creating a class that extends from `ParallelWorker` and implements the `process()` method. @@ -87,7 +96,7 @@ foreach (range(1, 100) as $task) { } catch (Throwable) { // if no Worker was defined, a RuntimeException will be thrown - // also, Workers have some limitations, see Reference #1 for more info + // also, Workers have some limitations, see Reference #2 for more info } } ``` @@ -115,7 +124,8 @@ Scheduler::disconnect(); ``` ### References -1. [Parallel\Runtime::run() Task Characteristics](https://www.php.net/manual/en/parallel-runtime.run.php#refsect1-parallel-runtime.run-closure-characteristics) +1. [parallel\bootstrap()](https://www.php.net/manual/en/parallel.bootstrap.php) +2. [Parallel\Runtime::run() Task Characteristics](https://www.php.net/manual/en/parallel-runtime.run.php#refsect1-parallel-runtime.run-closure-characteristics) # Security Vulnerabilities If you encounter any security related issue, feel free to raise a ticket on the issue traker. diff --git a/phpunit.xml b/phpunit.xml index 0b4896f..31aace4 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,7 +1,7 @@ diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index fde0558..e105c5a 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -18,8 +18,8 @@ public function testThatParallelExtensionIsAvailable(): void { // check if extension is available if ($loaded) { - // set parallel bootstrap file - bootstrap(__DIR__.'/config/bootstrap.php'); + // set bootstrap file + bootstrap(__DIR__.'/../vendor/autoload.php'); } } diff --git a/tests/config/bootstrap.php b/tests/config/bootstrap.php deleted file mode 100644 index 5b98d42..0000000 --- a/tests/config/bootstrap.php +++ /dev/null @@ -1,3 +0,0 @@ - Date: Fri, 3 Mar 2023 16:42:59 -0300 Subject: [PATCH 06/12] Allow sending constructor arguments --- src/Parallel/Internals/RegisteredWorker.php | 5 +++++ src/Parallel/ParallelWorker.php | 2 -- src/Parallel/Scheduler.php | 6 ++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Parallel/Internals/RegisteredWorker.php b/src/Parallel/Internals/RegisteredWorker.php index 8196a1b..42f260e 100644 --- a/src/Parallel/Internals/RegisteredWorker.php +++ b/src/Parallel/Internals/RegisteredWorker.php @@ -9,6 +9,7 @@ final class RegisteredWorker { public function __construct( private string $worker_class, private ?Closure $closure = null, + private array $args = [], ) {} public function getWorkerClass(): string { @@ -19,4 +20,8 @@ public function getClosure(): ?Closure { return $this->closure; } + public function getArgs(): array { + return $this->args; + } + } diff --git a/src/Parallel/ParallelWorker.php b/src/Parallel/ParallelWorker.php index 36b369f..8eb41ea 100644 --- a/src/Parallel/ParallelWorker.php +++ b/src/Parallel/ParallelWorker.php @@ -28,8 +28,6 @@ abstract class ParallelWorker implements Contracts\ParallelWorker { */ private mixed $result; - final public function __construct() {} - final public function getState(): int { return $this->state; } diff --git a/src/Parallel/Scheduler.php b/src/Parallel/Scheduler.php index 11958b6..6f4e7eb 100644 --- a/src/Parallel/Scheduler.php +++ b/src/Parallel/Scheduler.php @@ -53,14 +53,16 @@ private static function instance(): self { * Register a worker class to process tasks * * @param string | Closure $worker Worker class to be used for processing tasks + * @param mixed ...$args Arguments passed to Worker constructor * * @return RegisteredWorker */ - public static function using(string | Closure $worker): RegisteredWorker { + public static function using(string | Closure $worker, ...$args): RegisteredWorker { // convert Closure to ParallelWorker instance self::instance()->registered_workers[] = $registered_worker = new RegisteredWorker( worker_class: is_string($worker) ? $worker : Worker::class, closure: $worker instanceof Closure ? $worker : null, + args: $args, ); return $registered_worker; @@ -186,7 +188,7 @@ private function runNextTask(): void { // get Worker class to instantiate $worker_class = $pending_task->getRegisteredWorker()->getWorkerClass(); /** @var ParallelWorker $worker Instance of the Worker */ - $worker = new $worker_class(); + $worker = new $worker_class(...$pending_task->getRegisteredWorker()->getArgs()); // build task params $params = $worker instanceof Worker // process task using local Worker From 6ab1968590d9018ff95217ce6fce1a5081e7fae6 Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Fri, 3 Mar 2023 16:43:16 -0300 Subject: [PATCH 07/12] Updated tests to send constructor arguments --- tests/ParallelTest.php | 9 +++++++-- tests/Workers/AnotherWorker.php | 8 ++++++-- tests/Workers/TestWorker.php | 8 ++++++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index e105c5a..fa3e2d8 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -66,9 +66,11 @@ public function testParallel(): void { ]; $tasks = []; + $multipliers = [ 2, 4, 8 ]; + foreach ($workers as $idx => $worker) { // register worker - Scheduler::using($worker); + Scheduler::using($worker, $multipliers); // build example "tasks" $tasks[$worker] = range(($idx + 1) * 100, ($idx + 1) * 100 + 25); // run example tasks @@ -101,7 +103,10 @@ public function testParallel(): void { // tasks results must be the same count $this->assertCount(count($worker_tasks), $worker_results); // tasks results must be in different order - $this->assertNotEquals($worker_tasks, $worker_results, 'Arrays are in the same order'); + $this->assertNotEquals($worker_tasks, array_column($worker_results, 0), 'Arrays are in the same order'); + + $result = array_shift($worker_results); + $this->assertEquals($result[1], $result[0] * array_product($multipliers)); } } diff --git a/tests/Workers/AnotherWorker.php b/tests/Workers/AnotherWorker.php index 82e9925..80bea78 100644 --- a/tests/Workers/AnotherWorker.php +++ b/tests/Workers/AnotherWorker.php @@ -6,12 +6,16 @@ final class AnotherWorker extends ParallelWorker { - protected function process(int $number = 0): int { + public function __construct( + private array $multipliers, + ) {} + + protected function process(int $number = 0): array { $microseconds = random_int(100, 500); echo sprintf("AnotherWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); usleep($microseconds * 1000); - return $number; + return [ $number, $number * array_product($this->multipliers) ]; } } diff --git a/tests/Workers/TestWorker.php b/tests/Workers/TestWorker.php index 12372be..a0c929d 100644 --- a/tests/Workers/TestWorker.php +++ b/tests/Workers/TestWorker.php @@ -6,12 +6,16 @@ final class TestWorker extends ParallelWorker { - protected function process(int $number = 0): int { + public function __construct( + private array $multipliers, + ) {} + + protected function process(int $number = 0): array { $microseconds = random_int(100, 500); echo sprintf("TestWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); usleep($microseconds * 1000); - return $number; + return [ $number, $number * array_product($this->multipliers) ]; } } From f3c8aab86a776dbe625b9440d62e8ecf8327a27f Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Wed, 8 Mar 2023 20:19:09 -0300 Subject: [PATCH 08/12] Create an unique ID for the instance * Used for channel uniqueness --- src/Parallel/Scheduler.php | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/Parallel/Scheduler.php b/src/Parallel/Scheduler.php index 6f4e7eb..6ebeea2 100644 --- a/src/Parallel/Scheduler.php +++ b/src/Parallel/Scheduler.php @@ -20,6 +20,9 @@ final class Scheduler { /** @var Scheduler Singleton instance */ private static self $instance; + /** @var string Unique ID of the instance */ + private string $uuid; + /** @var RegisteredWorker[] Registered workers */ private array $registered_workers = []; @@ -37,10 +40,13 @@ final class Scheduler { /** @var ?int Max CPU usage count */ private ?int $max_cpu_count = null; + /** * Disable public constructor, usage only available through singleton instance */ - private function __construct() {} + private function __construct() { + $this->uuid = uniqid(self::class, true); + } /** * @return self Singleton instance @@ -176,14 +182,14 @@ private function runNextTask(): void { $pending_task = array_shift($this->pendingTasks); // create starter channel to wait threads start event - $this->starter ??= extension_loaded('parallel') ? Channel::make('starter') : null; + $this->starter ??= extension_loaded('parallel') ? Channel::make(sprintf('starter@%s', $this->uuid)) : null; // process task inside a thread (if parallel extension is available) if (extension_loaded('parallel')) { // parallel available, process task inside a thread - $this->futures[] = run(static function(PendingTask $pending_task): ProcessedTask { + $this->futures[] = run(static function(string $uuid, PendingTask $pending_task): ProcessedTask { // notify that thread started - Channel::open('starter')->send(true); + Channel::open(sprintf('starter@%s', $uuid))->send(true); // get Worker class to instantiate $worker_class = $pending_task->getRegisteredWorker()->getWorkerClass(); @@ -201,6 +207,8 @@ private function runNextTask(): void { // return Worker result return $worker->getProcessedTask(); }, [ + // send UUID for starter channel + $this->uuid, // send pending task to process $pending_task, ]); From ad7414860e6b278cf44bb8f9cb5c981befc01438 Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Tue, 14 Mar 2023 10:33:05 -0300 Subject: [PATCH 09/12] Added a shared ProgressBar to output a progress on Worker tasks --- src/Parallel/Contracts/ParallelWorker.php | 36 ++++ .../Messages/ProgressBarActionMessage.php | 16 ++ .../ProgressBarRegistrationMessage.php | 16 ++ .../Internals/Messages/StatsReportMessage.php | 12 ++ src/Parallel/Internals/ProgressBarWorker.php | 147 ++++++++++++++++ src/Parallel/Internals/RegisteredWorker.php | 27 +++ src/Parallel/ParallelWorker.php | 53 ++++++ src/Parallel/Scheduler.php | 162 +++++++++++++++--- 8 files changed, 446 insertions(+), 23 deletions(-) create mode 100644 src/Parallel/Internals/Messages/ProgressBarActionMessage.php create mode 100644 src/Parallel/Internals/Messages/ProgressBarRegistrationMessage.php create mode 100644 src/Parallel/Internals/Messages/StatsReportMessage.php create mode 100644 src/Parallel/Internals/ProgressBarWorker.php diff --git a/src/Parallel/Contracts/ParallelWorker.php b/src/Parallel/Contracts/ParallelWorker.php index c394b6b..69cc8ca 100644 --- a/src/Parallel/Contracts/ParallelWorker.php +++ b/src/Parallel/Contracts/ParallelWorker.php @@ -45,6 +45,42 @@ public function getState(): int; */ public function start(...$args): void; + /** + * Associates a text with a named placeholder. + * + * @param string $message The text to associate with the placeholder + * @param string $name The name of the placeholder + */ + public function setMessage(string $message, string $name = 'message'): void; + + /** + * Advances the progress output X steps. + * + * @param int $steps Number of steps to advance + */ + public function advance(int $steps = 1): void; + + /** + * Moves the progress output to a specific step. + * + * @param int $step Step to move progress to + */ + public function setProgress(int $step): void; + + /** + * Outputs the current progress string. + */ + public function display(): void; + + /** + * Removes the progress bar from the current line. + * + * This is useful if you wish to write some output + * while a progress bar is running. + * Call display() to show the progress bar again. + */ + public function clear(): void; + /** * @return ?float Time when Worker started processing the Task, null if Worker didn't start yet */ diff --git a/src/Parallel/Internals/Messages/ProgressBarActionMessage.php b/src/Parallel/Internals/Messages/ProgressBarActionMessage.php new file mode 100644 index 0000000..cc4ebf5 --- /dev/null +++ b/src/Parallel/Internals/Messages/ProgressBarActionMessage.php @@ -0,0 +1,16 @@ +uuid)); + // notify successful start + $progresBarChannel->send(true); + + // create ProgressBar instance + $this->progressBar = $createProgressBarInstance(); + + // threads memory usage and peak + $this->threads_memory = [ + 'current' => [ $main_memory_usage() ], + 'peak' => [ memory_get_usage() ], + ]; + + // get next message + while (Type::Close !== $message = $progresBarChannel->recv()) { + // check for close event and exit loop + if ($message === Type::Close) break; + + switch ($message_class = get_class($message)) { + case ProgressBarRegistrationMessage::class: + $this->registerWorker($message->steps); + break; + + case StatsReportMessage::class: + // update memory usage for this thread + $this->threads_memory['current'][0] = $main_memory_usage(); + // update peak memory usage + if ($this->threads_memory['current'][0] > $this->threads_memory['peak'][0]) { + $this->threads_memory['peak'][0] = $this->threads_memory['current'][0]; + } + + // save memory usage of thread + $this->threads_memory['current'][$message->worker_id] = $message->memory_usage; + // update peak memory usage + if ($this->threads_memory['current'][$message->worker_id] > ($this->threads_memory['peak'][$message->worker_id] ?? 0)) { + $this->threads_memory['peak'][$message->worker_id] = $this->threads_memory['current'][$message->worker_id]; + } + + // update ProgressBar memory report + $this->progressBar->setMessage($this->getMemoryUsage(), 'threads_memory'); + break; + + case ProgressBarActionMessage::class: + // redirect action to ProgressBar instance + $this->progressBar->{$message->action}(...$message->args); + if ($message->action === 'advance') { + // count processed item + $this->items[ time() ] = ($this->items[ time() ] ?? 0) + 1; + // update ProgressBar items per second report + $this->progressBar->setMessage($this->getItemsPerSecond(), 'items_per_second'); + } + break; + + default: + throw new RuntimeException(sprintf('Unsupported message type: %s', $message_class)); + } + + } + + // end progress bar + $this->progressBar->finish(); + + return true; + } + + private function registerWorker(int $steps = 0): void { + // check if ProgressBar isn't already started + if ( !$this->progressBarStarted) { + // start Worker ProgressBar + $this->progressBar->start($steps); + } else { + // update steps + $this->progressBar->setMaxSteps($steps); + } + } + + private function getMemoryUsage(): string { + // main memory used + $main = Helper::formatMemory($this->threads_memory['current'][0]); + // total memory used (sum of all threads) + $total = Helper::formatMemory($total_raw = array_sum($this->threads_memory['current'])); + // average of each thread + $average = Helper::formatMemory((int) ($total_raw / (($count = count($this->threads_memory['current']) - 1) > 0 ? $count : 1))); + // peak memory usage + $peak = Helper::formatMemory(array_sum($this->threads_memory['peak'])); + + return "$main, threads: {$count}x ~$average, Σ $total ↑ $peak"; + } + + private function getItemsPerSecond(): string { + // check for empty list + if (empty($this->items)) return '0'; + + // keep only last 15s for average + $this->items = array_slice($this->items, -15, preserve_keys: true); + + // return the average of items processed per second + return '~'.number_format(floor(array_sum($this->items) / count($this->items) * 100) / 100, 2); + } + +} diff --git a/src/Parallel/Internals/RegisteredWorker.php b/src/Parallel/Internals/RegisteredWorker.php index 42f260e..72a2d3f 100644 --- a/src/Parallel/Internals/RegisteredWorker.php +++ b/src/Parallel/Internals/RegisteredWorker.php @@ -3,15 +3,42 @@ namespace HDSSolutions\Console\Parallel\Internals; use Closure; +use HDSSolutions\Console\Parallel\Scheduler; final class RegisteredWorker { + /** + * @var bool Flag to identify if this Worker has a ProgressBar + */ + private bool $with_progress = false; + public function __construct( + private int $identifier, private string $worker_class, private ?Closure $closure = null, private array $args = [], ) {} + public function getIdentifier(): int { + return $this->identifier; + } + + /** + * Enables a ProgressBar for the worker + * + * @param bool $with_progress Flag to enable/disable the ProgressBar + */ + public function withProgress(bool $with_progress = true, int $steps = 0): void { + if (false === $this->with_progress = $with_progress) return; + + // enable ProgressBar thread + Scheduler::registerWorkerWithProgressBar($this, $steps); + } + + public function hasProgressEnabled(): bool { + return $this->with_progress; + } + public function getWorkerClass(): string { return $this->worker_class; } diff --git a/src/Parallel/ParallelWorker.php b/src/Parallel/ParallelWorker.php index 8eb41ea..f27fbe1 100644 --- a/src/Parallel/ParallelWorker.php +++ b/src/Parallel/ParallelWorker.php @@ -2,6 +2,9 @@ namespace HDSSolutions\Console\Parallel; +use HDSSolutions\Console\Parallel\Internals\Messages\ProgressBarActionMessage; +use HDSSolutions\Console\Parallel\Internals\Messages\StatsReportMessage; +use parallel\Channel; use RuntimeException; use Throwable; @@ -13,6 +16,16 @@ abstract class ParallelWorker implements Contracts\ParallelWorker { */ private int $state = self::STATE_New; + /** + * @var string Worker Identifier + */ + private string $identifier; + + /** + * @var Channel|null Channel of communication between Task and ProgressBar + */ + private ?Channel $progressBarChannel = null; + /** * @var float Time when process started */ @@ -32,6 +45,15 @@ final public function getState(): int { return $this->state; } + final public function connectProgressBar(string $uuid, string $identifier): bool { + // store worker identifier + $this->identifier = $identifier; + // connect to channel + $this->progressBarChannel = Channel::open(sprintf('progress-bar@%s', $uuid)); + + return true; + } + final public function start(...$args): void { if ($this->state !== self::STATE_New) { throw new RuntimeException('This Worker has been already started'); @@ -54,6 +76,26 @@ final public function start(...$args): void { */ abstract protected function process(): mixed; + final public function setMessage(string $message, string $name = 'message'): void { + $this->newProgressBarAction(__FUNCTION__, $message, $name); + } + + final public function advance(int $steps = 1): void { + $this->newProgressBarAction(__FUNCTION__, $steps); + } + + final public function setProgress(int $step): void { + $this->newProgressBarAction(__FUNCTION__, $step); + } + + final public function display(): void { + $this->newProgressBarAction(__FUNCTION__); + } + + final public function clear(): void { + $this->newProgressBarAction(__FUNCTION__); + } + final public function getStartedAt(): ?float { return $this->started_at ?? null; } @@ -70,4 +112,15 @@ final public function getProcessedTask(): ProcessedTask { return new ProcessedTask(get_class($this), $this->result); } + private function newProgressBarAction(string $action, ...$args): void { + $this->progressBarChannel->send(new StatsReportMessage( + worker_id: $this->identifier, + memory_usage: memory_get_usage(), + )); + $this->progressBarChannel->send(new ProgressBarActionMessage( + action: $action, + args: $args, + )); + } + } diff --git a/src/Parallel/Scheduler.php b/src/Parallel/Scheduler.php index 6ebeea2..6368f01 100644 --- a/src/Parallel/Scheduler.php +++ b/src/Parallel/Scheduler.php @@ -5,13 +5,18 @@ use Closure; use Exception; use Generator; +use HDSSolutions\Console\Parallel\Internals\Messages\ProgressBarRegistrationMessage; use HDSSolutions\Console\Parallel\Internals\PendingTask; +use HDSSolutions\Console\Parallel\Internals\ProgressBarWorker; use HDSSolutions\Console\Parallel\Internals\RegisteredWorker; use HDSSolutions\Console\Parallel\Internals\Worker; use parallel\Channel; +use parallel\Events\Event\Type; use parallel\Future; use parallel\Runtime; use RuntimeException; +use Symfony\Component\Console\Helper\ProgressBar; +use Symfony\Component\Console\Output\ConsoleOutput; use Throwable; use function parallel\run; @@ -41,11 +46,26 @@ final class Scheduler { /** @var ?int Max CPU usage count */ private ?int $max_cpu_count = null; + /** + * @var ProgressBar|null ProgressBar instance for non-threaded Tasks execution + */ + private ?ProgressBar $progressBar = null; + + /** + * @var Future|null Thread controlling the ProgressBar + */ + private ?Future $progressBarThread = null; + + /** + * @var Channel|null Channel of communication between ProgressBar and Tasks + */ + private ?Channel $progressBarChannel = null; + /** * Disable public constructor, usage only available through singleton instance */ private function __construct() { - $this->uuid = uniqid(self::class, true); + $this->uuid = substr(md5(uniqid(self::class, true)), 0, 16); } /** @@ -55,6 +75,73 @@ private static function instance(): self { return self::$instance ??= new self(); } + public static function registerWorkerWithProgressBar(RegisteredWorker $registered_worker, int $steps = 0): void { + self::instance()->initProgressBar(); + + // register Worker ProgressBar + self::instance()->progressBarChannel?->send(new ProgressBarRegistrationMessage( + worker: $registered_worker->getWorkerClass(), + steps: $steps, + )); + } + + private function initProgressBar(): void { + // init ProgressBar only if not already working + if ($this->progressBar !== null || $this->progressBarThread !== null) return; + + // start a normal ProgressBar if parallel isn't available (non-threaded) + if ( !extension_loaded('parallel')) { + // TODO non-threaded + $this->progressBar = $this->createProgressBarInstance(); + return; + } + + // create a channel of communication between ProgressBar and Tasks + $this->progressBarChannel = Channel::make(sprintf('progress-bar@%s', $this->uuid)); + + // main thread memory reporter + // FIXME this closure is copied and runs inside a thread, so memory report isn't accurate + $main_memory_usage = static fn() => memory_get_usage(); + + // decouple progress bar to a separated thread + $this->progressBarThread = run(static function(string $uuid, Closure $createProgressBarInstance, Closure $main_memory_usage): void { + // create ProgressBar worker instance + $progressBarWorker = new ProgressBarWorker($uuid); + // start ProgressBar + $progressBarWorker->start($createProgressBarInstance, $main_memory_usage); + + }, [ + // send UUID for starter channel + $this->uuid, + // send ProgressBar creator + fn() => $this->createProgressBarInstance(), + // send main memory usage reporter + $main_memory_usage, + ]); + + // wait for ProgressBar thread to start + if ($this->progressBarChannel->recv() !== true) { + throw new RuntimeException('Failed to start ProgressBar'); + } + } + + private function createProgressBarInstance(): ProgressBar { + $progressBar = new ProgressBar(new ConsoleOutput()); + + // set initial parameters + $progressBar->setBarWidth( 80 ); + $progressBar->setRedrawFrequency( 100 ); + $progressBar->minSecondsBetweenRedraws( 0.1 ); + $progressBar->maxSecondsBetweenRedraws( 0.2 ); + $progressBar->setFormat(" %current% of %max%: %message%\n". + " [%bar%] %percent:3s%%\n". + " elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s\n". + " memory: %threads_memory%\n"); + $progressBar->setMessage('Starting...'); + + return $progressBar; + } + /** * Register a worker class to process tasks * @@ -66,6 +153,7 @@ private static function instance(): self { public static function using(string | Closure $worker, ...$args): RegisteredWorker { // convert Closure to ParallelWorker instance self::instance()->registered_workers[] = $registered_worker = new RegisteredWorker( + identifier: count(self::instance()->registered_workers), worker_class: is_string($worker) ? $worker : Worker::class, closure: $worker instanceof Closure ? $worker : null, args: $args, @@ -158,21 +246,6 @@ public static function stop(bool $force = true): void { // wait for all tasks to finish while ( !empty(array_filter(self::instance()->futures, static fn(Future $future): bool => !$future->done()))) usleep(10_000); - // close channels - self::instance()->starter?->close(); - self::instance()->starter = null; - } - - /** - * Ensures that everything gets closed - */ - public static function disconnect(): void { - // check if extension is loaded - if ( !extension_loaded('parallel')) return; - // kill all running threads - while ($task = array_shift(self::instance()->futures)) try { $task->cancel(); } catch (Exception) {} - // task start watcher - try { self::instance()->starter?->close(); } catch (Channel\Error\Closed) {} } private function runNextTask(): void { @@ -188,19 +261,29 @@ private function runNextTask(): void { if (extension_loaded('parallel')) { // parallel available, process task inside a thread $this->futures[] = run(static function(string $uuid, PendingTask $pending_task): ProcessedTask { - // notify that thread started - Channel::open(sprintf('starter@%s', $uuid))->send(true); - + // get registered worker + $registered_worker = $pending_task->getRegisteredWorker(); // get Worker class to instantiate - $worker_class = $pending_task->getRegisteredWorker()->getWorkerClass(); + $worker_class = $registered_worker->getWorkerClass(); + /** @var ParallelWorker $worker Instance of the Worker */ - $worker = new $worker_class(...$pending_task->getRegisteredWorker()->getArgs()); + $worker = new $worker_class(...$registered_worker->getArgs()); // build task params $params = $worker instanceof Worker // process task using local Worker - ? [ $pending_task->getRegisteredWorker()->getClosure(), ...$pending_task->getData() ] + ? [ $registered_worker->getClosure(), ...$pending_task->getData() ] // process task using user Worker : [ ...$pending_task->getData() ]; + + // check if worker has ProgressBar enabled + if ($registered_worker->hasProgressEnabled()) { + // connect worker to ProgressBar + $worker->connectProgressBar($uuid, $GLOBALS['worker_thread_id'] ??= sprintf('%s@%s', $uuid, substr(md5(uniqid($worker_class, true)), 0, 16))); + } + + // notify that thread started + Channel::open(sprintf('starter@%s', $uuid))->send(true); + // process task $worker->start(...$params); @@ -226,7 +309,9 @@ private function runNextTask(): void { } // wait for thread to start - $this->starter?->recv(); + if ($this->starter?->recv() !== true) { + throw new RuntimeException('Failed to start Task'); + } } } @@ -259,6 +344,37 @@ private function cleanFinishedTasks(): void { foreach ($finished_tasks as $idx) unset($this->futures[$idx]); } + /** + * Ensures that everything gets closed + */ + public static function disconnect(): void { + // check if extension is loaded + if ( !extension_loaded('parallel')) return; + + try { + // send message to ProgressBar thread to stop execution + self::instance()->progressBarChannel?->send(Type::Close); + // wait progress thread to finish + self::instance()->progressBarThread?->value(); + // close ProgressBar communication channel + self::instance()->progressBarChannel?->close(); + + self::instance()->progressBarChannel = null; + self::instance()->progressBarThread = null; + + } catch (Channel\Error\Closed | Throwable) {} + + // kill all running threads + while ($task = array_shift(self::instance()->futures)) try { $task->cancel(); } catch (Exception) {} + + try { + // task start watcher + self::instance()->starter?->close(); + self::instance()->starter = null; + + } catch (Channel\Error\Closed) {} + } + public function __destruct() { // ensure that we execute disconnect self::disconnect(); From 70ff2af680c55625ab85cf0582132a7b47b4492f Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Tue, 14 Mar 2023 10:36:05 -0300 Subject: [PATCH 10/12] Tests updated with ProgressBar --- tests/ParallelTest.php | 21 +++++++++++++++------ tests/Workers/AnotherWorker.php | 8 ++++++-- tests/Workers/TestWorker.php | 8 ++++++-- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index fa3e2d8..3091ef3 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -36,7 +36,7 @@ public function testThatWorkerMustBeDefinedValidates(): void { */ public function testThatClosureCanBeUsedAsWorker(): void { Scheduler::using(static function($input) { - usleep(random_int(100, 500) * 1000); + usleep(random_int(100, 500) * 1000); return $input * 2; }); @@ -64,15 +64,23 @@ public function testParallel(): void { Workers\TestWorker::class, Workers\AnotherWorker::class, ]; - $tasks = []; $multipliers = [ 2, 4, 8 ]; + // build example "tasks" + $tasks = []; + $total = 0; foreach ($workers as $idx => $worker) { - // register worker - Scheduler::using($worker, $multipliers); - // build example "tasks" $tasks[$worker] = range(($idx + 1) * 100, ($idx + 1) * 100 + 25); + $total += count($tasks[$worker]); + } + + foreach ($workers as $worker) { + // register worker + Scheduler::using($worker, $multipliers) + // init progress bar for worker + ->withProgress(steps: $total); + // run example tasks foreach ($tasks[$worker] as $task) { try { Scheduler::runTask($task); @@ -85,9 +93,10 @@ public function testParallel(): void { $results = []; // fetch processed tasks and store their results foreach (Scheduler::getProcessedTasks() as $processed_task) { + $result = $processed_task->getResult(); echo sprintf("Task result from #%s => %u\n", $worker_class = $processed_task->getWorkerClass(), - $result = $processed_task->getResult()); + $result[1]); $results[$worker_class][] = $result; } diff --git a/tests/Workers/AnotherWorker.php b/tests/Workers/AnotherWorker.php index 80bea78..54e8b2a 100644 --- a/tests/Workers/AnotherWorker.php +++ b/tests/Workers/AnotherWorker.php @@ -11,10 +11,14 @@ public function __construct( ) {} protected function process(int $number = 0): array { - $microseconds = random_int(100, 500); - echo sprintf("AnotherWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); + $microseconds = random_int(100, 1000); + $this->setMessage(sprintf("AnotherWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); + usleep($microseconds * 1000); + $this->setMessage(sprintf('Hey! I finished waiting %sms from task #%u!', $microseconds, $number)); + $this->advance(); + return [ $number, $number * array_product($this->multipliers) ]; } diff --git a/tests/Workers/TestWorker.php b/tests/Workers/TestWorker.php index a0c929d..655149a 100644 --- a/tests/Workers/TestWorker.php +++ b/tests/Workers/TestWorker.php @@ -11,10 +11,14 @@ public function __construct( ) {} protected function process(int $number = 0): array { - $microseconds = random_int(100, 500); - echo sprintf("TestWorker >> Hello from task #%u, I'll wait %sms\n", $number, $microseconds); + $microseconds = random_int(100, 1000); + $this->setMessage(sprintf("TestWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); + usleep($microseconds * 1000); + $this->setMessage(sprintf('Hey! I finished waiting %sms from task #%u!', $microseconds, $number)); + $this->advance(); + return [ $number, $number * array_product($this->multipliers) ]; } From 32aa66c85ae0da3ce5da52d26dc9888270a80f40 Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Tue, 14 Mar 2023 11:13:30 -0300 Subject: [PATCH 11/12] Non-threaded ProgressBar --- src/Parallel/Internals/ProgressBarWorker.php | 2 + src/Parallel/ParallelWorker.php | 39 +++++-- src/Parallel/Scheduler.php | 102 +++++++++++++++++-- 3 files changed, 125 insertions(+), 18 deletions(-) diff --git a/src/Parallel/Internals/ProgressBarWorker.php b/src/Parallel/Internals/ProgressBarWorker.php index 4506eae..440bdf7 100644 --- a/src/Parallel/Internals/ProgressBarWorker.php +++ b/src/Parallel/Internals/ProgressBarWorker.php @@ -114,6 +114,8 @@ private function registerWorker(int $steps = 0): void { if ( !$this->progressBarStarted) { // start Worker ProgressBar $this->progressBar->start($steps); + $this->progressBarStarted = true; + } else { // update steps $this->progressBar->setMaxSteps($steps); diff --git a/src/Parallel/ParallelWorker.php b/src/Parallel/ParallelWorker.php index f27fbe1..272b022 100644 --- a/src/Parallel/ParallelWorker.php +++ b/src/Parallel/ParallelWorker.php @@ -2,6 +2,7 @@ namespace HDSSolutions\Console\Parallel; +use Closure; use HDSSolutions\Console\Parallel\Internals\Messages\ProgressBarActionMessage; use HDSSolutions\Console\Parallel\Internals\Messages\StatsReportMessage; use parallel\Channel; @@ -22,9 +23,9 @@ abstract class ParallelWorker implements Contracts\ParallelWorker { private string $identifier; /** - * @var Channel|null Channel of communication between Task and ProgressBar + * @var Channel|Closure|null Channel of communication between Task and ProgressBar */ - private ?Channel $progressBarChannel = null; + private Channel | Closure | null $progressBarChannel = null; /** * @var float Time when process started @@ -45,7 +46,13 @@ final public function getState(): int { return $this->state; } - final public function connectProgressBar(string $uuid, string $identifier): bool { + final public function connectProgressBar(string | Closure $uuid, string $identifier = null): bool { + if ( !extension_loaded('parallel')) { + $this->progressBarChannel = $uuid; + + return true; + } + // store worker identifier $this->identifier = $identifier; // connect to channel @@ -113,14 +120,24 @@ final public function getProcessedTask(): ProcessedTask { } private function newProgressBarAction(string $action, ...$args): void { - $this->progressBarChannel->send(new StatsReportMessage( - worker_id: $this->identifier, - memory_usage: memory_get_usage(), - )); - $this->progressBarChannel->send(new ProgressBarActionMessage( - action: $action, - args: $args, - )); + // check if parallel is available + if (extension_loaded('parallel')) { + // report memory usage + $this->progressBarChannel->send(new StatsReportMessage( + worker_id: $this->identifier, + memory_usage: memory_get_usage(), + )); + // request ProgressBar action + $this->progressBarChannel->send(new ProgressBarActionMessage( + action: $action, + args: $args, + )); + + return; + } + + // redirect action to ProgressBar executor + ($this->progressBarChannel)($action, $args); } } diff --git a/src/Parallel/Scheduler.php b/src/Parallel/Scheduler.php index 6368f01..38a9b21 100644 --- a/src/Parallel/Scheduler.php +++ b/src/Parallel/Scheduler.php @@ -15,6 +15,7 @@ use parallel\Future; use parallel\Runtime; use RuntimeException; +use Symfony\Component\Console\Helper\Helper; use Symfony\Component\Console\Helper\ProgressBar; use Symfony\Component\Console\Output\ConsoleOutput; use Throwable; @@ -51,6 +52,21 @@ final class Scheduler { */ private ?ProgressBar $progressBar = null; + /** + * @var bool Flag to identify if ProgressBar is already started (non-threaded) + */ + private bool $progressBarStarted = false; + + /** + * @var array Memory usage stats (non-threaded) + */ + private array $memory_stats = [ 'current' => 0, 'peak' => 0 ]; + + /** + * @var array Total of items processed per second (non-threaded) + */ + private array $items = []; + /** * @var Future|null Thread controlling the ProgressBar */ @@ -78,6 +94,19 @@ private static function instance(): self { public static function registerWorkerWithProgressBar(RegisteredWorker $registered_worker, int $steps = 0): void { self::instance()->initProgressBar(); + if ( !extension_loaded('parallel')) { + // check if ProgressBar isn't already started + if ( !self::instance()->progressBarStarted) { + // start ProgressBar + self::instance()->progressBar->start($steps); + self::instance()->progressBarStarted = true; + + } else { + // update steps + self::instance()->progressBar->setMaxSteps($steps); + } + } + // register Worker ProgressBar self::instance()->progressBarChannel?->send(new ProgressBarRegistrationMessage( worker: $registered_worker->getWorkerClass(), @@ -91,7 +120,7 @@ private function initProgressBar(): void { // start a normal ProgressBar if parallel isn't available (non-threaded) if ( !extension_loaded('parallel')) { - // TODO non-threaded + // create a non-threaded ProgressBar instance $this->progressBar = $this->createProgressBarInstance(); return; } @@ -128,16 +157,19 @@ private function initProgressBar(): void { private function createProgressBarInstance(): ProgressBar { $progressBar = new ProgressBar(new ConsoleOutput()); - // set initial parameters + // configure ProgressBar settings $progressBar->setBarWidth( 80 ); $progressBar->setRedrawFrequency( 100 ); $progressBar->minSecondsBetweenRedraws( 0.1 ); $progressBar->maxSecondsBetweenRedraws( 0.2 ); $progressBar->setFormat(" %current% of %max%: %message%\n". " [%bar%] %percent:3s%%\n". - " elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s\n". + " elapsed: %elapsed:6s%, remaining: %remaining:-6s%, %items_per_second% items/s".(extension_loaded('parallel') ? "\n" : ','). " memory: %threads_memory%\n"); + // set initial values $progressBar->setMessage('Starting...'); + $progressBar->setMessage('??', 'items_per_second'); + $progressBar->setMessage('??', 'threads_memory'); return $progressBar; } @@ -297,24 +329,80 @@ private function runNextTask(): void { ]); } else { + // get registered worker + $registered_worker = $pending_task->getRegisteredWorker(); // get Worker class to instantiate - $worker_class = $pending_task->getRegisteredWorker()->getWorkerClass(); + $worker_class = $registered_worker->getWorkerClass(); + /** @var ParallelWorker $worker Instance of the Worker */ - $worker = new $worker_class(); + $worker = new $worker_class(...$registered_worker->getArgs()); + // build task params + $params = $worker instanceof Worker + // process task using local Worker + ? [ $registered_worker->getClosure(), ...$pending_task->getData() ] + // process task using user Worker + : [ ...$pending_task->getData() ]; + + // check if worker has ProgressBar enabled + if ($registered_worker->hasProgressEnabled()) { + // connect worker to ProgressBar + $worker->connectProgressBar(function(string $action, array $args) { + // update stats + if ($action === 'advance') { + // count processed item + $this->items[ time() ] = ($this->items[ time() ] ?? 0) + 1; + } + // update ProgressBar memory usage report + $this->progressBar->setMessage($this->getMemoryUsage(), 'threads_memory'); + // update ProgressBar items per second report + $this->progressBar->setMessage($this->getItemsPerSecond(), 'items_per_second'); + + // execute progress bar action + $this->progressBar->$action(...$args); + }); + } + // process task using worker - $worker->start(...$pending_task->getData()); + $worker->start(...$params); // store Worker result $this->futures[] = $worker->getProcessedTask(); } // wait for thread to start - if ($this->starter?->recv() !== true) { + if (($this->starter?->recv() ?? true) !== true) { throw new RuntimeException('Failed to start Task'); } } } + private function getMemoryUsage(): string { + // update memory usage for this thread + $this->memory_stats['current'] = memory_get_usage(true); + // update peak memory usage + if ($this->memory_stats['current'] > $this->memory_stats['peak']) { + $this->memory_stats['peak'] = $this->memory_stats['current']; + } + + // current memory used + $main = Helper::formatMemory($this->memory_stats['current']); + // peak memory usage + $peak = Helper::formatMemory($this->memory_stats['peak']); + + return "$main, ↑ $peak"; + } + + private function getItemsPerSecond(): string { + // check for empty list + if (empty($this->items)) return '0'; + + // keep only last 15s for average + $this->items = array_slice($this->items, -15, preserve_keys: true); + + // return the average of items processed per second + return '~'.number_format(floor(array_sum($this->items) / count($this->items) * 100) / 100, 2); + } + private function getMaxCpuUsage(): int { // return configured max CPU usage return $this->max_cpu_count ??= (isset($_SERVER['PARALLEL_MAX_COUNT']) ? (int) $_SERVER['PARALLEL_MAX_COUNT'] : cpu_count( (float) ($_SERVER['PARALLEL_MAX_PERCENT'] ?? 1.0) )); From 53b04030e6b0248059d69ab9365ab04878644d7d Mon Sep 17 00:00:00 2001 From: "Hermann D. Schimpf" Date: Tue, 14 Mar 2023 11:30:04 -0300 Subject: [PATCH 12/12] Updated README with ProgressBar examples --- README.md | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/README.md b/README.md index c227638..a213e28 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,57 @@ foreach (Scheduler::getProcessedTasks() as $processed_task) { } ``` +### ProgressBar + +#### Requeriments +- `symfony/console` package +- Enable a ProgressBar for the worker calling the `withProgress()` method. + +```php +use HDSSolutions\Console\Parallel\Scheduler; + +$tasks = range(1, 10); + +$worker = new ExampleWorker(); +Scheduler::using($worker) + ->withProgress(steps: count($tasks); +``` + +#### Usage from Worker +Available methods are: +- `setMessage(string $message)` +- `advance(int $steps)` +- `setProgress(int $step)` +- `display()` +- `clear()` + +```php +use HDSSolutions\Console\Parallel\ParallelWorker; + +final class ExampleWorker extends ParallelWorker { + + protected function process(int $number = 0): int { + // example process + $microseconds = random_int(100, 500); + $this->setMessage(sprintf("ExampleWorker >> Hello from task #%u, I'll wait %sms", $number, $microseconds)); + usleep($microseconds * 1000); + $this->advance(); + // end example process + + return $number; + } + +} +``` + +#### Example output +```bash + 28 of 52: ExampleWorker >> Hello from task #123, I'll wait 604ms + [===========================================>------------------------------------] 53% + elapsed: 2 secs, remaining: 2 secs, ~13.50 items/s + memory: 562 KiB, threads: 12x ~474 KiB, Σ 5,6 MiB ↑ 5,6 MiB +``` + ## Graceful close all resources This method will close all resources used internally by the `Scheduler` instance. ```php