Skip to content

Commit

Permalink
Added "scheduler" command for show scheduled task list
Browse files Browse the repository at this point in the history
  • Loading branch information
luzrain committed Sep 29, 2024
1 parent 60333d2 commit eb3317c
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 7 deletions.
18 changes: 14 additions & 4 deletions src/Internal/Scheduler/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
use Luzrain\PHPStreamServer\Internal\Scheduler\Trigger\TriggerInterface;
use Luzrain\PHPStreamServer\Internal\SIGCHLDHandler;
use Luzrain\PHPStreamServer\Internal\Status;
use Luzrain\PHPStreamServer\MasterProcess;
use Luzrain\PHPStreamServer\Message\ProcessScheduledEvent;
use Luzrain\PHPStreamServer\PeriodicProcessInterface;
use Psr\Log\LoggerInterface;
use Revolt\EventLoop;
Expand All @@ -27,8 +29,10 @@ final class Scheduler
private Suspension $suspension;
private DeferredFuture|null $stopFuture = null;

public function __construct(private Status &$status)
{
public function __construct(
private readonly MasterProcess $masterProcess,
private Status &$status,
) {
$this->pool = new WorkerPool();
}

Expand Down Expand Up @@ -59,7 +63,7 @@ public function start(Suspension $suspension, LoggerInterface $logger): void

private function scheduleWorker(PeriodicProcessInterface $worker, TriggerInterface $trigger): bool
{
if ($this->status !== Status::RUNNING) {
if ($this->status === Status::SHUTDOWN) {
return false;
}

Expand All @@ -68,9 +72,15 @@ private function scheduleWorker(PeriodicProcessInterface $worker, TriggerInterfa

if ($nextRunDate !== null) {
$delay = $nextRunDate->getTimestamp() - $currentDate->getTimestamp();
EventLoop::delay($delay, fn() => $this->startWorker($worker, $trigger));
EventLoop::delay($delay, function () use($worker, $trigger): void {
$this->startWorker($worker, $trigger);
});
}

EventLoop::defer(function () use ($worker, $nextRunDate): void {
$this->masterProcess->dispatch(new ProcessScheduledEvent($worker->getId(), $nextRunDate));
});

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Internal/Supervisor/Supervisor.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ final class Supervisor

public function __construct(
private readonly MasterProcess $masterProcess,
private readonly int $stopTimeout,
private Status &$status,
private readonly int $stopTimeout,
) {
$this->workerPool = new WorkerPool();
}
Expand Down
60 changes: 60 additions & 0 deletions src/Internal/SystemPlugin/Command/SchedulerCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

declare(strict_types=1);

namespace Luzrain\PHPStreamServer\Internal\SystemPlugin\Command;

use Luzrain\PHPStreamServer\Internal\Console\Command;
use Luzrain\PHPStreamServer\Internal\Console\Options;
use Luzrain\PHPStreamServer\Internal\Console\Table;
use Luzrain\PHPStreamServer\Internal\Scheduler\Trigger\TriggerFactory;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\ServerStatus\PeriodicWorkerInfo;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\ServerStatus\ServerStatus;

/**
* @internal
*/
final class SchedulerCommand extends Command
{
protected const COMMAND = 'scheduler';
protected const DESCRIPTION = 'Show scheduler map';

public function execute(Options $options): int
{
echo "❯ Scheduler\n";

if(!$this->masterProcess->isRunning()) {
echo " <color;bg=yellow> ! </> <color;fg=yellow>Server is not running</>\n";

return 0;
}

$status = $this->masterProcess->get(ServerStatus::class);
\assert($status instanceof ServerStatus);

if ($status->getPeriodicTasksCount() > 0) {
echo (new Table(indent: 1))
->setHeaderRow([
'User',
'Worker',
'Schedule',
'Next run',
'Status',
])
->addRows(\array_map(array: $status->getPeriodicWorkers(), callback: static fn (PeriodicWorkerInfo $w) => [
$w->user === 'root' ? $w->user : "<color;fg=gray>{$w->user}</>",
$w->name,
$w->schedule ?: '-',
$w->nextRunDate?->format('Y-m-d H:i:s') ?? '<color;fg=gray>-</>',
match(true) {
$w->nextRunDate !== null => '[<color;fg=green>OK</>]',
default => '[<color;fg=red>ERROR</>]',
},
]));
} else {
echo " <color;bg=yellow> ! </> <color;fg=yellow>There are no scheduled tasks</>\n";
}

return 0;
}
}
2 changes: 2 additions & 0 deletions src/Internal/SystemPlugin/ServerStatus/PeriodicWorkerInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ final class PeriodicWorkerInfo
public function __construct(
public string $user,
public string $name,
public string $schedule,
public \DateTimeInterface|null $nextRunDate = null,
) {
}
}
6 changes: 6 additions & 0 deletions src/Internal/SystemPlugin/ServerStatus/ServerStatus.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Luzrain\PHPStreamServer\Message\ProcessDetachedEvent;
use Luzrain\PHPStreamServer\Message\ProcessExitEvent;
use Luzrain\PHPStreamServer\Message\ProcessHeartbeatEvent;
use Luzrain\PHPStreamServer\Message\ProcessScheduledEvent;
use Luzrain\PHPStreamServer\Message\ProcessSpawnedEvent;
use Luzrain\PHPStreamServer\Message\RequestCounterIncreaseEvent;
use Luzrain\PHPStreamServer\Message\RxCounterIncreaseEvent;
Expand Down Expand Up @@ -129,6 +130,10 @@ public function subscribeToWorkerMessages(MessageHandler $handler): void
$handler->subscribe(ConnectionClosedEvent::class, weakClosure(function (ConnectionClosedEvent $message): void {
unset($this->processes[$message->pid]->connections[$message->connectionId]);
}));

$handler->subscribe(ProcessScheduledEvent::class, weakClosure(function (ProcessScheduledEvent $message): void {
$this->periodicWorkers[$message->id]->nextRunDate = $message->nextRunDate;
}));
}

public function addWorker(ProcessInterface $worker): void
Expand All @@ -143,6 +148,7 @@ public function addWorker(ProcessInterface $worker): void
$this->periodicWorkers[$worker->getId()] = new PeriodicWorkerInfo(
user: $worker->getUser(),
name: $worker->getName(),
schedule: $worker->getSchedule(),
);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Internal/SystemPlugin/System.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Luzrain\PHPStreamServer\Internal\SystemPlugin\Command\ConnectionsCommand;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\Command\ProcessesCommand;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\Command\ReloadCommand;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\Command\SchedulerCommand;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\Command\StartCommand;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\Command\StatusCommand;
use Luzrain\PHPStreamServer\Internal\SystemPlugin\Command\StopCommand;
Expand Down Expand Up @@ -53,6 +54,7 @@ public function commands(): array
new WorkersCommand($this->masterProcess),
new ProcessesCommand($this->masterProcess),
new ConnectionsCommand($this->masterProcess),
new SchedulerCommand($this->masterProcess),
];
}
}
4 changes: 2 additions & 2 deletions src/MasterProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public function __construct(
$this->pidFile = $pidFile ?? \sprintf('%s/phpss%s.pid', $runDirectory, \hash('xxh32', $this->startFile));
$this->socketFile = \sprintf('%s/phpss%s.socket', $runDirectory, \hash('xxh32', $this->startFile . 'rx'));

$this->supervisor = new Supervisor($this, $stopTimeout, $this->status);
$this->scheduler = new Scheduler($this->status);
$this->supervisor = new Supervisor($this, $this->status, $stopTimeout);
$this->scheduler = new Scheduler($this, $this->status);
$this->container = new ArrayContainer();
}

Expand Down
19 changes: 19 additions & 0 deletions src/Message/ProcessScheduledEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Luzrain\PHPStreamServer\Message;

use Luzrain\PHPStreamServer\Message;

/**
* @implements Message<void>
*/
final readonly class ProcessScheduledEvent implements Message
{
public function __construct(
public int $id,
public \DateTimeInterface|null $nextRunDate,
) {
}
}

0 comments on commit eb3317c

Please sign in to comment.