Queue.php 5.77 KB
<?php

namespace common\components\jqueue\db;

use Yii;
use yii\db\Query;
use common\components\jqueue\base\Exception;
use common\components\jqueue\base\Queue as BaseQueue;
use function is_array;
use function is_null;
use function time;

/**
 * Class Queue
 * @package common\components\jqueue\db
 */
class Queue extends BaseQueue
{
    /**
     * @var \yii\db\Connection
     */
    public $db;

    /**
     * @var string 存储队列任务表名称
     */
    public $table = 'jobs';

    /**
     * @throws Exception
     */
    public function init()
    {
        parent::init();
        if (is_array($this->db)) {
            $this->db = Yii::createObject($this->db);
            if ($this->db->driverName != 'mysql') {
                throw new Exception("sorry, only mysql db supported!");
            }
        }
    }

    /**
     * 添加任务记录到数据库
     * @param string $data
     * @param null $queue
     * @return mixed
     */
    protected function _push(array $data = [], $queueName = null)
    {
        $queueName = $this->getQueueName($queueName);
        return $this->pushToDatabase(0, $queueName, $this->createPayload($data));
    }
    
    /**
     * 添加一个延时记录到数据库
     * @param int $delay 延迟多少秒后执行
     * @param array $data 任务数据
     * @param string||null $queueName
     * @return mixed
     */
    protected function _later($delay, array $data = [], $queueName = null)
    {
        $queueName = $this->getQueueName($queueName);
        return $this->pushToDatabase($delay, $queueName, $this->createPayload($data));
    }

    /**
     * 取出一个任务
     * @param string||null $queueName
     * @return mixed
     */
    public function pop($queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        if (!is_null($this->expire)) {
            $this->releaseJobsThatHaveBeenReservedTooLong($queueName);
        }

        $tr = $this->db->beginTransaction();

        if ($model = $this->getNextAvailableJob($queueName)) {
            $this->markJobAsReserved($model->id);
            $tr->commit();

            $config = [
                'class'         => 'common\components\jqueue\db\Job',
                'queueName'     => $queueName,
                'model'         => $model,
                'queueInstance' => $this,
            ];

            return Yii::createObject($config);

        }
        $tr->commit();
        return false;
    }

    /**
     * 从队列中删除一个已经处理过的任务
     *
     * @param string $queueName
     * @param int $id
     * @return mixed
     */
    public function deleteReserved($queueName, $id)
    {
        return $this->db->createCommand()
            ->delete($this->table, "id={$id}")
            ->execute();
    }

    /**
     * 将一个任务重新加入队列
     *
     * @param string $queueName
     * @param \stdClass $job
     * @param int $delay
     * @return mixed
     */
    public function release($queueName, $model, $delay, $attempt = 0)
    {
        return $this->pushToDatabase($delay, $queueName, $model->payload, $model->attempts);
    }

    /**
     * 将任务数据写入数据库
     * @param $delay
     * @param $queueName
     * @param $payload
     * @param int $attempts
     * @return integer
     */
    protected function pushToDatabase($delay, $queueName, $payload, $attempts = 0)
    {
        $queueName = $this->getQueueName($queueName);
        $createdAt = time();
        $availableAt = $this->getAvailableAt($delay, $createdAt);
        return $this->db->createCommand()->insert($this->table, [
            'queue' => $queueName,
            'payload' => $payload,
            'attempts' => $attempts,
            'reserved' => 0,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $createdAt
        ])->execute();
    }

    /**
     * 获取队列当前任务数量
     * @param null $queueName
     * @return int|string
     */
    public function getJobCount($queueName = null)
    {
        $queueName = $this->getQueueName($queueName);
        return (new Query())
            ->select(['id'])
            ->from($this->table)
            ->where(['reserved' => 0])
            ->andWhere(['queue' => $queueName])
            ->count("*", $this->db);
    }

    /**
     * 获取任务有效时间
     * @param $delay
     * @param $createdAt
     * @return mixed
     */
    protected function getAvailableAt($delay, $createdAt)
    {
        return $createdAt + $delay;
    }

    /**
     * 重新激活那些超时仍未处理完的任务
     * @param  string $queueName
     * @return integer
     */
    protected function releaseJobsThatHaveBeenReservedTooLong($queueName)
    {
        $expired = time() + $this->expire;
        $sql = "update {$this->table} set reserved=0,reserved_at=null,attempts=attempts+1 where queue='{$this->getQueueName($queueName)}' and reserved=1 and reserved_at<={$expired}";
        return $this->db->createCommand($sql)->execute();

    }

    /**
     * 获取下一个可用的任务
     *
     * @param  string|null $queueName
     * @return \stdClass|null
     */
    protected function getNextAvailableJob($queueName)
    {
        $now = time();
        $sql = "select * from {$this->table} where queue='{$this->getQueueName($queueName)}' and reserved=0 and available_at<={$now} ORDER BY id asc limit 1 for update";
        $job = $this->db->createCommand($sql)->queryOne();
        return $job ? (object)$job : null;
    }

    /**
     * 将任务标记为已处理
     *
     * @param  string $id
     * @return void
     */
    protected function markJobAsReserved($id)
    {
        $now = time();
        $sql = "update {$this->table} set reserved=1,reserved_at={$now} where id={$id}";
        return $this->db->createCommand($sql)->execute();
    }
}