Skip to content

Commit

Permalink
Remove SupervisorDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
luzrain committed Jul 11, 2024
1 parent e9d1103 commit c37fa3d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 185 deletions.
3 changes: 2 additions & 1 deletion src/Internal/MasterProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use PHPUnit\Runner\ErrorException;
use Psr\Log\LoggerInterface;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver\StreamSelectDriver;
use Revolt\EventLoop\Suspension;
use function Amp\Future\awaitAll;

Expand Down Expand Up @@ -157,7 +158,7 @@ private function initServer(): void
}

// Init event loop.
EventLoop::setDriver(new SupervisorDriver());
EventLoop::setDriver(new StreamSelectDriver());
EventLoop::setErrorHandler(ErrorHandler::handleException(...));
$this->suspension = EventLoop::getDriver()->getSuspension();

Expand Down
160 changes: 0 additions & 160 deletions src/Internal/SupervisorDriver.php

This file was deleted.

58 changes: 34 additions & 24 deletions src/Plugin/Supervisor/Supervisor.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,20 @@ public function start(MasterProcess $masterProcess, Suspension $suspension): voi
$this->workerRelay = new Relay($masterPipe);
$this->serverStatus->subscribeToWorkerMessages($this->workerRelay);

EventLoop::getDriver()->onChildProcessExit($this->onChildStop(...));
EventLoop::repeat(WorkerProcess::HEARTBEAT_PERIOD, fn() => $this->monitorWorkerStatus());

$this->watchChildProcesses();
$this->spawnWorkers();
}

public function stop(): Future
public function watchChildProcesses(): void
{
$this->stopFuture = new DeferredFuture();

foreach ($this->workerPool->getAlivePids() as $pid) {
\posix_kill($pid, SIGTERM);
}

if ($this->workerPool->getWorkerCount() === 0) {
$this->stopFuture->complete();
} else {
EventLoop::delay($this->stopTimeout, function (): void {
// Send SIGKILL signal to all child processes ater timeout
foreach ($this->workerPool->getAlivePids() as $pid) {
\posix_kill($pid, SIGKILL);
$worker = $this->workerPool->getWorkerByPid($pid);
$this->logger->notice(\sprintf('Worker %s[pid:%s] killed after %ss timeout', $worker->name, $pid, $this->stopTimeout));
}
$this->stopFuture->complete();
});
}
EventLoop::onSignal(SIGCHLD, function () {
while (($pid = \pcntl_wait($status, WNOHANG)) > 0) {
$exitCode = \pcntl_wexitstatus($status) ?: 0;
$this->onChildStop($pid, $exitCode);
}
});

return $this->stopFuture->getFuture();
EventLoop::repeat(WorkerProcess::HEARTBEAT_PERIOD, fn() => $this->monitorWorkerStatus());
}

private function spawnWorkers(): void
Expand Down Expand Up @@ -156,6 +141,31 @@ private function onChildStop(int $pid, int $exitCode): void
}
}

public function stop(): Future
{
$this->stopFuture = new DeferredFuture();

foreach ($this->workerPool->getAlivePids() as $pid) {
\posix_kill($pid, SIGTERM);
}

if ($this->workerPool->getWorkerCount() === 0) {
$this->stopFuture->complete();
} else {
EventLoop::delay($this->stopTimeout, function (): void {
// Send SIGKILL signal to all child processes ater timeout
foreach ($this->workerPool->getAlivePids() as $pid) {
\posix_kill($pid, SIGKILL);
$worker = $this->workerPool->getWorkerByPid($pid);
$this->logger->notice(\sprintf('Worker %s[pid:%s] killed after %ss timeout', $worker->name, $pid, $this->stopTimeout));
}
$this->stopFuture->complete();
});
}

return $this->stopFuture->getFuture();
}

public function reload(): void
{
foreach ($this->workerPool->getAlivePids() as $pid) {
Expand Down

0 comments on commit c37fa3d

Please sign in to comment.