Skip to content

Commit

Permalink
Optimize WorkerServiec
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed Mar 4, 2023
1 parent 52275a5 commit 4f074c1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 36 deletions.
33 changes: 17 additions & 16 deletions src/Worker/AbstractBaseWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ abstract class AbstractBaseWorker
* @param string $process_name
* @param bool $async
* @param array $args
* @param null $extend_data
* @param mixed $extend_data
* @param bool $enable_coroutine
* @return void
*/
Expand Down Expand Up @@ -569,7 +569,7 @@ protected function installRegisterShutdownFunction()
}

if(!in_array($error['type'], [E_NOTICE, E_WARNING]) ) {
$exception = new \Exception($errorStr, $error['type']);
$exception = new WorkerException($errorStr, $error['type']);
$this->onHandleException($exception);
}
}
Expand All @@ -579,7 +579,7 @@ protected function installRegisterShutdownFunction()
/**
* writeByProcessName worker send message to process
* @param string $process_name
* @param $data
* @param mixed $data
* @param int $process_worker_id process_worker_id=-1 all process
* @param bool $is_use_master_proxy
* @return bool
Expand Down Expand Up @@ -649,7 +649,7 @@ public function writeByProcessName(
}

/**
* writeToMasterProcess direct semd message to other process
* writeToMasterProcess direct send message to other process
* @param mixed $data
* @return bool
*/
Expand Down Expand Up @@ -684,6 +684,7 @@ public function writeToWorkerByMasterProxy(string $process_name, $data, int $pro
* @param string $dynamic_process_name
* @param int $dynamic_process_num
* @return void
* @throws WorkerException
*/
public function notifyMasterCreateDynamicProcess(string $dynamic_process_name, int $dynamic_process_num = 2)
{
Expand Down Expand Up @@ -803,7 +804,7 @@ public function isDynamicDestroy(bool $is_destroy)
/**
* @return bool
*/
public function isWorker0()
public function isWorker0(): bool
{
return $this->getProcessWorkerId() == 0;
}
Expand Down Expand Up @@ -883,7 +884,7 @@ public function setMasterPid(int $master_pid)
/**
* @return int
*/
public function getMasterPid()
public function getMasterPid(): int
{
return $this->masterPid;
}
Expand All @@ -900,7 +901,7 @@ public function setWaitTime(float $wait_time = 30)
* getWaitTime
* @return int
*/
public function getWaitTime()
public function getWaitTime(): int
{
return $this->waitTime;
}
Expand All @@ -909,7 +910,7 @@ public function getWaitTime()
* isRebooting
* @return bool
*/
public function isRebooting()
public function isRebooting(): bool
{
return $this->isReboot;
}
Expand All @@ -918,7 +919,7 @@ public function isRebooting()
* isExiting
* @return bool
*/
public function isExiting()
public function isExiting(): bool
{
return $this->isExit;
}
Expand All @@ -928,7 +929,7 @@ public function isExiting()
*
* @return bool
*/
public function isForceExit()
public function isForceExit(): bool
{
return $this->isForceExit;
}
Expand All @@ -938,7 +939,7 @@ public function isForceExit()
*
* @return bool
*/
public function isDue()
public function isDue(): bool
{
if($this->isRebooting() || $this->isForceExit() || $this->isExiting()) {
sleep(1);
Expand All @@ -952,7 +953,7 @@ public function isDue()
*
* @return bool
*/
public function isStaticProcess()
public function isStaticProcess(): bool
{
if ($this->processType == self::PROCESS_STATIC_TYPE) {
return true;
Expand All @@ -964,15 +965,15 @@ public function isStaticProcess()
*
* @return bool
*/
public function isDynamicProcess()
public function isDynamicProcess(): bool
{
return !$this->isStaticProcess();
}

/**
* @return Process
*/
public function getSwooleProcess()
public function getSwooleProcess(): Process
{
return $this->swooleProcess;
}
Expand All @@ -982,7 +983,7 @@ public function getSwooleProcess()
*
* @return int
*/
public function getProcessWorkerId()
public function getProcessWorkerId(): int
{
return $this->processWorkerId;
}
Expand All @@ -991,7 +992,7 @@ public function getProcessWorkerId()
* getPid
* @return int
*/
public function getPid()
public function getPid(): int
{
return $this->swooleProcess->pid;
}
Expand Down
35 changes: 15 additions & 20 deletions src/Worker/MainManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,8 @@ public function __construct(array $config = [], ...$args)
* @param int $process_worker_num
* @param bool $async
* @param array $args
* @param null $extend_data
* @param mixed $extend_data
* @param bool $enable_coroutine
* @throws Exception
*/
public function addProcess(
string $process_name,
Expand Down Expand Up @@ -807,7 +806,7 @@ public function saveStatusToFile(array $status = [])
* @param string $process_name
* @param int $process_num
* @return mixed
* @throws \Exception
* @throws WorkerException
*/
public function createDynamicProcess(string $process_name, int $process_num = 2)
{
Expand All @@ -821,7 +820,7 @@ public function createDynamicProcess(string $process_name, int $process_num = 2)
if ($this->processLists[$key]['dynamic_process_destroying'] ?? false) {
$msg = "【Warning】 Process name={$process_name} is exiting now,forbidden to create dynamic process, please try again after moment";
$this->writeInfo($msg);
throw new \Exception($msg);
throw new WorkerException($msg);
}

if ($process_num <= 0) {
Expand Down Expand Up @@ -850,7 +849,7 @@ public function createDynamicProcess(string $process_name, int $process_num = 2)
if ($runningProcessWorkerNum >= $totalProcessNum) {
$msg = "【Warning】 Children process num={$totalProcessNum}, achieve max_process_num,forbidden to create process";
$this->writeInfo($msg);
throw new \Exception($msg);
throw new WorkerException($msg);
}

for ($workerId = $runningProcessWorkerNum; $workerId < $totalProcessNum; $workerId++) {
Expand Down Expand Up @@ -885,9 +884,9 @@ public function createDynamicProcess(string $process_name, int $process_num = 2)
* @param string $process_name
* @param int $process_num
* @return void
* @throws \Exception
* @throws WorkerException
*/
public function destroyDynamicProcess(string $process_name, $process_num = -1)
public function destroyDynamicProcess(string $process_name, int $process_num = -1)
{
$processWorkers = $this->getProcessByName($process_name, -1);
$key = md5($process_name);
Expand All @@ -913,7 +912,7 @@ public function destroyDynamicProcess(string $process_name, $process_num = -1)
*
* @param string $process_name
* @return int
* @throws \Exception
* @throws WorkerException
*/
public function getDynamicProcessNum(string $process_name)
{
Expand Down Expand Up @@ -1105,7 +1104,6 @@ private function installReportStatus()
* @param string $process_name
* @param int $process_worker_id
* @return mixed|null
* @throws \Exception
*/
public function getProcessByName(string $process_name, int $process_worker_id = 0)
{
Expand All @@ -1115,7 +1113,7 @@ public function getProcessByName(string $process_name, int $process_worker_id =
} else if ($process_worker_id < 0) {
return $this->processWorkers[$key];
} else {
throw new RuntimeException("Missing and not found process_name={$process_name}, worker_id={$process_worker_id}");
throw new WorkerException("Missing and not found process_name={$process_name}, worker_id={$process_worker_id}");
}
}

Expand Down Expand Up @@ -1145,7 +1143,6 @@ public function getProcessByPid(int $pid)
* @param string $process_name
* @param int $process_worker_id
* @return mixed
* @throws \Exception
*/
public function getPidByName(string $process_name, int $process_worker_id)
{
Expand All @@ -1157,7 +1154,7 @@ public function getPidByName(string $process_name, int $process_worker_id)
* getProcessWorkerId
* @return int
*/
public function getMasterWorkerId()
public function getMasterWorkerId(): int
{
return $this->masterWorkerId;
}
Expand All @@ -1166,7 +1163,7 @@ public function getMasterWorkerId()
* getMasterWorkerName
* @return string
*/
public function getMasterWorkerName()
public function getMasterWorkerName(): string
{
return MainManager::MASTER_WORKER_NAME;
}
Expand All @@ -1175,7 +1172,7 @@ public function getMasterWorkerName()
* isMasterExiting
* @return bool
*/
public function isMasterExiting()
public function isMasterExiting(): bool
{
return $this->isExit;
}
Expand All @@ -1185,16 +1182,15 @@ public function isMasterExiting()
* @param mixed $data
* @param int $process_worker_id
* @return bool
* @throws \Exception
*/
public function writeByProcessName(string $process_name, $data, int $process_worker_id = 0)
{
if ($this->isMaster($process_name)) {
throw new \Exception("Master process can not write msg to master process self");
throw new WorkerException("Master process can not write msg to master process self");
}

if (!$this->isRunning()) {
throw new \Exception("Master process is not start, you can not use writeByProcessName(), please checkout it");
throw new WorkerException("Master process is not start, you can not use writeByProcessName(), please checkout it");
}

$processWorkers = [];
Expand Down Expand Up @@ -1224,7 +1220,6 @@ public function writeByProcessName(string $process_name, $data, int $process_wor
* @param string $to_process_name
* @param int $to_process_worker_id
* @return bool
* @throws \Exception
*/
public function writeByMasterProxy(
$data,
Expand Down Expand Up @@ -1274,7 +1269,7 @@ public function broadcastProcessWorker(string $process_name, $data = '')
if ($process_name) {
$key = md5($process_name);
if (!isset($this->processWorkers[$key])) {
$exception = new \Exception(sprintf(
$exception = new WorkerException(sprintf(
"%s::%s not exist process=%s, please check it",
__CLASS__,
__FUNCTION__,
Expand Down Expand Up @@ -1593,7 +1588,7 @@ private function getMaxProcessNum()
* @param bool $showAll
* @return string
*/
private function getCliParams($showAll = false)
private function getCliParams(bool $showAll = false)
{
$cliParams = '';
$workerfyCliParams = getenv('ENV_CLI_PARAMS') ? json_decode(getenv('ENV_CLI_PARAMS'), true) : [];
Expand Down

0 comments on commit 4f074c1

Please sign in to comment.