db)) { $this->db = Yii::createObject($this->db); if ($this->db->driverName != 'mysql') { throw new Exception("sorry, only mysql db supported!"); } } }

    /**
     * Adds a job record to the database with no delay.
     *
     * @param array $data job data, turned into a payload by createPayload()
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return mixed result of pushToDatabase()
     */
    protected function _push(array $data = [], $queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        return $this->pushToDatabase(0, $queueName, $this->createPayload($data));
    }

    /**
     * Adds a delayed job record to the database.
     *
     * @param int $delay number of seconds to wait before the job becomes available
     * @param array $data job data, turned into a payload by createPayload()
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return mixed result of pushToDatabase()
     */
    protected function _later($delay, array $data = [], $queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        return $this->pushToDatabase($delay, $queueName, $this->createPayload($data));
    }

    /**
     * Takes the next available job off the queue and marks it as reserved.
     *
     * When an expiry is configured, reservations older than that expiry are
     * released first. The select-and-reserve step runs inside a transaction
     * (the row is selected "for update"); the transaction is rolled back and
     * the error rethrown if either statement fails, so no transaction is left
     * open on the connection.
     *
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return mixed the job object created by Yii::createObject(), or false when no job is available
     */
    public function pop($queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        if (!is_null($this->expire)) {
            $this->releaseJobsThatHaveBeenReservedTooLong($queueName);
        }

        $transaction = $this->db->beginTransaction();
        try {
            $model = $this->getNextAvailableJob($queueName);
            if ($model !== null) {
                $this->markJobAsReserved($model->id);
            }
            $transaction->commit();
        } catch (\Exception $e) {
            $transaction->rollBack();
            throw $e;
        } catch (\Throwable $e) {
            // PHP 7+ engine errors; this branch never matches on PHP 5.
            $transaction->rollBack();
            throw $e;
        }

        if ($model === null) {
            return false;
        }

        return Yii::createObject([
            'class' => 'common\components\jqueue\db\Job',
            'queueName' => $queueName,
            'model' => $model,
            'queueInstance' => $this,
        ]);
    }

    /**
     * Deletes a job row from the queue table.
     *
     * @param string $queueName queue name (not used by this implementation)
     * @param int $id primary key of the job row
     * @return mixed number of rows deleted
     */
    public function deleteReserved($queueName, $id)
    {
        // Hash-format condition so the id is bound as a parameter instead of
        // being concatenated into the SQL text.
        return $this->db->createCommand()
            ->delete($this->table, ['id' => $id])
            ->execute();
    }

    /**
     * Puts a job back on the queue by inserting a new row that carries the
     * model's payload and attempt count.
     *
     * @param string $queueName queue name
     * @param \stdClass $model job row; its payload and attempts properties are read
     * @param int $delay number of seconds before the job becomes available again
     * @param int $attempt not used by this implementation
     * @return mixed result of pushToDatabase()
     */
    public function release($queueName, $model, $delay, $attempt = 0)
    {
        return $this->pushToDatabase($delay, $queueName, $model->payload, $model->attempts);
    }

    /**
     * Inserts a job row into the queue table.
     *
     * @param int $delay number of seconds before the job becomes available
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @param mixed $payload job payload as produced by createPayload()
     * @param int $attempts attempt count to store on the row
     * @return integer number of rows inserted
     */
    protected function pushToDatabase($delay, $queueName, $payload, $attempts = 0)
    {
        $queueName = $this->getQueueName($queueName);
        $createdAt = time();
        $availableAt = $this->getAvailableAt($delay, $createdAt);

        return $this->db->createCommand()->insert($this->table, [
            'queue' => $queueName,
            'payload' => $payload,
            'attempts' => $attempts,
            'reserved' => 0,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $createdAt
        ])->execute();
    }

    /**
     * Counts the jobs of a queue that are not currently reserved.
     *
     * The count includes delayed jobs whose available_at has not been reached yet.
     *
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return int|string number of unreserved rows
     */
    public function getJobCount($queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        return (new Query())
            ->select(['id'])
            ->from($this->table)
            ->where(['reserved' => 0])
            ->andWhere(['queue' => $queueName])
            ->count("*", $this->db);
    }

    /**
     * Computes the timestamp at which a job becomes available.
     *
     * @param int $delay delay in seconds
     * @param int $createdAt creation timestamp
     * @return mixed $createdAt plus $delay
     */
    protected function getAvailableAt($delay, $createdAt)
    {
        return $createdAt + $delay;
    }

    /**
     * Releases jobs whose reservation has outlived the configured expiry,
     * clearing the reservation and incrementing their attempt count.
     *
     * A reservation counts as expired when reserved_at is at or before
     * "now minus expire". The cutoff must lie in the past: a cutoff in the
     * future would match every reserved row and hand running jobs to other
     * workers.
     *
     * NOTE(review): assumes $this->expire is a number of seconds - confirm
     * against the component configuration.
     *
     * @param string $queueName queue name, resolved through getQueueName()
     * @return integer number of rows released
     */
    protected function releaseJobsThatHaveBeenReservedTooLong($queueName)
    {
        $expired = time() - $this->expire;
        $sql = "update {$this->table} set reserved=0,reserved_at=null,attempts=attempts+1"
            . " where queue=:queue and reserved=1 and reserved_at<=:expired";

        return $this->db->createCommand($sql, [
            ':queue' => $this->getQueueName($queueName),
            ':expired' => $expired,
        ])->execute();
    }

    /**
     * Fetches the oldest unreserved job of a queue whose available_at has
     * been reached, selecting the row "for update".
     *
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return \stdClass|null the job row cast to an object, or null when none is available
     */
    protected function getNextAvailableJob($queueName)
    {
        $sql = "select * from {$this->table}"
            . " where queue=:queue and reserved=0 and available_at<=:now"
            . " ORDER BY id asc limit 1 for update";

        $job = $this->db->createCommand($sql, [
            ':queue' => $this->getQueueName($queueName),
            ':now' => time(),
        ])->queryOne();

        return $job ? (object)$job : null;
    }

    /**
     * Marks a job as reserved and stamps the reservation time.
     *
     * @param string $id primary key of the job row
     * @return integer number of rows updated
     */
    protected function markJobAsReserved($id)
    {
        $sql = "update {$this->table} set reserved=1,reserved_at=:now where id=:id";

        return $this->db->createCommand($sql, [
            ':now' => time(),
            ':id' => $id,
        ])->execute();
    }
}