diff --git a/src/Internals/Commands/Runner/RemoveTaskMessage.php b/src/Internals/Commands/Runner/RemoveTaskMessage.php new file mode 100644 index 0000000..12f5920 --- /dev/null +++ b/src/Internals/Commands/Runner/RemoveTaskMessage.php @@ -0,0 +1,20 @@ +task_id++; + // register task - $this->tasks[] = $task = new Task( - identifier: count($this->tasks), + $this->tasks[$task_id] = $task = new Task( + identifier: $task_id, worker_class: $worker->getWorkerClass(), worker_id: $this->selected_worker, input: $data, ); // and put identifier on the pending tasks list - $this->pending_tasks[] = $task->getIdentifier(); + $this->pending_tasks[$task_id] = $task->getIdentifier(); // if we are on a non-threaded environment, if ( !PARALLEL_EXT_LOADED) { @@ -119,6 +122,31 @@ protected function getTasks(): array | false { return false; } + protected function removeTask(int $task_id): bool { + // remove it from pending tasks + if (array_key_exists($task_id, $this->pending_tasks)) { + unset($this->pending_tasks[$task_id]); + } + + // remove it from running tasks + if (array_key_exists($task_id, $this->running_tasks)) { + // stop the task if it is still running + try { $this->running_tasks[$task_id]->cancel(); + } catch (Throwable) {} + + unset($this->running_tasks[$task_id]); + } + + // remove it from the task list + if (array_key_exists($task_id, $this->tasks)) { + unset($this->tasks[$task_id]); + + return $this->send(true); + } + + return $this->send(false); + } + protected function removeAllTasks(): bool { $this->stopRunningTasks(); diff --git a/src/Internals/Runner/ManagesTasks.php b/src/Internals/Runner/ManagesTasks.php index d749345..45e2552 100644 --- a/src/Internals/Runner/ManagesTasks.php +++ b/src/Internals/Runner/ManagesTasks.php @@ -15,9 +15,10 @@ trait ManagesTasks { - /** - * @var Task[] Collection of tasks - */ + /** @var int Current Task ID */ + private int $task_id = 0; + + /** @var Task[] Collection of tasks */ private array $tasks = []; /** @var ?Channel Channel to wait for tasks started event */ @@ -44,13 +45,13 @@ private function hasRunningTasks(): bool { private function cleanFinishedTasks(): void { $finished_tasks = []; - foreach ($this->running_tasks as $idx => $future) { + foreach ($this->running_tasks as $idx => $running_task) { // check if future is already done working - if ( !PARALLEL_EXT_LOADED || $future->done()) { + if ( !PARALLEL_EXT_LOADED || $running_task->done()) { // store the ProcessedTask try { // get the result of the process - [ $task_id, $result ] = PARALLEL_EXT_LOADED ? $future->value() : $future; + [ $task_id, $result ] = PARALLEL_EXT_LOADED ? $running_task->value() : $running_task; // ignore result if Task was removed, probably through Scheduler::removeTasks() if (!array_key_exists($task_id, $this->tasks)) continue; // store result and update state of the Task @@ -159,7 +160,7 @@ private function startNextPendingTask(): void { $worker->start(...$params); // store Worker result - $this->running_tasks[] = [ $task_id, $worker->getResult() ]; + $this->running_tasks[$task_id] = [ $task_id, $worker->getResult() ]; } } diff --git a/src/RegisteredWorker.php b/src/RegisteredWorker.php index 0575c41..cc43b45 100644 --- a/src/RegisteredWorker.php +++ b/src/RegisteredWorker.php @@ -41,7 +41,7 @@ public function withProgress(bool $with_progress = true, int $steps = 0): void { $this->with_progress = $with_progress; // check if caller is Runner - $caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1]; + $caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1] ?? null; if (($caller['class'] ?? null) === Internals\Runner::class || !PARALLEL_EXT_LOADED) { $this->steps = $steps; diff --git a/src/Scheduler.php b/src/Scheduler.php index c9a651f..62dca48 100644 --- a/src/Scheduler.php +++ b/src/Scheduler.php @@ -42,6 +42,9 @@ private function __construct() { // create runner instance for non-threaded environment : new Internals\Runner($this->uuid); + // wait a small amount of time to allow Runner to start + usleep(10_000); + // wait until Runner starts listening for events if (PARALLEL_EXT_LOADED) $this->recv(); } @@ -191,6 +194,26 @@ public static function getTasks(): Generator | array { yield from self::instance()->runner->processMessage($message); } + /** + * Remove a Task from the processing queue.
+ * **IMPORTANT**: The task will be stopped immediately if it is currently being processed. + * + * @param Task $task Task to be removed + * + * @return bool + */ + public static function removeTask(Task $task): bool { + $message = new Commands\Runner\RemoveTaskMessage($task->getIdentifier()); + + if (PARALLEL_EXT_LOADED) { + self::instance()->send($message); + + return self::instance()->recv(); + } + + return self::instance()->runner->processMessage($message); + } + /** * Remove all pending tasks from the processing queue.
* Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index ebb9e83..0da9674 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -5,7 +5,6 @@ use HDSSolutions\Console\Parallel\Internals\Worker; use HDSSolutions\Console\Parallel\RegisteredWorker; use HDSSolutions\Console\Parallel\Scheduler; -use HDSSolutions\Console\Tests\Workers; use PHPUnit\Framework\TestCase; use RuntimeException; use Throwable; @@ -15,13 +14,10 @@ final class ParallelTest extends TestCase { public function testThatParallelExtensionIsAvailable(): void { // check that ext-parallel is available - $this->assertTrue($loaded = extension_loaded('parallel'), 'Parallel extension isn\'t available'); + $this->assertTrue(extension_loaded('parallel'), 'Parallel extension isn\'t available'); - // check if extension is available - if ($loaded) { - // set bootstrap file - bootstrap(__DIR__.'/../vendor/autoload.php'); - } + // set bootstrap file + bootstrap(__DIR__.'/../vendor/autoload.php'); } public function testThatWorkerMustBeDefinedValidates(): void {