dlog->debug($msg); } /** * Supervisor进程工作函数 * 说明: 如果队列有任务则读取1个处理; 如果没有任务则休眠3秒(避免执行时间太短supervisor异常退出进程) * @param Queue $queueInstance * @param $queueName * @param array $options * @param null $callback */ public function work(Queue $queueInstance, $queueName, $options = [], $callback = null) { ignore_user_abort(true); // 忽略客户端关闭,PHP脚本继续执行 set_time_limit(0); // 取消PHP执行时间限制 $attempt = isset($options['attempt']) ? $options['attempt'] : 3; // 每一个任务的尝试次数 $delay = isset($options['delay']) ? $options['delay'] : 60; // 延迟多少秒把任务重新加入队列, 0为马上激活,和 $options['attempt'] 配合使用 $sleep = isset($options['sleep']) ? $options['sleep'] : 3; // 每一次任务执行空闲周期的沉睡时间 $timeout = isset($options['timeout']) ? $options['timeout'] : 55; // 任务超时关闭进程时间 $memory = isset($options['memory']) ? $options['memory'] : 128; // 占用的最大内存 // 开启进程异步信号 $this->listenForSignals(); $job = $this->getNextJob($queueInstance, $queueName); $this->registerTimeoutHandler($job, $timeout); if ($job instanceof Job) { if ($attempt > 0 && $job->getAttempts() > $attempt) { // 超过最大重试次数, 删除任务 try { if ($job->failed()) { $job->delete(); } } catch (Exception $e) { $this->stopWorkerIfLostConnection($job, $e); } } else { try { // 执行任务 $job->execute($callback); } catch (Exception $e) { $this->stopWorkerIfLostConnection($job, $e); if (!$job->isDeleted()) { self::Log("work: 任务未删除进行第{$job->getAttempts()}次失败重试:{$job->getPayload()}"); $job->release($delay); } } } } else { $this->sleep($sleep); } // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. // 检查是否需要终止脚本进程 $this->stopIfNecessary($job, $memory); } /** * 定时任务后台监听进程(采用死循环方式实现) * $options['attempt'] : int 队列任务失败尝试次数,0为不限制 * $options['memory'] : int 允许使用的最大内存 * $options['sleep'] : int 每次检测的时间间隔 * * @param Queue $queueInstance 队列实例 * @param string $queueName 监听队列的名称(在pushon的时候把任务推送到哪个队列,则需要监听相应的队列才能获取任务) * @param array $options 参数数组 * $options['attempt'] : int 队列任务失败尝试次数,0为不限制 * $options['delayAttempt'] : int 重新激活失败的任务的时间周期,0为马上激活 * $options['memory'] : int 允许使用的最大内存 * $options['sleep'] : int 每次检测的时间间隔 * @param null $callback * @throws Exception */ public function listen(Queue $queueInstance, $queueName, $options = [], $callback = null) { ignore_user_abort(true); // 忽略客户端关闭,PHP脚本继续执行 set_time_limit(0); // 取消PHP执行时间限制 $attempt = isset($options['attempt']) ? $options['attempt'] : 10; // 每一个任务的尝试次数 $delay = isset($options['delay']) ? $options['delay'] : 60; // 延迟多少秒把任务重新加入队列, 0为马上激活,和 $options['attempt'] 配合使用 $sleep = isset($options['sleep']) ? $options['sleep'] : 3; // 每一个监听周期的沉睡时间 $timeout = isset($options['timeout']) ? $options['timeout'] : 55; // 任务超时关闭进程,比delayAttempt短几秒钟 $memory = isset($options['memory']) ? $options['memory'] : 128; // 占用的最大内存 // 开启进程异步信号 $this->listenForSignals(); while (true) { $job = $this->getNextJob($queueInstance, $queueName); $this->registerTimeoutHandler($job, $timeout); if($job instanceof Job){ if($attempt > 0 && $job->getAttempts() > $attempt) { // 超过最大重试次数, 删除任务 try { if ($job->failed()) { $job->delete(); } } catch (Exception $e) { $this->stopWorkerIfLostConnection($job, $e); } } else { try { $job->execute($callback);//这里有mysql操作 } catch (Exception $e) { $this->stopWorkerIfLostConnection($job, $e); if (!$job->isDeleted()) { $job->release($delay); self::Log("listen: 任务未删除进行第{$job->getAttempts()}次失败重试:{$job->getPayload()}"); } } } } else { $this->sleep($sleep); } // 检查是否需要终止脚本进程 $this->stopIfNecessary($job, $memory); } } /** * 从队列中获取下一个任务 * @param Queue $queueInstance 队列实例 * @param string $queueName 监听队列的名称(在pushon的时候把任务推送到哪个队列,则需要监听相应的队列才能获取任务) */ protected function getNextJob(Queue $queueInstance, $queueName) { try { if (!is_null($job = $queueInstance->pop($queueName))) { return $job; } } catch (Exception $e) { self::Log(getmypid() . ':出列异常:' . $e->getMessage()); $this->stopWorkerIfLostConnection($e); } catch (Throwable $e) { self::Log(getmypid() . ':出列异常:' . $e->getMessage()); $this->stopWorkerIfLostConnection($e); } } /** * 监控任务超时(PHP 7.1+). * 如果执行任务的脚本超时, pcntl_alarm 将会启动并杀死当前的 work 进程。杀死进程后, work 进程将会被守护进程(Supervisor)重启,继续进行下一个任务。 * @param $job * @param $timeout * timeout 功能针对 PHP 7.1+ 和 pcntl PHP 扩展进行了优化。也即 * timeout 应该永远都要比[任务过期时间expire]短至少几秒钟的时间。这样就能保证任务进程总能在失败重试前就被杀死了。 * 如果你的 -timeout 选项大于 [任务过期时间expire] 配置选项,你的任务可能被执行两次。 * 参考资料:https://laravel-china.org/docs/laravel/5.5/queues/1324#delayed-dispatching * @return void */ protected function registerTimeoutHandler($job, $timeout) { // 记录当前任务参数, 因为pcntl_signal的handler回调函数不支持传参 $this->currentJob = $job; $this->currentTimeout = $timeout; if ($this->supportsAsyncSignals()) { // 注册 SIGALRM 闹钟信号的处理函数 // note: We will register a signal handler for the alarm signal so that we can kill this // process if it is running too long because it has frozen. This uses the async // signals supported in recent versions of PHP to accomplish it conveniently. pcntl_signal(SIGALRM, function () { // 将失败任务记录到数据库 $failReason = '执行任务的脚本超时, 系统自动杀死进程'; $details = '进程ID[' . getmypid() . "], 任务执行时间超过[{$this->currentTimeout}]秒, 系统闹钟上报SIGALRM异步信号"; $this->saveFailJob($this->currentJob, $failReason, $details); self::Log(':registerTimeoutHandler: 执行任务的脚本超时自动杀死进程=>' . $failReason . ' | ' . $details); $this->kill(1); }); // 任务执行时间超过$timeout秒, 闹钟响起, 系统将会收到SIGALRM信号 pcntl_alarm(max($timeout, 0)); } } /** * 停止进程(带检测判断) * @param Job $job * @param $memoryLimit */ protected function stopIfNecessary($job, $memoryLimit) { if ($this->shouldQuit) { // 收到软件终止信号 if ($this->signalTerm) { $failReason = '系统终止任务进程'; $details = '系统收到SIGTERM软件终止信号'; // 将失败任务记录到数据库 $this->saveFailJob($job, $failReason, $details); } $this->kill(); } else { // 检查是否超过最大使用内存 $memoryUse = memory_get_usage() / 1024 / 1024; if ($memoryUse >= $memoryLimit) { // 将失败任务记录到数据库 $failReason = '超过队列任务最大可用内存, 退出脚本'; $details = '当前占用内存[' . $memoryUse . 'MB] | 最大可用内存[' . $memoryLimit . 'MB]'; $this->saveFailJob($job, $failReason, $details); // 停止监听并退出脚本 $this->stop(12); } } } /** * 杀死进程 * @param int $status * @return void */ public function kill($status = 0) { if (extension_loaded('posix')) { posix_kill(getmypid(), SIGKILL); } exit($status); } /** * 休眠 * @param $seconds */ public function sleep($seconds) { if ($seconds < 1) { usleep($seconds * 1000000); } else { sleep($seconds); } } /** * 停止监听并退出脚本 * @param int $status * @return void */ public function stop($status = 0) { Yii::$app->end(); exit($status); } /** * 将失败任务记录到数据库 * @param Job $job * @param $failReason * @param $details */ protected function saveFailJob($job = false, $failReason, $details) { if ($job) { $payload = $job->getPayload(); $payload = json_decode($payload, true); $payloadId = $payload['id']; } else { $payloadId = 'unknown_payload_id'; } Job::handleFailJob($payloadId, $failReason, $details); } /** * 终止脚本进程(当数据库链接丢失的时候) * @param Job $job * @param Exception $e * @return void */ protected function stopWorkerIfLostConnection($job = false, $e) { if ($this->causedByLostConnection($e)) { $this->shouldQuit = true; $failReason = '数据库连接丢失=>' . $e->getMessage(); $details = $e->getTraceAsString(); } else { $failReason = '应用程序异常=>' . $e->getMessage(); $details = $e->getTraceAsString(); } self::Log(getmypid() . 'stopWorkerIfLostConnection: ' . $failReason . ' | ' . $details); // 将失败任务记录到数据库 $this->saveFailJob($job, $failReason, $details); } /** * 判断指定的异常是否由于数据库连接丢失造成 * @param Exception $e * @return bool */ protected function causedByLostConnection(Exception $e) { $message = $e->getMessage(); return $this->contains($message, [ 'server has gone away', 'no connection to the server', 'Lost connection', 'is dead or not enabled', 'Error while sending', 'decryption failed or bad record mac', 'server closed the connection unexpectedly', 'SSL connection has been closed unexpectedly', 'Error writing data to the connection', 'Resource deadlock avoided', 'Transaction() on null', 'child connection forced to terminate due to client_idle_limit', 'query_wait_timeout', 'reset by peer', 'Can\'t connect to', 'failed with errno=32 Broken pipe', 'Failed to read from socket' ]); } /** * 启用异步信号来监听脚本进程 * 该函数用于 PHP 7.1 版本以上,用于脚本的信号处理。 * 所谓的信号处理,就是由 Process Monitor(如 Supervisor )发送并与我们的脚本进行通信的异步通知。 * @return void */ protected function listenForSignals() { if ($this->supportsAsyncSignals()) { //self::Log(getmypid() . ':listenForSignals:开启异步通信'); pcntl_async_signals(true); // 开启异步信号 // 终止进程: 软件终止信号 pcntl_signal(SIGTERM, function () { $this->shouldQuit = true; $this->signalTerm = true; }); // 终止进程: 用户定义信号2 pcntl_signal(SIGUSR2, function () { $this->paused = true; }); // 忽略信号: 继续执行一个停止的进程 pcntl_signal(SIGCONT, function () { $this->paused = false; }); } } /** * 判断当前系统是否支持异步信号(PHP 7.1.0以上版本支持) * @return bool */ protected function supportsAsyncSignals() { return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl'); } /** * 查找子串 * @param string $haystack * @param string|array $needles * @return bool */ public static function contains($haystack, $needles) { foreach ((array) $needles as $needle) { if ($needle !== '' && mb_strpos($haystack, $needle) !== false) { return true; } } return false; } }