Skip to content

Commit

Permalink
feat: optimize worker code
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed May 31, 2024
1 parent bb8581a commit cada953
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 71 deletions.
2 changes: 1 addition & 1 deletion Test/WorkerCron/LocalOrder/LocalOrderHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function doCronTask($cron, string $cronName)


sleep(60);
AbstractBaseWorker::getProcessInstance()->reboot(3);
//AbstractBaseWorker::getProcessInstance()->reboot(3);
var_dump("cron end");

// goApp(function() {
Expand Down
2 changes: 1 addition & 1 deletion Test/WorkerCron/conf/product_conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
'handler' => \Swoolefy\Worker\Cron\CronLocalProcess::class,
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'life_time' => 60, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => [
Expand Down
28 changes: 14 additions & 14 deletions Test/WorkerCron/conf/schedule_conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

return // 定时fork进程处理任务
[
[
'process_name' => 'system-schedule-task', // 进程名称
'handler' => \Swoolefy\Worker\Cron\CronForkProcess::class,
'description' => '系统fork模式任务调度',
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, // 当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => [
// 定时任务列表
'task_list' => \Test\Scripts\Kernel::buildScheduleTaskList()
],
],
// [
// 'process_name' => 'system-schedule-task', // 进程名称
// 'handler' => \Swoolefy\Worker\Cron\CronForkProcess::class,
// 'description' => '系统fork模式任务调度',
// 'worker_num' => 1, // 默认动态进程数量
// 'max_handle' => 100, //消费达到10000后reboot进程
// 'life_time' => 3600, // 每隔3600s重启进程
// 'limit_run_coroutine_num' => 10, // 当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
// 'extend_data' => [],
// 'args' => [
// // 定时任务列表
// 'task_list' => \Test\Scripts\Kernel::buildScheduleTaskList()
// ],
// ],
];
2 changes: 1 addition & 1 deletion Test/WorkerDaemon/PipeWorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function loopHandle()
}, $a, $b);

var_dump('start start');
sleep(120);
sleep(15);
var_dump("gggggggggggggggggggggggggg");
}

Expand Down
22 changes: 11 additions & 11 deletions Test/WorkerDaemon/conf/monitor_conf.php
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<?php

return [
[
// 监控进程
'process_name' => 'test-monitor-worker',
'handler' => \Test\WorkerDaemon\MonitorWorkerProcess::class,
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 60, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => []
],
// [
// // 监控进程
// 'process_name' => 'test-monitor-worker',
// 'handler' => \Test\WorkerDaemon\MonitorWorkerProcess::class,
// 'worker_num' => 1, // 默认动态进程数量
// 'max_handle' => 100, //消费达到10000后reboot进程
// 'life_time' => 60, // 每隔3600s重启进程
// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
// 'extend_data' => [],
// 'args' => []
// ],
];
22 changes: 11 additions & 11 deletions Test/WorkerDaemon/conf/pipe_conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
'handler' => \Test\WorkerDaemon\PipeWorkerProcess::class,
'worker_num' => 3, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => []
],
[
'process_name' => 'tick-pipe-worker-test',
'handler' => \Test\WorkerDaemon\PipeTestWorkerProcess::class,
'worker_num' => 3, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'life_time' => 60, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => []
],
// [
// 'process_name' => 'tick-pipe-worker-test',
// 'handler' => \Test\WorkerDaemon\PipeTestWorkerProcess::class,
// 'worker_num' => 3, // 默认动态进程数量
// 'max_handle' => 100, //消费达到10000后reboot进程
// 'life_time' => 3600, // 每隔3600s重启进程
// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
// 'extend_data' => [],
// 'args' => []
// ],
];
46 changes: 19 additions & 27 deletions src/Worker/AbstractBaseWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public function __start(Process $swooleProcess)
if (!$this->handing) {
$exitFunction($timerId, $masterPid);
}else {
$this->fmtWriteInfo("Cron Process={$this->getProcessName()} is handing, pid={$this->getPid()}");
$this->fmtWriteInfo("【cron-task-handing】Cron Process={$this->getProcessName()} is handing, pid={$this->getPid()}");
}
}else if (SystemEnv::isDaemonService()) {
// daemon防止任务还在进行中,强制退出
Expand All @@ -489,7 +489,7 @@ public function __start(Process $swooleProcess)
if (!$this->useLoopHandle || ($lastTime > 1800) ) {
$exitFunction($timerId, $masterPid);
}else {
$this->fmtWriteInfo("Daemon Process={$this->getProcessName()} is handing, pid={$this->getPid()}");
$this->fmtWriteInfo("【daemon-task-handing】Daemon Process={$this->getProcessName()} is handing, pid={$this->getPid()}");
}
}else {
$exitFunction($timerId, $masterPid);
Expand All @@ -512,7 +512,7 @@ public function __start(Process $swooleProcess)
}
}

if ($this->isMasterLive() && $this->getProcessWorkerId() == 0 && $this->masterPid) {
if ($this->isMasterLive() && $this->isWorker0() && $this->masterPid && $this->isDue()) {
$this->saveMasterId($this->masterPid);
}

Expand Down Expand Up @@ -1235,17 +1235,7 @@ public function reboot(float $waitTime = 10, bool $includeDynamicProcess = true)

$channel = new Channel(1);
$timerId = \Swoole\Timer::after($waitTime * 1000, function () use ($pid) {
try {
$this->runtimeCoroutineWait($this->maxWaitTimeOfExit);
(new \Swoolefy\Core\EventApp)->registerApp(function () {
$this->onShutDown();
});
} catch (\Throwable $throwable) {
$this->onHandleException($throwable);
} finally {
// 自身进程退出
$this->kill($pid, SIGTERM);
}
$this->exitNow($pid, 5);
});

$this->rebootTimerId = $timerId;
Expand Down Expand Up @@ -1332,10 +1322,10 @@ protected function exitNow(int $pid, int $maxWaitTimeOfExit)
/**
* registerTickReboot register time reboot, will be called in init() function
*
* @param $cron_expression
* @param int|string $lifeTime
* @return void
*/
protected function registerTickReboot($cron_expression)
protected function registerTickReboot($lifeTime)
{
/**
* local模式下的定时任务模式下不能设置定时重启,否则长时间执行的任务会被kill掉,而是在回调函数注册callback闭包来判断是否达到重启时间
Expand All @@ -1345,19 +1335,21 @@ protected function registerTickReboot($cron_expression)
return;
}

if (is_numeric($cron_expression)) {
$randNum = rand(30, 60);
// daemon下使用loopHandle模式,则不注册定时重启,会在业务处理完后重启
if ($this->useLoopHandle && is_numeric($lifeTime)) {
return;
}

if (is_numeric($lifeTime)) {
$randTickTime = rand(10, 15);
$tickTime = $randTickTime * 1000;
// for Example reboot/600s after 600s reboot this process
if ($cron_expression < 120) {
$sleepTime = 60;
$tickTime = (30 + $randNum) * 1000;
} else {
$sleepTime = $cron_expression;
$tickTime = (60 + $randNum) * 1000;
if ($lifeTime < 60) {
$lifeTime = 60;
}

\Swoole\Timer::tick($tickTime, function ($timerId) use ($sleepTime) {
if (time() - $this->getStartTime() >= $sleepTime) {
\Swoole\Timer::tick($tickTime, function ($timerId) use ($lifeTime) {
if (time() >= $this->getStartTime() + $lifeTime) {
$this->reboot($this->waitTime);
\Swoole\Timer::clear($timerId);
}
Expand All @@ -1368,7 +1360,7 @@ protected function registerTickReboot($cron_expression)
// cron expression of timer to reboot this process
CrontabManager::getInstance()->addRule(
'system-register-tick-reboot',
$cron_expression,
$lifeTime,
function () use ($randSleep, $isWorkerId0) {
if(!$isWorkerId0) {
$this->reboot($this->waitTime + $randSleep);
Expand Down
8 changes: 5 additions & 3 deletions src/Worker/AbstractWorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ abstract class AbstractWorkerProcess extends AbstractBaseWorker
protected $maxHandle = 10000;

/**
* @var int
* @var int|string
*/
protected $lifeTime = 3600;

Expand Down Expand Up @@ -115,8 +115,10 @@ public function run()
}

// 定时任务处理完之后,判断达到一定时间,然后重启进程
if ( (time() > $this->getStartTime() + 3600) && $this->isDue()) {
$this->reboot(5);
if (is_numeric($this->lifeTime)) {
if ( (time() > $this->getStartTime() + $this->lifeTime) && $this->isDue()) {
$this->reboot(5);
}
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/Worker/Cron/CronLocalProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,15 @@ function () {
return false;
}

// 定时任务处理完之后,达到一定时间,判断然后重启进程
if ( (time() > $this->getStartTime() + 3600) && $this->isDue()) {
// 定时任务处理完之后,判断达到一定时间,然后重启进程
if (!is_numeric($this->lifeTime)) {
$this->lifeTime = 3600;
}else {
if ($this->lifeTime < 60) {
$this->lifeTime = 60;
}
}
if ( (time() > $this->getStartTime() + $this->lifeTime) && $this->isDue()) {
$this->reboot(5);
}
});
Expand Down

0 comments on commit cada953

Please sign in to comment.