Worker.php 15.9 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
<?php

namespace common\components\jqueue;

use Yii;
use common\components\jqueue\base\Queue;
use common\components\jqueue\base\Job;
use Exception;
use Throwable;
use function ignore_user_abort;
use function set_time_limit;
use function is_null;
use function pcntl_signal;
use function pcntl_alarm;
use function max;
use function memory_get_usage;
use function extension_loaded;
use function posix_kill;
use function getmypid;
use function usleep;
use function sleep;
use function json_decode;
use function version_compare;
use function mb_strpos;

/**
 * Class Worker 任务处理器
 * 注意: 当修改处理器的代码, 需要重启队列处理器进程(通过supervisor)才能生效
 * @package common\components\jqueue
 */
class Worker
{
    const DEBUG = true; // 调试开关(开启日志)

    /**
     * 是否终止Worker进程
     * @var bool
     */
    public $shouldQuit = false;

    /**
     * 是否收到软件终止信号
     * @var bool
     */
    public $signalTerm = false;

    /**
     * Indicates if the worker is paused.
     * @var bool
     */
    public $paused = false;

    /**
     * 当前处理任务
     * @var Job
     */
    public $currentJob = false;

    /**
     * 当前处理任务超时时间(秒)
     * @var int
     */
    public $currentTimeout = 55;
    
    /**
     * 统一控制日志输出
     * @param $level
     * @param $msg
     */
    private static function Log($msg) {
        if (self::DEBUG == false || YII_ENV_PROD) { // 正式服屏蔽消息日志
            return ;
        }

        Yii::$app->dlog->debug($msg);
    }

    /**
     * Supervisor进程工作函数
     * 说明: 如果队列有任务则读取1个处理; 如果没有任务则休眠3秒(避免执行时间太短supervisor异常退出进程)
     * @param Queue $queueInstance
     * @param $queueName
     * @param array $options
     * @param null $callback
     */
    public function work(Queue $queueInstance, $queueName, $options = [], $callback = null)
    {
        ignore_user_abort(true); // 忽略客户端关闭,PHP脚本继续执行
        set_time_limit(0); // 取消PHP执行时间限制

        $attempt = isset($options['attempt']) ? $options['attempt'] : 3;    // 每一个任务的尝试次数
        $delay = isset($options['delay']) ? $options['delay'] : 60;         // 延迟多少秒把任务重新加入队列, 0为马上激活,和 $options['attempt'] 配合使用
        $sleep = isset($options['sleep']) ? $options['sleep'] : 3;          // 每一次任务执行空闲周期的沉睡时间
        $timeout = isset($options['timeout']) ? $options['timeout'] : 55;   // 任务超时关闭进程时间
        $memory = isset($options['memory']) ? $options['memory'] : 128;     // 占用的最大内存
        // 开启进程异步信号
        $this->listenForSignals();
        $job = $this->getNextJob($queueInstance, $queueName);
        $this->registerTimeoutHandler($job, $timeout);

        if ($job instanceof Job) {
            if ($attempt > 0 && $job->getAttempts() > $attempt) { // 超过最大重试次数, 删除任务
                try {
                    if ($job->failed()) {
                        $job->delete();
                    }
                } catch (Exception $e) {
                    $this->stopWorkerIfLostConnection($job, $e);
                }
            } else {
                try {
                    // 执行任务
                    $job->execute($callback);
                } catch (Exception $e) {
                    $this->stopWorkerIfLostConnection($job, $e);
                    if (!$job->isDeleted()) {
                        self::Log("work: 任务未删除进行第{$job->getAttempts()}次失败重试:{$job->getPayload()}");
                        $job->release($delay);
                    }
                }
            }
        } else {
            $this->sleep($sleep);
        }

        // Finally, we will check to see if we have exceeded our memory limits or if
        // the queue should restart based on other indications. If so, we'll stop
        // this worker and let whatever is "monitoring" it restart the process.
        // 检查是否需要终止脚本进程
        $this->stopIfNecessary($job, $memory);
    }

    /**
     * 定时任务后台监听进程(采用死循环方式实现)
     * $options['attempt'] : int 队列任务失败尝试次数,0为不限制
     * $options['memory'] : int 允许使用的最大内存
     * $options['sleep'] : int 每次检测的时间间隔
     *
     * @param Queue $queueInstance 队列实例
     * @param string $queueName 监听队列的名称(在pushon的时候把任务推送到哪个队列,则需要监听相应的队列才能获取任务)
     * @param array $options 参数数组
     *      $options['attempt'] : int 队列任务失败尝试次数,0为不限制
     *      $options['delayAttempt'] : int 重新激活失败的任务的时间周期,0为马上激活
     *      $options['memory'] : int 允许使用的最大内存
     *      $options['sleep'] : int 每次检测的时间间隔
     * @param null $callback
     * @throws Exception
     */
    public function listen(Queue $queueInstance, $queueName, $options = [], $callback = null)
    {
        ignore_user_abort(true); // 忽略客户端关闭,PHP脚本继续执行
        set_time_limit(0); // 取消PHP执行时间限制

        $attempt = isset($options['attempt']) ? $options['attempt'] : 10;   // 每一个任务的尝试次数
        $delay = isset($options['delay']) ? $options['delay'] : 60;         // 延迟多少秒把任务重新加入队列, 0为马上激活,和 $options['attempt'] 配合使用
        $sleep = isset($options['sleep']) ? $options['sleep'] : 3;          // 每一个监听周期的沉睡时间
        $timeout = isset($options['timeout']) ? $options['timeout'] : 55;   // 任务超时关闭进程,比delayAttempt短几秒钟
        $memory = isset($options['memory']) ? $options['memory'] : 128;     // 占用的最大内存

        // 开启进程异步信号
        $this->listenForSignals();

        while (true) {
            $job = $this->getNextJob($queueInstance, $queueName);
            $this->registerTimeoutHandler($job, $timeout);
            if($job instanceof Job){
                if($attempt > 0 && $job->getAttempts() > $attempt) {  // 超过最大重试次数, 删除任务
                    try {
                        if ($job->failed()) {
                            $job->delete();
                        }
                    } catch (Exception $e) {
                        $this->stopWorkerIfLostConnection($job, $e);
                    }
                } else {
                    try {
                        $job->execute($callback);//这里有mysql操作
                    } catch (Exception $e) {
                        $this->stopWorkerIfLostConnection($job, $e);
                        if (!$job->isDeleted()) {
                            $job->release($delay);
                            self::Log("listen: 任务未删除进行第{$job->getAttempts()}次失败重试:{$job->getPayload()}");
                        }
                    }
                }
            } else {
                $this->sleep($sleep);
            }

            // 检查是否需要终止脚本进程
            $this->stopIfNecessary($job, $memory);
        }
    }

    /**
     * 从队列中获取下一个任务
     * @param Queue $queueInstance 队列实例
     * @param string $queueName 监听队列的名称(在pushon的时候把任务推送到哪个队列,则需要监听相应的队列才能获取任务)
     */
    protected function getNextJob(Queue $queueInstance, $queueName)
    {
        try {
            if (!is_null($job = $queueInstance->pop($queueName))) {
                return $job;
            }
        } catch (Exception $e) {
            self::Log(getmypid() . ':出列异常:' . $e->getMessage());
            $this->stopWorkerIfLostConnection($e);
        } catch (Throwable $e) {
            self::Log(getmypid() . ':出列异常:' . $e->getMessage());
            $this->stopWorkerIfLostConnection($e);
        }
    }

    /**
     * 监控任务超时(PHP 7.1+).
     * 如果执行任务的脚本超时, pcntl_alarm 将会启动并杀死当前的 work 进程。杀死进程后, work 进程将会被守护进程(Supervisor)重启,继续进行下一个任务。
     * @param   $job
     * @param   $timeout
     * timeout 功能针对 PHP 7.1+ 和 pcntl PHP 扩展进行了优化。也即
     * timeout 应该永远都要比[任务过期时间expire]短至少几秒钟的时间。这样就能保证任务进程总能在失败重试前就被杀死了。
     * 如果你的 -timeout 选项大于 [任务过期时间expire] 配置选项,你的任务可能被执行两次。
     * 参考资料:https://laravel-china.org/docs/laravel/5.5/queues/1324#delayed-dispatching
     * @return void
     */
    protected function registerTimeoutHandler($job, $timeout)
    {
        // 记录当前任务参数, 因为pcntl_signal的handler回调函数不支持传参
        $this->currentJob = $job;
        $this->currentTimeout = $timeout;

        if ($this->supportsAsyncSignals()) {
            // 注册 SIGALRM 闹钟信号的处理函数
            // note:  We will register a signal handler for the alarm signal so that we can kill this
            // process if it is running too long because it has frozen. This uses the async
            // signals supported in recent versions of PHP to accomplish it conveniently.
            pcntl_signal(SIGALRM, function () {
                // 将失败任务记录到数据库
                $failReason = '执行任务的脚本超时, 系统自动杀死进程';
                $details = '进程ID[' . getmypid() . "], 任务执行时间超过[{$this->currentTimeout}]秒, 系统闹钟上报SIGALRM异步信号";
                $this->saveFailJob($this->currentJob, $failReason, $details);
                self::Log(':registerTimeoutHandler: 执行任务的脚本超时自动杀死进程=>' . $failReason . ' | ' .  $details);
                $this->kill(1);
            });

            // 任务执行时间超过$timeout秒, 闹钟响起, 系统将会收到SIGALRM信号
            pcntl_alarm(max($timeout, 0));
        }
    }

    /**
     * 停止进程(带检测判断)
     * @param  Job  $job
     * @param  $memoryLimit
     */
    protected function stopIfNecessary($job, $memoryLimit)
    {
        if ($this->shouldQuit) {
            // 收到软件终止信号
            if ($this->signalTerm) {
                $failReason = '系统终止任务进程';
                $details = '系统收到SIGTERM软件终止信号';
                // 将失败任务记录到数据库
                $this->saveFailJob($job, $failReason, $details);
            }

            $this->kill();
        } else {
            // 检查是否超过最大使用内存
            $memoryUse = memory_get_usage() / 1024 / 1024;
            if ($memoryUse >= $memoryLimit) {
                // 将失败任务记录到数据库
                $failReason = '超过队列任务最大可用内存, 退出脚本';
                $details = '当前占用内存[' . $memoryUse . 'MB] | 最大可用内存[' . $memoryLimit . 'MB]';
                $this->saveFailJob($job, $failReason, $details);

                // 停止监听并退出脚本
                $this->stop(12);
            }
        }
    }

    /**
     * 杀死进程
     * @param  int  $status
     * @return void
     */
    public function kill($status = 0)
    {
        if (extension_loaded('posix')) {
            posix_kill(getmypid(), SIGKILL);
        }

        exit($status);
    }

    /**
     * 休眠
     * @param $seconds
     */
    public function sleep($seconds)
    {
        if ($seconds < 1) {
            usleep($seconds * 1000000);
        } else {
            sleep($seconds);
        }
    }

    /**
     * 停止监听并退出脚本
     * @param  int  $status
     * @return void
     */
    public function stop($status = 0)
    {
        Yii::$app->end();
        exit($status);
    }

    /**
     * 将失败任务记录到数据库
     * @param Job $job
     * @param $failReason
     * @param $details
     */
    protected function saveFailJob($job = false, $failReason, $details)
    {
        if ($job) {
            $payload = $job->getPayload();
            $payload = json_decode($payload, true);
            $payloadId = $payload['id'];
        } else {
            $payloadId = 'unknown_payload_id';
        }

        Job::handleFailJob($payloadId, $failReason, $details);
    }

    /**
     * 终止脚本进程(当数据库链接丢失的时候)
     * @param  Job  $job
     * @param  Exception  $e
     * @return void
     */
    protected function stopWorkerIfLostConnection($job = false, $e)
    {
        if ($this->causedByLostConnection($e)) {
            $this->shouldQuit = true;
            $failReason = '数据库连接丢失=>' . $e->getMessage();
            $details = $e->getTraceAsString();
        } else {
            $failReason = '应用程序异常=>' . $e->getMessage();
            $details = $e->getTraceAsString();
        }

        self::Log(getmypid() . 'stopWorkerIfLostConnection: ' . $failReason . ' | ' . $details);

        // 将失败任务记录到数据库
        $this->saveFailJob($job, $failReason, $details);
    }

    /**
     * 判断指定的异常是否由于数据库连接丢失造成
     * @param  Exception  $e
     * @return bool
     */
    protected function causedByLostConnection(Exception $e)
    {
        $message = $e->getMessage();

        return $this->contains($message, [
            'server has gone away',
            'no connection to the server',
            'Lost connection',
            'is dead or not enabled',
            'Error while sending',
            'decryption failed or bad record mac',
            'server closed the connection unexpectedly',
            'SSL connection has been closed unexpectedly',
            'Error writing data to the connection',
            'Resource deadlock avoided',
            'Transaction() on null',
            'child connection forced to terminate due to client_idle_limit',
            'query_wait_timeout',
            'reset by peer',
            'Can\'t connect to',
            'failed with errno=32 Broken pipe',
            'Failed to read from socket'
        ]);
    }

    /**
     * 启用异步信号来监听脚本进程
     * 该函数用于 PHP 7.1 版本以上,用于脚本的信号处理。
     * 所谓的信号处理,就是由 Process Monitor(如 Supervisor )发送并与我们的脚本进行通信的异步通知。
     * @return void
     */
    protected function listenForSignals()
    {
        if ($this->supportsAsyncSignals()) {
            //self::Log(getmypid() . ':listenForSignals:开启异步通信');
            pcntl_async_signals(true); // 开启异步信号

            // 终止进程: 软件终止信号
            pcntl_signal(SIGTERM, function () {
                $this->shouldQuit = true;
                $this->signalTerm = true;
            });

            //  终止进程: 用户定义信号2
            pcntl_signal(SIGUSR2, function () {
                $this->paused = true;
            });

            // 忽略信号: 继续执行一个停止的进程
            pcntl_signal(SIGCONT, function () {
                $this->paused = false;
            });
        }
    }

    /**
     * 判断当前系统是否支持异步信号(PHP 7.1.0以上版本支持)
     * @return bool
     */
    protected function supportsAsyncSignals()
    {
        return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl');
    }

    /**
     * 查找子串
     * @param  string  $haystack
     * @param  string|array  $needles
     * @return bool
     */
    public static function contains($haystack, $needles)
    {
        foreach ((array) $needles as $needle) {
            if ($needle !== '' && mb_strpos($haystack, $needle) !== false) {
                return true;
            }
        }

        return false;
    }
}