forked from micmorozov/yii2-gearman
-
Notifications
You must be signed in to change notification settings - Fork 0
/
GearmanComponent.php
118 lines (96 loc) · 2.91 KB
/
GearmanComponent.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
<?php
namespace micmorozov\yii2\gearman;
use Yii;
use micmorozov\yii2\gearman\Application;
use micmorozov\yii2\gearman\Dispatcher;
use micmorozov\yii2\gearman\Config;
use micmorozov\yii2\gearman\Process;
/**
 * Yii component that turns the configured Gearman jobs into worker applications.
 *
 * The Config, the Dispatcher and the list of Application objects are built
 * lazily on first access and cached on the component afterwards.
 */
class GearmanComponent extends \yii\base\Component
{
    /**
     * @var array|null Gearman servers. An entry that is an array with both
     * 'host' and 'port' keys is joined with Config::SERVER_PORT_SEPARATOR;
     * any other entry is handed to Config unchanged.
     */
    public $servers;

    /**
     * @var mixed Value forwarded to Config as its 'user' option.
     */
    public $user;

    /**
     * @var array Job definitions keyed by job name; every definition is
     * passed to Yii::createObject() and must yield a JobInterface.
     */
    public $jobs = [];

    /**
     * @var array Value forwarded to Config as its 'stdStreams' option.
     * NOTE(review): presumably per-stream redirect targets - confirm against Config/Process.
     */
    public $stdStreams = [
        'STDIN' => false,
        'STDOUT' => false,
        'STDERR' => false
    ];

    /**
     * @var Application[]|null Cached result of getApplication().
     */
    private $_application;

    /**
     * @var Dispatcher|null Cached result of getDispatcher().
     */
    private $_dispatcher;

    /**
     * @var Config|null Cached or explicitly injected configuration.
     */
    private $_config;

    /**
     * @var Process|null Most recently created or injected process.
     */
    private $_process;

    /**
     * Builds (once) and returns the worker applications for all configured jobs.
     *
     * Despite the singular name this returns a list: for every job,
     * `$job->count` applications are created, each named "<job name><index>"
     * and each given its own Process.
     *
     * @return Application[]
     * @throws \yii\base\InvalidConfigException if a job definition does not produce a JobInterface.
     */
    public function getApplication()
    {
        if ($this->_application === null) {
            $applications = [];
            foreach ($this->jobs as $name => $definition) {
                $job = Yii::createObject($definition);
                if (!($job instanceof JobInterface)) {
                    throw new \yii\base\InvalidConfigException('Gearman job must be instance of JobInterface.');
                }
                $job->setName($name);

                for ($i = 0; $i < $job->count; $i++) {
                    $id = $name . $i;
                    // getProcess() returns the cached process when one exists, so the
                    // cache is cleared to force a fresh Process for this worker id.
                    $this->_process = null;
                    $application = new Application($id, $this->getConfig(), $this->getProcess($id));
                    $application->add($job);
                    $applications[] = $application;
                }
            }
            $this->_application = $applications;
        }

        return $this->_application;
    }

    /**
     * Returns the dispatcher, creating it from the current config on first use.
     *
     * @return Dispatcher
     */
    public function getDispatcher()
    {
        if ($this->_dispatcher === null) {
            $this->_dispatcher = new Dispatcher($this->getConfig());
        }

        return $this->_dispatcher;
    }

    /**
     * Returns the configuration, building it from $servers, $user and
     * $stdStreams on first use unless one was injected via setConfig().
     *
     * @return Config
     */
    public function getConfig()
    {
        if ($this->_config === null) {
            // $servers defaults to null; iterate an empty list in that case
            // instead of triggering a foreach warning.
            $configured = $this->servers === null ? [] : $this->servers;

            $servers = [];
            foreach ($configured as $server) {
                if (is_array($server) && isset($server['host'], $server['port'])) {
                    $servers[] = implode(Config::SERVER_PORT_SEPARATOR, [$server['host'], $server['port']]);
                } else {
                    $servers[] = $server;
                }
            }

            $this->_config = new Config([
                'servers' => $servers,
                'user' => $this->user,
                'stdStreams' => $this->stdStreams
            ]);
        }

        return $this->_config;
    }

    /**
     * Replaces the configuration used by this component.
     *
     * @param Config $config
     * @return $this
     */
    public function setConfig(Config $config)
    {
        $this->_config = $config;

        return $this;
    }

    /**
     * Returns the cached process, creating one for $id when none is cached.
     *
     * Note that $id is only used when a new Process has to be created; an
     * already cached process is returned regardless of the id requested.
     *
     * @param mixed $id Identifier handed to the Process constructor.
     * @return Process
     */
    public function getProcess($id)
    {
        if ($this->_process === null) {
            $this->setProcess(new Process($this->getConfig(), $id));
        }

        return $this->_process;
    }

    /**
     * Sets the current process.
     *
     * When the component has no configuration yet and the process carries a
     * Config, that Config is adopted as the component's configuration.
     *
     * @param Process $process
     * @return $this
     */
    public function setProcess(Process $process)
    {
        // Inspect the backing field directly: getConfig() always builds a
        // Config, so comparing its result with null could never be true.
        if ($this->_config === null && $process->getConfig() instanceof Config) {
            $this->setConfig($process->getConfig());
        }
        $this->_process = $process;

        return $this;
    }
}