dlog->debug($msg); } /** * 任务所属队列的名称 * @var string */ protected $queueName; /** * Queue实例 * @var Queue */ public $queueInstance; /** * 任务是否删除标识 * @var bool */ protected $deleted = false; /** * 任务是否released标识 * @var bool */ protected $released = false; /** * 获取任务已经尝试执行的次数 * @return int */ abstract public function getAttempts(); /** * 获取任务的数据 * @return string */ abstract public function getPayload(); /** * 检测任务是否被重新加入队列 * @return bool */ public function isReleased() { return $this->released; } /** * 检测任务是否被删除或者被released * @return bool */ public function isDeletedOrReleased() { return $this->isDeleted() || $this->isReleased(); } /** * 删除任务,子类需要实现具体的删除 * @return void */ public function delete() { $this->deleted = true; } /** * 判断任务是否被删除 * @return bool */ public function isDeleted() { return $this->deleted; } /** * 将任务重新加入队列 * @param int $delay * @return void */ public function release($delay = 0) { $this->released = true; } /** * 执行任务 * @return void */ public function execute($callback) { $this->resolveAndFire($callback); } /** * @param $callback */ protected function resolveAndFire($callback) { $payload = $this->getPayload(); $payload = json_decode($payload, true);//以为数组的形式返回 if($callback instanceof \Closure){ try { $result = $callback($this, $payload); //self::Log('任务执行结果=>[' . json_encode($result) . '] | 任务数据=>[' . json_encode($payload) . ']'); } catch (Exception $e){ $failReason = '任务执行异常=>' . $e->getMessage(); $details = $e->getTraceAsString(); self::Log($failReason . ': ' . $details); self::handleFailJob($payload['id'], $failReason, $details); throw $e; } } if (empty($result) || !isset($result['success'])) { self::Log('resolveAndFire(): ####### doJob返回结果未设置, 删除任务 #######'); if (!$this->isDeletedOrReleased()) { $this->delete(); } } else { // 任务执行成功 if ($result['success']) { self::Log('resolveAndFire(): 任务执行成功, 从队列删除任务'); if (!$this->isDeletedOrReleased()) { $this->delete(); } } else { // 任务执行失败, 写入数据库 $failReason = '应用层任务回调返回错误'; $details = $result['errcode'] . '=>' . $result['errmsg']; self::handleFailJob($payload['id'], $failReason, $details); } } } /** * 失败任务处理(记录到数据库方便分析原因) * @param $payloadId * @param $failReason * @param $details */ public static function handleFailJob($payloadId, $failReason, $details) { $reasonModel = FailedJobsReasonModel::findOne(['payload_id' => $payloadId]); if ($reasonModel) { $reasonModel->reason = $failReason; $reasonModel->details = $details; $reasonModel->created_at = time(); $reasonModel->save(); self::Log('handleFailJob(): [更新]任务失败原因: ' . $failReason . ' | payload_id=' . $payloadId); }else { Yii::$app->db->createCommand()->insert('failed_jobs_reason', [ 'payload_id' => $payloadId, 'reason' => $failReason, 'details' => $details, 'created_at' => time() ])->execute(); self::Log('handleFailJob(): [添加]任务失败原因: ' . $failReason . ' | payload_id=' . $payloadId); } } /** * 任务执行失败后的处理方法(调用handler的failed方法) * @return boolean */ public function failed() { if ($this->queueInstance->failed['logFail'] === true) { $failedProvider = Yii::createObject($this->queueInstance->failed['provider']); return $failedProvider->log($this->getQueueName(), $this->getPayload()); } return false; } /** * 获取队列名称 * @return string */ public function getQueueName() { return $this->queueName; } /** * 设置队列名称 * @param $queue */ public function setQueueName($queueName) { $this->queueName = $queueName; } /** * 获取Queue实例 * @return mixed */ public function getQueueInstance() { return $this->queueInstance; } /** * 设置Queue实例 * @param $queueInstance */ public function setQueueInstance($queueInstance) { $this->queueInstance = $queueInstance; } }