Worker.php
15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
<?php
namespace common\components\jqueue;
use Yii;
use common\components\jqueue\base\Queue;
use common\components\jqueue\base\Job;
use Exception;
use Throwable;
use function ignore_user_abort;
use function set_time_limit;
use function is_null;
use function pcntl_signal;
use function pcntl_alarm;
use function max;
use function memory_get_usage;
use function extension_loaded;
use function posix_kill;
use function getmypid;
use function usleep;
use function sleep;
use function json_decode;
use function version_compare;
use function mb_strpos;
/**
* Class Worker 任务处理器
* 注意: 当修改处理器的代码, 需要重启队列处理器进程(通过supervisor)才能生效
* @package common\components\jqueue
*/
class Worker
{
const DEBUG = true; // 调试开关(开启日志)
/**
* 是否终止Worker进程
* @var bool
*/
public $shouldQuit = false;
/**
* 是否收到软件终止信号
* @var bool
*/
public $signalTerm = false;
/**
* Indicates if the worker is paused.
* @var bool
*/
public $paused = false;
/**
* 当前处理任务
* @var Job
*/
public $currentJob = false;
/**
* 当前处理任务超时时间(秒)
* @var int
*/
public $currentTimeout = 55;
/**
* 统一控制日志输出
* @param $level
* @param $msg
*/
private static function Log($msg) {
if (self::DEBUG == false || YII_ENV_PROD) { // 正式服屏蔽消息日志
return ;
}
Yii::$app->dlog->debug($msg);
}
/**
* Supervisor进程工作函数
* 说明: 如果队列有任务则读取1个处理; 如果没有任务则休眠3秒(避免执行时间太短supervisor异常退出进程)
* @param Queue $queueInstance
* @param $queueName
* @param array $options
* @param null $callback
*/
public function work(Queue $queueInstance, $queueName, $options = [], $callback = null)
{
ignore_user_abort(true); // 忽略客户端关闭,PHP脚本继续执行
set_time_limit(0); // 取消PHP执行时间限制
$attempt = isset($options['attempt']) ? $options['attempt'] : 3; // 每一个任务的尝试次数
$delay = isset($options['delay']) ? $options['delay'] : 60; // 延迟多少秒把任务重新加入队列, 0为马上激活,和 $options['attempt'] 配合使用
$sleep = isset($options['sleep']) ? $options['sleep'] : 3; // 每一次任务执行空闲周期的沉睡时间
$timeout = isset($options['timeout']) ? $options['timeout'] : 55; // 任务超时关闭进程时间
$memory = isset($options['memory']) ? $options['memory'] : 128; // 占用的最大内存
// 开启进程异步信号
$this->listenForSignals();
$job = $this->getNextJob($queueInstance, $queueName);
$this->registerTimeoutHandler($job, $timeout);
if ($job instanceof Job) {
if ($attempt > 0 && $job->getAttempts() > $attempt) { // 超过最大重试次数, 删除任务
try {
if ($job->failed()) {
$job->delete();
}
} catch (Exception $e) {
$this->stopWorkerIfLostConnection($job, $e);
}
} else {
try {
// 执行任务
$job->execute($callback);
} catch (Exception $e) {
$this->stopWorkerIfLostConnection($job, $e);
if (!$job->isDeleted()) {
self::Log("work: 任务未删除进行第{$job->getAttempts()}次失败重试:{$job->getPayload()}");
$job->release($delay);
}
}
}
} else {
$this->sleep($sleep);
}
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
// 检查是否需要终止脚本进程
$this->stopIfNecessary($job, $memory);
}
/**
* 定时任务后台监听进程(采用死循环方式实现)
* $options['attempt'] : int 队列任务失败尝试次数,0为不限制
* $options['memory'] : int 允许使用的最大内存
* $options['sleep'] : int 每次检测的时间间隔
*
* @param Queue $queueInstance 队列实例
* @param string $queueName 监听队列的名称(在pushon的时候把任务推送到哪个队列,则需要监听相应的队列才能获取任务)
* @param array $options 参数数组
* $options['attempt'] : int 队列任务失败尝试次数,0为不限制
* $options['delayAttempt'] : int 重新激活失败的任务的时间周期,0为马上激活
* $options['memory'] : int 允许使用的最大内存
* $options['sleep'] : int 每次检测的时间间隔
* @param null $callback
* @throws Exception
*/
public function listen(Queue $queueInstance, $queueName, $options = [], $callback = null)
{
ignore_user_abort(true); // 忽略客户端关闭,PHP脚本继续执行
set_time_limit(0); // 取消PHP执行时间限制
$attempt = isset($options['attempt']) ? $options['attempt'] : 10; // 每一个任务的尝试次数
$delay = isset($options['delay']) ? $options['delay'] : 60; // 延迟多少秒把任务重新加入队列, 0为马上激活,和 $options['attempt'] 配合使用
$sleep = isset($options['sleep']) ? $options['sleep'] : 3; // 每一个监听周期的沉睡时间
$timeout = isset($options['timeout']) ? $options['timeout'] : 55; // 任务超时关闭进程,比delayAttempt短几秒钟
$memory = isset($options['memory']) ? $options['memory'] : 128; // 占用的最大内存
// 开启进程异步信号
$this->listenForSignals();
while (true) {
$job = $this->getNextJob($queueInstance, $queueName);
$this->registerTimeoutHandler($job, $timeout);
if($job instanceof Job){
if($attempt > 0 && $job->getAttempts() > $attempt) { // 超过最大重试次数, 删除任务
try {
if ($job->failed()) {
$job->delete();
}
} catch (Exception $e) {
$this->stopWorkerIfLostConnection($job, $e);
}
} else {
try {
$job->execute($callback);//这里有mysql操作
} catch (Exception $e) {
$this->stopWorkerIfLostConnection($job, $e);
if (!$job->isDeleted()) {
$job->release($delay);
self::Log("listen: 任务未删除进行第{$job->getAttempts()}次失败重试:{$job->getPayload()}");
}
}
}
} else {
$this->sleep($sleep);
}
// 检查是否需要终止脚本进程
$this->stopIfNecessary($job, $memory);
}
}
/**
* 从队列中获取下一个任务
* @param Queue $queueInstance 队列实例
* @param string $queueName 监听队列的名称(在pushon的时候把任务推送到哪个队列,则需要监听相应的队列才能获取任务)
*/
protected function getNextJob(Queue $queueInstance, $queueName)
{
try {
if (!is_null($job = $queueInstance->pop($queueName))) {
return $job;
}
} catch (Exception $e) {
self::Log(getmypid() . ':出列异常:' . $e->getMessage());
$this->stopWorkerIfLostConnection($e);
} catch (Throwable $e) {
self::Log(getmypid() . ':出列异常:' . $e->getMessage());
$this->stopWorkerIfLostConnection($e);
}
}
/**
* 监控任务超时(PHP 7.1+).
* 如果执行任务的脚本超时, pcntl_alarm 将会启动并杀死当前的 work 进程。杀死进程后, work 进程将会被守护进程(Supervisor)重启,继续进行下一个任务。
* @param $job
* @param $timeout
* timeout 功能针对 PHP 7.1+ 和 pcntl PHP 扩展进行了优化。也即
* timeout 应该永远都要比[任务过期时间expire]短至少几秒钟的时间。这样就能保证任务进程总能在失败重试前就被杀死了。
* 如果你的 -timeout 选项大于 [任务过期时间expire] 配置选项,你的任务可能被执行两次。
* 参考资料:https://laravel-china.org/docs/laravel/5.5/queues/1324#delayed-dispatching
* @return void
*/
protected function registerTimeoutHandler($job, $timeout)
{
// 记录当前任务参数, 因为pcntl_signal的handler回调函数不支持传参
$this->currentJob = $job;
$this->currentTimeout = $timeout;
if ($this->supportsAsyncSignals()) {
// 注册 SIGALRM 闹钟信号的处理函数
// note: We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () {
// 将失败任务记录到数据库
$failReason = '执行任务的脚本超时, 系统自动杀死进程';
$details = '进程ID[' . getmypid() . "], 任务执行时间超过[{$this->currentTimeout}]秒, 系统闹钟上报SIGALRM异步信号";
$this->saveFailJob($this->currentJob, $failReason, $details);
self::Log(':registerTimeoutHandler: 执行任务的脚本超时自动杀死进程=>' . $failReason . ' | ' . $details);
$this->kill(1);
});
// 任务执行时间超过$timeout秒, 闹钟响起, 系统将会收到SIGALRM信号
pcntl_alarm(max($timeout, 0));
}
}
/**
* 停止进程(带检测判断)
* @param Job $job
* @param $memoryLimit
*/
protected function stopIfNecessary($job, $memoryLimit)
{
if ($this->shouldQuit) {
// 收到软件终止信号
if ($this->signalTerm) {
$failReason = '系统终止任务进程';
$details = '系统收到SIGTERM软件终止信号';
// 将失败任务记录到数据库
$this->saveFailJob($job, $failReason, $details);
}
$this->kill();
} else {
// 检查是否超过最大使用内存
$memoryUse = memory_get_usage() / 1024 / 1024;
if ($memoryUse >= $memoryLimit) {
// 将失败任务记录到数据库
$failReason = '超过队列任务最大可用内存, 退出脚本';
$details = '当前占用内存[' . $memoryUse . 'MB] | 最大可用内存[' . $memoryLimit . 'MB]';
$this->saveFailJob($job, $failReason, $details);
// 停止监听并退出脚本
$this->stop(12);
}
}
}
/**
* 杀死进程
* @param int $status
* @return void
*/
public function kill($status = 0)
{
if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}
exit($status);
}
/**
* 休眠
* @param $seconds
*/
public function sleep($seconds)
{
if ($seconds < 1) {
usleep($seconds * 1000000);
} else {
sleep($seconds);
}
}
/**
* 停止监听并退出脚本
* @param int $status
* @return void
*/
public function stop($status = 0)
{
Yii::$app->end();
exit($status);
}
/**
* 将失败任务记录到数据库
* @param Job $job
* @param $failReason
* @param $details
*/
protected function saveFailJob($job = false, $failReason, $details)
{
if ($job) {
$payload = $job->getPayload();
$payload = json_decode($payload, true);
$payloadId = $payload['id'];
} else {
$payloadId = 'unknown_payload_id';
}
Job::handleFailJob($payloadId, $failReason, $details);
}
/**
* 终止脚本进程(当数据库链接丢失的时候)
* @param Job $job
* @param Exception $e
* @return void
*/
protected function stopWorkerIfLostConnection($job = false, $e)
{
if ($this->causedByLostConnection($e)) {
$this->shouldQuit = true;
$failReason = '数据库连接丢失=>' . $e->getMessage();
$details = $e->getTraceAsString();
} else {
$failReason = '应用程序异常=>' . $e->getMessage();
$details = $e->getTraceAsString();
}
self::Log(getmypid() . 'stopWorkerIfLostConnection: ' . $failReason . ' | ' . $details);
// 将失败任务记录到数据库
$this->saveFailJob($job, $failReason, $details);
}
/**
* 判断指定的异常是否由于数据库连接丢失造成
* @param Exception $e
* @return bool
*/
protected function causedByLostConnection(Exception $e)
{
$message = $e->getMessage();
return $this->contains($message, [
'server has gone away',
'no connection to the server',
'Lost connection',
'is dead or not enabled',
'Error while sending',
'decryption failed or bad record mac',
'server closed the connection unexpectedly',
'SSL connection has been closed unexpectedly',
'Error writing data to the connection',
'Resource deadlock avoided',
'Transaction() on null',
'child connection forced to terminate due to client_idle_limit',
'query_wait_timeout',
'reset by peer',
'Can\'t connect to',
'failed with errno=32 Broken pipe',
'Failed to read from socket'
]);
}
/**
* 启用异步信号来监听脚本进程
* 该函数用于 PHP 7.1 版本以上,用于脚本的信号处理。
* 所谓的信号处理,就是由 Process Monitor(如 Supervisor )发送并与我们的脚本进行通信的异步通知。
* @return void
*/
protected function listenForSignals()
{
if ($this->supportsAsyncSignals()) {
//self::Log(getmypid() . ':listenForSignals:开启异步通信');
pcntl_async_signals(true); // 开启异步信号
// 终止进程: 软件终止信号
pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
$this->signalTerm = true;
});
// 终止进程: 用户定义信号2
pcntl_signal(SIGUSR2, function () {
$this->paused = true;
});
// 忽略信号: 继续执行一个停止的进程
pcntl_signal(SIGCONT, function () {
$this->paused = false;
});
}
}
/**
* 判断当前系统是否支持异步信号(PHP 7.1.0以上版本支持)
* @return bool
*/
protected function supportsAsyncSignals()
{
return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl');
}
/**
* 查找子串
* @param string $haystack
* @param string|array $needles
* @return bool
*/
public static function contains($haystack, $needles)
{
foreach ((array) $needles as $needle) {
if ($needle !== '' && mb_strpos($haystack, $needle) !== false) {
return true;
}
}
return false;
}
}