Queue.php 3.49 KB
<?php

namespace common\components\jqueue\base;

use yii\di\ServiceLocator;
use function json_encode;
use function array_map;
use function is_array;

/**
 * Class Queue
 * @package common\components\jqueue\base
 */
abstract class Queue extends ServiceLocator
{
    /**
     * 队列默认名称
     * @var string
     */
    public $queueName = 'default';

    /**
     * 队列允许最大任务数量,0代表不限制
     * @var int
     */
    public $maxJob = 0;

    /**
     * 任务过期时间(秒),重试的次数达到10次之后,按过期时间自动删除已处理的任务。
     * @var int
     */
    public $expire = 60;

    /**
     * @var array 失败配置
     */
    public $failed;

    /**
     * 入队列
     * @param string $data
     * @param $queueName
     */
    abstract protected function _push(array $data = [], $queueName = null);

    /**
     * 延时入队列
     * @param $delay
     * @param string $data
     * @param $queueName
     */
    abstract protected function _later($delay, array $data = [], $queueName = null);

    /**
     * 出队列
     * @param null $queueName
     * @return Job
     */
    abstract public function pop($queueName = null);

    /**
     * 将一个任务重新加入队列
     * @param string $queueName
     * @param string $delay
     * @return mixed
     */
    abstract public function deleteAndRelease($queueName, $payload, $delay);

    /**
     * 获取当前队列中等待执行的任务数量
     * @param string||null $queueName
     * @return mixed
     */
    abstract public function getJobCount($queueName = null);

    /**
     * 入队列
     * @param array $data
     * @param string||null $queueName
     * @return  mixed
     * @throws Exception
     */
    public function push(array $data = [], $queueName = null)
    {
        if ($this->canPush()) {
            return $this->_push($data, $queueName);
        } else {
            throw new Exception("Max Jobs number exceed! the max Jobs number is {$this->maxJob}");
        }
    }

    /**
     * 延时入队列
     * @param $delay
     * @param string $data
     * @param $queueName
     * @return mixed
     * @throws Exception
     */
    public function later($delay, array $data = [], $queue = null)
    {
        if ($this->canPush()) {
            return $this->_later($delay, $data, $queue);
        } else {
            throw new Exception("Max Jobs number exceed! the max Jobs number is {$this->maxJob}");
        }
    }

    /**
     * 将任务及任务相关数据打包成json数据
     * @param  mixed $data
     * @return string
     */
    protected function createPayload(array $data = [])
    {
        return json_encode($data, JSON_UNESCAPED_UNICODE);
    }

    /**
     * 准备任务的数据
     * @param $data
     * @return array
     */
    protected function prepareQueueData(array $data)
    {
        $data = array_map(function ($d) {

            if (is_array($d)) {
                return $this->prepareQueueData($d);
            }
            return $d;

        }, $data);

        return $data;
    }

    /**
     * 检查队列是否已达最大任务量
     * @return bool
     */
    protected function canPush()
    {
        if ($this->maxJob > 0 && $this->getJobCount() >= $this->maxJob) {
            return false;
        }
        return true;
    }


    /**
     * 获取队列名称
     * @param $queueName
     * @return string
     */
    protected function getQueueName($queueName)
    {
        return $queueName ?: $this->queueName;
    }
}