CrontabsGroupComponent.php 8.07 KB
<?php

namespace common\components\crontabs;

use yii\base\Component;
use yii\base\InvalidValueException;
use function dirname;
use function count;
use function md5;
use function popen;
use function fread;
use function pclose;

/**
 *  $repository = new CrontabRepository(new CrontabAdapter);
 *  $crontabs = file(__DIR__.DIRECTORY_SEPARATOR.'crontabs.php');
 *  foreach ($crontabs as $crontab){
 *      $repository->addJob(CrontabJob::createFromCrontabLine($crontab));
 *  }
 *  $jobs = $repository->findJobByRegex('/(.s)*\#close-order/');
 *  print_r($jobs);
 * 
 * Class CrontabsGroupComponent
 * @package common\components
 */
class CrontabsGroupComponent extends Component
{
    private $root = '/var/www/html';
    private $output = '/dev/null';//
    private $groupNameRegex = '(.s)*\#';
    private $phpYiiCommand = 'php yii';
    private $cdCommand = 'cd';
    private $crontabRepository;

    public function init()
    {
        $this->root = dirname(dirname(dirname(__DIR__))).DIRECTORY_SEPARATOR;
        $this->crontabRepository = new CrontabRepository(new CrontabAdapter);
    }

    /**
     * 通过任务命令行添加CrontabJob任务
     */
    public function addJobByCrontabLine($crontabLine)
    {
        $crontabJob = CrontabJob::createFromCrontabLine($crontabLine);
        $this->crontabRepository->addJob($crontabJob);
    }

    /**
     * 根据分组添加任务
     * @param $groupName
     * @param $route
     * @param int $groupJobsCount 一分钟内轮询多少条任务,最大60条
     * @return mixed
     */
    public function addJobByGroupName($crontabRootPath, $timeFormat, $groupName, $route, $groupJobsCount = 5)
    {
        $this->root = $crontabRootPath;

        $groupJobsCount = (int)$groupJobsCount;
        if($groupJobsCount > 60){
            throw new InvalidValueException('Max jobs count is 60,now is '.$groupJobsCount);
        }

        $times = explode(' ', $timeFormat);
        $sleepInterval = 60 / $groupJobsCount;
        for ($i = 0; $i < $groupJobsCount; $i++) {
            $crontabJob = new CrontabJob();
            if (!empty($times)) {
                $crontabJob->minutes = $times[0];
                $crontabJob->hours = $times[1];
                $crontabJob->dayOfMonth = $times[2];
                $crontabJob->months = $times[3];
                $crontabJob->dayOfWeek = $times[4];
            }

            if (0 == $i) {
                $crontabJob->taskCommandLine = $this->getJobCommandLine($groupName, $route);
            } else {
                $sleep = $sleepInterval * $i;
                $crontabJob->taskCommandLine = $this->getJobCroupCommandLine($sleep, $groupName, $route);
            }

            $crontabJob->comments = self::jobName($groupName, $i);
            $this->crontabRepository->addJob($crontabJob);
        }

        return $this->crontabRepository->persist();
    }

    /**
     * 保存所有活动任务的选项,并启动
     * @return mixed
     */
    public function persist()
    {
        return $this->crontabRepository->persist();
    }

    /**
     * @param $groupName
     * @param $route
     * @param int $groupJobsCount 一分钟内轮询多少条任务,最大60条
     * @return mixed
     */
    public function persistByGroupName($groupName, $route, $groupJobsCount = 5)
    {
        $groupJobsCount = (int)$groupJobsCount;
        if($groupJobsCount > 60){
            throw new InvalidValueException('Max jobs count is 60,now is '.$groupJobsCount);
        }

        // 清除该分组的任务
        $this->clearJobsByGroupName($groupName);

        $sleepInterval = 60 / $groupJobsCount;
        for ($i = 0; $i < $groupJobsCount; $i++) {
            $crontabJob = new CrontabJob();
            if (0 == $i) {
                $crontabJob->taskCommandLine = $this->getJobCommandLine($groupName, $route);
            } else {
                $sleep = $sleepInterval * $i;
                $crontabJob->taskCommandLine = $this->getJobCroupCommandLine($sleep, $groupName, $route);
            }
            $crontabJob->comments = self::jobName($groupName, $i);
            $this->crontabRepository->addJob($crontabJob);
        }

        return $this->crontabRepository->persist();
    }

    /**
     * @param $groupName
     */
    public function clearJobsByGroupName($groupName)
    {
        $jobs = $this->findJobsByGroupName($groupName);
        foreach ($jobs as $job){
            $this->crontabRepository->removeJob($job);
        }

        return $this->crontabRepository->persist();
    }

    /**
     * 清空定时任务
     * @return mixed
     */
    public function clearJobs()
    {
        $this->crontabRepository->clearJob();
        return $this->crontabRepository->persist();
    }

    /**
     * @param $groupName 按分组名称来查找任务
     * @return array 某分组的CrontabJob数组
     */
    public function findJobsByGroupName($groupName)
    {
        return (array)$this->crontabRepository->findJobByRegex("/{$this->groupNameRegex}{$groupName}/");
    }

    /**
     * @param $groupName 按分组名称来查找任务
     * @return array 某分组的CrontabJob数组
     */
    public function countJobsByGroupName($groupName)
    {
        return count($this->findJobsByGroupName($groupName));
    }

    /**
     * @param $groupName
     * @param $i
     * @return string
     */
    private static function jobName($groupName, $i)
    {
        return $groupName . '-' . $i;
    }

    /**
     * 启动进程的时候带上一个进程识别符
     * 返回类似格式:
     * cd /var/www/html/mk-hyk; php yii crontab/notification /var/www/html/mk-libs/wechat-notification >/dev/null 2>&1 &
     * @param $sleep
     */
    private function getJobCommandLine($groupName, $route)
    {
        // 2>&1 是把错误输出导入(合并)到标准输出流中
        $processToken = $this->getProcessToken($groupName);

        return "{$this->cdCommand} {$this->root}; {$this->phpYiiCommand} {$route} {$processToken} >{$this->output} 2>&1 &";
    }

    /**
     * 进程识别符
     * @param $groupName
     * @return string
     */
    private function getProcessToken($groupName)
    {
        return md5($this->root . $groupName);
    }

    /**
     * 返回类似格式:sleep 48; cd /var/www/html/mk-hyk/; php yii crontab/notification >/dev/null 2>&1 &
     * @param $sleep
     * @param $route
     * @return string
     */
    private function getJobCroupCommandLine($sleep, $groupNamem, $route)
    {
        return "sleep {$sleep}; " . $this->getJobCommandLine($groupNamem, $route);
    }

    /**
     * 返回已经启动的PHP进程,通过进程唯一识别符(process token)来获知定时任务启动的监听进程
     * 注意:如果定时任务启动的不是监听程序,那么该PHP进程执行完之后就会自动销毁。
     * @param $groupName 任务组名,用来识别业务
     * @param $route console控制台对应的应用程序(application route)路由
     * @return int 进程数量
     */
    public function getJobsProcessCount($groupName, $route)
    {
        $jobProcessCommand = $this->getJobProcessCommand($groupName, $route);
        $cmd = popen("ps -ef | {$jobProcessCommand} | wc -l", 'r');
        $num = fread($cmd, 512);
        pclose($cmd);
        return (int)$num;
    }

    /**
     * ps -ef | grep 'php yii worker/listen 3a34facf9ddaad970db9f13910161a4d ' | grep -v grep | grep -v '#wechat-notification' | pgrep php
     * 移除(kill)掉所有对应($groupName, $route)定时任务开启的进程
     * @param $groupName
     * @param $route
     */
    public function removeJobsProcessByGroupName($groupName, $route)
    {
        $jobProcessCommand = $this->getJobProcessCommand($groupName, $route);
        $cmd = popen("ps -ef | {$jobProcessCommand} | pgrep php | xargs kill -s 9", 'w');
        pclose($cmd);
    }

    /**
     * 进程命令:根据($groupName, $route)匹配出对应的进程
     * @param $groupName
     * @param $route
     * @return string
     */
    private function getJobProcessCommand($groupName, $route)
    {
        $processToken = $this->getProcessToken($groupName);
        return "grep '{$this->phpYiiCommand} {$route} {$processToken}'";
    }
}