[
     *     'class' => 'common\components\redis\YIIRedisConnection', // class used by the queue
     *     'hostname' => 'localhost',
     *     'port' => 6379,
     * ],
     * @var Redis
     */
    public $redis;

    /**
     * Initializes the component and resolves the Redis connection.
     *
     * When $redis is not already an object it is treated as a Yii object
     * configuration and instantiated through Yii::createObject(); the created
     * object must be an IConnection.
     *
     * NOTE(review): an object assigned to $redis directly is not type-checked
     * here, and the error message names "IRedisConnection" while the code checks
     * IConnection - confirm both are intended.
     *
     * @throws Exception if the created connection is empty or not an IConnection
     */
    public function init()
    {
        parent::init();

        if (!is_object($this->redis)) {
            $this->redis = Yii::createObject($this->redis);
            if (!$this->redis || !$this->redis instanceof IConnection) {
                throw new Exception('Redis class injected must implements of "common\components\redis\IRedisConnection"');
            }
        }
    }

    /**
     * Pushes a job onto the main queue list.
     *
     * The job data gets an 'attempts' field set to 0 before it is serialized.
     *
     * @param array $data job data
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return mixed result of the Redis RPUSH call (presumably the new list length)
     */
    protected function _push(array $data = [], $queueName = null)
    {
        $data = $this->setData($data, 'attempts', 0);

        return $this->redis->rpush(
            $this->formatQueueName($queueName),
            $this->createPayload($data, $queueName)
        );
    }

    /**
     * Pushes a delayed job onto the delayed sorted set.
     *
     * The member score is the Unix timestamp at which the job becomes due.
     *
     * @param int $delay delay in seconds, added to time()
     * @param array $data job data
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return mixed result of the Redis ZADD call
     */
    protected function _later($delay, array $data = [], $queueName = null)
    {
        $data = $this->setData($data, 'attempts', 0);

        return $this->redis->zadd(
            $this->formatQueueDelayedName($queueName),
            time() + $delay,
            $this->createPayload($data, $queueName)
        );
    }

    /**
     * Pops the next job off the queue.
     *
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return object|boolean a job object built by Yii::createObject(), or false when no job was reserved
     * @throws \yii\base\InvalidConfigException
     */
    public function pop($queueName = null)
    {
        // Re-activate delayed jobs whose delay has elapsed: queues_delayed => queues.
        $this->migrateExpiredJobs(
            $this->formatQueueDelayedName($queueName),
            $this->formatQueueName($queueName)
        );

        if (!is_null($this->expire)) {
            // Re-queue reserved jobs that timed out without finishing: queues_reserved => queues.
            $this->migrateExpiredJobs(
                $this->formatQueueReservedName($queueName),
                $this->formatQueueName($queueName)
            );
        }

        // Only the second element (the reserved payload) of the script result is used here.
        list(, $reservedPayload) = $this->retrieveNextJob($queueName);

        if ($reservedPayload) {
            $config = [
                'class' => 'common\components\jqueue\redis\Job',
                'queueName' => $queueName,
                'payload' => $reservedPayload,
                'queueInstance' => $this,
            ];

            return Yii::createObject($config);
        }

        return false;
    }

    /**
     * Returns the current number of jobs in the queue.
     *
     * Runs the LuaScripts::size() script against the main, delayed and reserved
     * keys; the exact counting rule lives in that script (presumably the sum of
     * the three - verify against LuaScripts).
     *
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return mixed result of the Lua script
     */
    public function getJobCount($queueName = null)
    {
        return $this->redis->eval(
            LuaScripts::size(),
            3,
            $this->formatQueueName($queueName),
            $this->formatQueueDelayedName($queueName),
            $this->formatQueueReservedName($queueName)
        );
    }

    /**
     * Puts a job back into the queue.
     *
     * Runs the LuaScripts::release() script with the delayed and reserved keys,
     * the payload and the timestamp at which the job should become due again.
     *
     * @param string $queueName queue name
     * @param string $payload job payload (a JSON string)
     * @param int $delay delay in seconds, added to time()
     * @return void
     */
    public function deleteAndRelease($queueName, $payload, $delay)
    {
        $this->redis->eval(
            LuaScripts::release(),
            2,
            $this->formatQueueDelayedName($queueName),
            $this->formatQueueReservedName($queueName),
            $payload,
            time() + $delay
        );
    }

    /**
     * Adds an 'id' field to the job data and delegates serialization to the parent.
     *
     * @param array $data job data
     * @param string|null $queueName not used by this implementation
     * @return string serialized payload as returned by parent::createPayload()
     */
    protected function createPayload(array $data = [], $queueName = null)
    {
        $data = $this->setData($data, 'id', $this->getRandomId());

        return parent::createPayload($data);
    }

    /**
     * Creates a random 32-character hexadecimal string used as the job id.
     *
     * @return string
     */
    protected function getRandomId()
    {
        // The previous time() . rand(1000, 9999) seed allowed only 9000 distinct ids
        // per second, so identical job data pushed in the same second could yield
        // byte-identical payloads (which collapse into one sorted-set member).
        // uniqid() with more_entropy adds microsecond time plus extra entropy.
        return md5(uniqid((string) mt_rand(), true));
    }

    /**
     * Builds the Redis key of the main queue.
     *
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return string
     */
    protected function formatQueueName($queueName)
    {
        return 'queues:' . ($queueName ?: $this->queueName);
    }

    /**
     * Builds the Redis key of the delayed queue.
     *
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return string
     */
    protected function formatQueueDelayedName($queueName)
    {
        return 'queues_delayed:' . ($queueName ?: $this->queueName);
    }

    /**
     * Builds the Redis key of the reserved set (jobs that have been handed out).
     *
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return string
     */
    protected function formatQueueReservedName($queueName)
    {
        return 'queues_reserved:' . ($queueName ?: $this->queueName);
    }

    /**
     * Removes a job from the reserved set.
     *
     * @param string $queueName queue name
     * @param string $payload job payload (a JSON string)
     * @return void
     */
    public function deleteReserved($queueName, $payload)
    {
        $this->redis->zrem($this->formatQueueReservedName($queueName), $payload);
    }

    /**
     * Migrate the delayed jobs that are ready to the regular queue.
     *
     * Runs the LuaScripts::migrateExpiredJobs() script with the current time.
     *
     * @param string $from Redis key to move jobs out of
     * @param string $to Redis key to move jobs into
     * @return array
     */
    public function migrateExpiredJobs($from, $to)
    {
        return $this->redis->eval(
            LuaScripts::migrateExpiredJobs(),
            2,
            $from,
            $to,
            time()
        );
    }

    /**
     * Retrieve the next job from the queue.
     *
     * Runs the LuaScripts::pop() script with the main and reserved keys and the
     * timestamp time() + $this->expire. pop() reads the second element of the
     * result as the reserved payload.
     *
     * @param string $queueName
     * @return array
     */
    protected function retrieveNextJob($queueName)
    {
        return $this->redis->eval(
            LuaScripts::pop(),
            2,
            $this->formatQueueName($queueName),
            $this->formatQueueReservedName($queueName),
            time() + $this->expire
        );
    }

    /**
     * Adds a new field to the input data.
     *
     * @param array $data
     * @param string $key
     * @param mixed $value
     * @return array the value returned by ArrayHelper::set()
     */
    protected function setData($data, $key, $value)
    {
        return ArrayHelper::set($data, $key, $value);
    }

    /**
     * Empties the given queue by deleting its main, delayed and reserved keys.
     *
     * @param string|null $queueName queue name; a falsy value selects $this->queueName
     * @return integer
     * @throws \Exception execution failed
     */
    public function flush($queueName = null)
    {
        return $this->redis->del(
            $this->formatQueueName($queueName),
            $this->formatQueueDelayedName($queueName),
            $this->formatQueueReservedName($queueName)
        );
    }
}