Skip to content

Commit

Permalink
feat:optimize worker daemon code
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed Jun 1, 2024
1 parent cada953 commit 6345b96
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Test/WorkerDaemon/PipeWorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function loopHandle()

var_dump('start start');
sleep(15);
var_dump("gggggggggggggggggggggggggg");
var_dump("end end end ");
}

// public function run()
Expand Down
4 changes: 2 additions & 2 deletions Test/WorkerDaemon/conf/pipe_conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
[
'process_name' => 'test-pipe-worker',
'handler' => \Test\WorkerDaemon\PipeWorkerProcess::class,
'worker_num' => 3, // 默认动态进程数量
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 60, // 每隔3600s重启进程
'life_time' => '* * * * *', // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => []
Expand Down
13 changes: 10 additions & 3 deletions src/Worker/AbstractBaseWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ public function isDue(): bool
{
if($this->isRebooting() || $this->isForceExit() || $this->isExiting() || $this->waitToExit) {
sleep(1);
$this->fmtWriteInfo("Process Wait to Exit or Reboot");
$this->fmtWriteInfo("{$this->getProcessName()}Process Wait to Exit or Reboot,Do not something");
return false;
}
return true;
Expand Down Expand Up @@ -1322,19 +1322,26 @@ protected function exitNow(int $pid, int $maxWaitTimeOfExit)
/**
* registerTickReboot register time reboot, will be called in init() function
*
* @param int|string $lifeTime
* @return void
*/
protected function registerTickReboot($lifeTime)
protected function registerTickReboot()
{
/**
* local模式下的定时任务模式下不能设置定时重启,否则长时间执行的任务会被kill掉,而是在回调函数注册callback闭包来判断是否达到重启时间
* @see \Swoolefy\Worker\Cron\CronLocalProcess
*/
if (SystemEnv::isCronService() && $this instanceof \Swoolefy\Worker\Cron\CronLocalProcess) {
if (!is_numeric($this->lifeTime)) {
$this->lifeTime = 3600;
}else {
if ($this->lifeTime < 60) {
$this->lifeTime = 60;
}
}
return;
}

$lifeTime = $this->lifeTime;
// daemon下使用loopHandle模式,则不注册定时重启,会在业务处理完后重启
if ($this->useLoopHandle && is_numeric($lifeTime)) {
return;
Expand Down
3 changes: 2 additions & 1 deletion src/Worker/AbstractWorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected function init()
$this->lifeTime = $this->getArgs()['life_time'] ?? $this->lifeTime;
$this->currentRunCoroutineLastCid = $this->getArgs()['current_run_coroutine_last_cid'] ?? $this->maxHandle * 10;
$this->limitCurrentRunCoroutineNum = $this->getArgs()['limit_run_coroutine_num'] ?? null;
$this->registerTickReboot($this->lifeTime);
$this->registerTickReboot();
$this->onInit();
}

Expand Down Expand Up @@ -98,6 +98,7 @@ public function run()
$this->useLoopHandle = true;
while (true) {
if (!$this->isDue()) {
$this->fmtWriteInfo("{$this->getProcessName()}】守护进程退出|重启中,不再处理任务");
continue;
}

Expand Down
13 changes: 4 additions & 9 deletions src/Worker/Cron/CronLocalProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ function (): bool {
}

if (!$this->isDue()) {
$this->fmtWriteInfo("{$this->getProcessName()}进程退出中,不再处理任务");
$this->fmtWriteInfo("{$this->getProcessName()}定时任务进程退出|重启中,暂时不再处理任务");
return false;
}

Expand All @@ -84,16 +84,11 @@ function () {
}

// 定时任务处理完之后,判断达到一定时间,然后重启进程
if (!is_numeric($this->lifeTime)) {
$this->lifeTime = 3600;
}else {
if ($this->lifeTime < 60) {
$this->lifeTime = 60;
if (is_numeric($this->lifeTime)) {
if ( (time() > $this->getStartTime() + $this->lifeTime) && $this->isDue()) {
$this->reboot(5);
}
}
if ( (time() > $this->getStartTime() + $this->lifeTime) && $this->isDue()) {
$this->reboot(5);
}
});
}catch (\Throwable $exception) {
$this->onHandleException($exception, $this->getArgs());
Expand Down

0 comments on commit 6345b96

Please sign in to comment.