Queue.php 6.81 KB
<?php

namespace common\components\jqueue\redis;

use Yii;
use common\components\redis\IConnection;
use common\components\jqueue\helpers\ArrayHelper;
use common\components\jqueue\base\Exception;
use common\components\jqueue\base\Queue as BaseQueue;
use function is_object;
use function time;
use function is_null;
use function md5;
use function rand;

/**
 * Class Queue
 * @package common\components\jqueue\redis
 */
class Queue extends BaseQueue
{
    /**
     *'redis' => [
     *  'class' => 'common\components\redis\YIIRedisConnection', //队列使用的类
     *  'hostname' => 'localhost',
     *  'port' => 6379,
     *],
     * @var Redis
     */
    public $redis;

    /**
     * @throws Exception
     */
    public function init()
    {
        parent::init();
        if (!is_object($this->redis)) {
            $this->redis = Yii::createObject($this->redis);
            if(!$this->redis || !$this->redis instanceof IConnection){
                throw new Exception('Redis class injected must implements of "common\components\redis\IRedisConnection"');
            }
        }
    }

    /**
     * 入队列
     * @param $job
     * @param string $data
     * @param null $queue
     * @return int
     */
    protected function _push(array $data = [], $queueName = null)
    {
        $data = $this->setData($data, 'attempts', 0);
        return $this->redis->rpush($this->formatQueueName($queueName), $this->createPayload($data, $queueName));
    }

    /**
     * 延时任务入队列
     * @param $delay
     * @param $job
     * @param string $data
     * @param null $queue
     * @return int
     */
    protected function _later($delay, array $data = [], $queueName = null)
    {
        $data = $this->setData($data, 'attempts', 0);
        return $this->redis->zadd($this->formatQueueDelayedName($queueName), time() + $delay, $this->createPayload($data, $queueName));
    }

    /**
     * 出队列
     * @param null $queue
     * @return object|boolean
     * @throws \yii\base\InvalidConfigException
     */
    public function pop($queueName = null)
    {
        // 重新激活那些延迟时间已经到的任务,queues_delayed= > queues 重新放回主队列
        $this->migrateExpiredJobs($this->formatQueueDelayedName($queueName), $this->formatQueueName($queueName));

        if (!is_null($this->expire)) {
            // 重新入列那些超时仍未完成的任务 queues_reserved => queues 重新放回主队列
            $this->migrateExpiredJobs($this->formatQueueReservedName($queueName), $this->formatQueueName($queueName));
        }

        list($payload, $reservedPayload) = $this->retrieveNextJob($queueName);

        if ($reservedPayload) {
            $config = [
                'class' => 'common\components\jqueue\redis\Job',
                'queueName' => $queueName,
                'payload' => $reservedPayload,
                'queueInstance' => $this,
            ];

            return Yii::createObject($config);
        }

        return false;
    }

    /**
     * 获取队列当前任务数 = 执行队列任务数 + 等待队列任务数
     * @param null $queue
     * @return mixed
     */
    public function getJobCount($queueName = null)
    {
        return $this->redis->eval(
            LuaScripts::size(), 3, $this->formatQueueName($queueName), $this->formatQueueDelayedName($queueName), $this->formatQueueReservedName($queueName)
        );
    }

    /**
     * 将任务重新加入队列中
     * @param  string $queueName
     * @param  string $payload 这里是一个JSON数组
     * @param  int $delay
     * @return void
     */
    public function deleteAndRelease($queueName, $payload, $delay)
    {
        $this->redis->eval(
            LuaScripts::release(), 2, $this->formatQueueDelayedName($queueName), $this->formatQueueReservedName($queueName),
            $payload, time() + $delay
        );
    }

    /**
     * 给队列数据添加id和attempts字段
     * @param  string $job
     * @param  mixed $data
     * @param  string $queueName
     * @return string
     */
    protected function createPayload(array $data = [], $queueName = null)
    {
        $data = $this->setData($data, 'id', $this->getRandomId());
        return parent::createPayload($data);
    }

    /**
     * 创建一个随机串作为id
     * @param int $length
     * @return string
     */
    protected function getRandomId()
    {
        $string = md5(time() . rand(1000, 9999));
        return $string;
    }

    /**
     * 队列名称:主队列
     * @param  string|null $queueName
     * @return string
     */
    protected function formatQueueName($queueName)
    {
        return 'queues:' . ($queueName ?: $this->queueName);
    }

    /**
     * 队列名称:延迟执行的队列
     * @param  string|null $queueName
     * @return string
     */
    protected function formatQueueDelayedName($queueName)
    {
        return 'queues_delayed:' . ($queueName ?: $this->queueName);
    }

    /**
     * 队列名称:已执行过的任务
     * @param  string|null $queueName
     * @return string
     */
    protected function formatQueueReservedName($queueName)
    {
        return 'queues_reserved:' . ($queueName ?: $this->queueName);
    }

    /**
     * 从已处理集合中删除一个任务
     * @param  string $queue
     * @param  string $payload 在这里是一个JSON字符串
     * @return void
     */
    public function deleteReserved($queueName, $payload)
    {
        $this->redis->zrem($this->formatQueueReservedName($queueName), $payload);
    }

    /**
     * Migrate the delayed jobs that are ready to the regular queue.
     *
     * @param  string  $from
     * @param  string  $to
     * @return array
     */
    public function migrateExpiredJobs($from, $to)
    {
        return $this->redis->eval(
            LuaScripts::migrateExpiredJobs(), 2, $from, $to, time()
        );
    }

    /**
     * Retrieve the next job from the queue.
     *
     * @param  string  $queueName
     * @return array
     */
    protected function retrieveNextJob($queueName)
    {
        return $this->redis->eval(
            LuaScripts::pop(), 2, $this->formatQueueName($queueName), $this->formatQueueReservedName($queueName),
            time() + $this->expire
        );
    }

    /**
     * 在输入数据中添加新的字段
     * @param  array $data
     * @param  string $key
     * @param  string $value
     * @return array
     */
    protected function setData($data, $key, $value)
    {
        return ArrayHelper::set($data, $key, $value);
    }

    /**stopIfNecessary
     * 清空指定队列
     * @param null $queue
     * @return integer
     * @throws \Exception execution failed
     */
    public function flush($queueName = null)
    {
        return $this->redis->del($this->formatQueueName($queueName), $this->formatQueueDelayedName($queueName), $this->formatQueueReservedName($queueName));
    }
}