Skip to content

Commit

Permalink
Merge pull request #24
Browse files Browse the repository at this point in the history
Added a method to remove tasks from the queue
  • Loading branch information
hschimpf authored Mar 27, 2024
2 parents b1125d6 + e3965e1 commit 91a3854
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 18 deletions.
20 changes: 20 additions & 0 deletions src/Internals/Commands/Runner/RemoveTaskMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php declare(strict_types=1);

namespace HDSSolutions\Console\Parallel\Internals\Commands\Runner;

use HDSSolutions\Console\Parallel\Internals\Commands\ParallelCommandMessage;
use HDSSolutions\Console\Parallel\Internals\Runner;

/**
* Message sent to {@see Runner} to execute {@see Runner::removeTask()} action
*/
final class RemoveTaskMessage extends ParallelCommandMessage {

/**
* @param int $task_id
*/
public function __construct(int $task_id) {
parent::__construct('remove_task', [ $task_id ]);
}

}
34 changes: 31 additions & 3 deletions src/Internals/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,18 @@ protected function queueTask(array $data): int {
throw new RuntimeException('No worker is defined');
}

// get next task id
$task_id = $this->task_id++;

// register task
$this->tasks[] = $task = new Task(
identifier: count($this->tasks),
$this->tasks[$task_id] = $task = new Task(
identifier: $task_id,
worker_class: $worker->getWorkerClass(),
worker_id: $this->selected_worker,
input: $data,
);
// and put identifier on the pending tasks list
$this->pending_tasks[] = $task->getIdentifier();
$this->pending_tasks[$task_id] = $task->getIdentifier();

// if we are on a non-threaded environment,
if ( !PARALLEL_EXT_LOADED) {
Expand All @@ -119,6 +122,31 @@ protected function getTasks(): array | false {
return false;
}

protected function removeTask(int $task_id): bool {
// remove it from pending tasks
if (array_key_exists($task_id, $this->pending_tasks)) {
unset($this->pending_tasks[$task_id]);
}

// remove it from running tasks
if (array_key_exists($task_id, $this->running_tasks)) {
// stop the task if it is still running
try { $this->running_tasks[$task_id]->cancel();
} catch (Throwable) {}

unset($this->running_tasks[$task_id]);
}

// remove it from the task list
if (array_key_exists($task_id, $this->tasks)) {
unset($this->tasks[$task_id]);

return $this->send(true);
}

return $this->send(false);
}

protected function removeAllTasks(): bool {
$this->stopRunningTasks();

Expand Down
15 changes: 8 additions & 7 deletions src/Internals/Runner/ManagesTasks.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@

trait ManagesTasks {

/**
* @var Task[] Collection of tasks
*/
/** @var int Current Task ID */
private int $task_id = 0;

/** @var Task[] Collection of tasks */
private array $tasks = [];

/** @var ?Channel Channel to wait for tasks started event */
Expand All @@ -44,13 +45,13 @@ private function hasRunningTasks(): bool {

private function cleanFinishedTasks(): void {
$finished_tasks = [];
foreach ($this->running_tasks as $idx => $future) {
foreach ($this->running_tasks as $idx => $running_task) {
// check if future is already done working
if ( !PARALLEL_EXT_LOADED || $future->done()) {
if ( !PARALLEL_EXT_LOADED || $running_task->done()) {
// store the ProcessedTask
try {
// get the result of the process
[ $task_id, $result ] = PARALLEL_EXT_LOADED ? $future->value() : $future;
[ $task_id, $result ] = PARALLEL_EXT_LOADED ? $running_task->value() : $running_task;
// ignore result if Task was removed, probably through Scheduler::removeTasks()
if (!array_key_exists($task_id, $this->tasks)) continue;
// store result and update state of the Task
Expand Down Expand Up @@ -159,7 +160,7 @@ private function startNextPendingTask(): void {
$worker->start(...$params);

// store Worker result
$this->running_tasks[] = [ $task_id, $worker->getResult() ];
$this->running_tasks[$task_id] = [ $task_id, $worker->getResult() ];
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/RegisteredWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public function withProgress(bool $with_progress = true, int $steps = 0): void {
$this->with_progress = $with_progress;

// check if caller is Runner
$caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1];
$caller = debug_backtrace(!DEBUG_BACKTRACE_PROVIDE_OBJECT | DEBUG_BACKTRACE_IGNORE_ARGS, 2)[1] ?? null;
if (($caller['class'] ?? null) === Internals\Runner::class || !PARALLEL_EXT_LOADED) {
$this->steps = $steps;

Expand Down
23 changes: 23 additions & 0 deletions src/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ private function __construct() {
// create runner instance for non-threaded environment
: new Internals\Runner($this->uuid);

// wait a small amount of time to allow Runner to start
usleep(10_000);

// wait until Runner starts listening for events
if (PARALLEL_EXT_LOADED) $this->recv();
}
Expand Down Expand Up @@ -191,6 +194,26 @@ public static function getTasks(): Generator | array {
yield from self::instance()->runner->processMessage($message);
}

/**
* Remove a Task from the processing queue.<br/>
* **IMPORTANT**: The task will be stopped immediately if it is currently being processed.
*
* @param Task $task Task to be removed
*
* @return bool
*/
public static function removeTask(Task $task): bool {
$message = new Commands\Runner\RemoveTaskMessage($task->getIdentifier());

if (PARALLEL_EXT_LOADED) {
self::instance()->send($message);

return self::instance()->recv();
}

return self::instance()->runner->processMessage($message);
}

/**
* Remove all pending tasks from the processing queue.<br>
* Tasks that weren't processed will remain in the {@see Task::STATE_Pending} state
Expand Down
10 changes: 3 additions & 7 deletions tests/ParallelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use HDSSolutions\Console\Parallel\Internals\Worker;
use HDSSolutions\Console\Parallel\RegisteredWorker;
use HDSSolutions\Console\Parallel\Scheduler;
use HDSSolutions\Console\Tests\Workers;
use PHPUnit\Framework\TestCase;
use RuntimeException;
use Throwable;
Expand All @@ -15,13 +14,10 @@ final class ParallelTest extends TestCase {

public function testThatParallelExtensionIsAvailable(): void {
// check that ext-parallel is available
$this->assertTrue($loaded = extension_loaded('parallel'), 'Parallel extension isn\'t available');
$this->assertTrue(extension_loaded('parallel'), 'Parallel extension isn\'t available');

// check if extension is available
if ($loaded) {
// set bootstrap file
bootstrap(__DIR__.'/../vendor/autoload.php');
}
// set bootstrap file
bootstrap(__DIR__.'/../vendor/autoload.php');
}

public function testThatWorkerMustBeDefinedValidates(): void {
Expand Down

0 comments on commit 91a3854

Please sign in to comment.