Job.php 5.91 KB
<?php

namespace common\components\jqueue\base;

use Yii;
use yii\base\Component;
use common\models\FailedJobsReason as FailedJobsReasonModel;
use function json_decode;
use function time;

/**
 * Class Job
 * @package common\components\jqueue\base
 */
abstract class Job extends Component
{
    const DEBUG = true; // 调试开关(开启日志)

    /**
     * 统一控制日志输出
     * @param $level
     * @param $msg
     */
    private static function Log($msg) {
        if (self::DEBUG == false || YII_ENV_PROD) { // 正式服屏蔽消息日志
            return ;
        }
        Yii::$app->dlog->debug($msg);
    }

    /**
     * 任务所属队列的名称
     * @var string
     */
    protected $queueName;
    
    /**
     * Queue实例
     * @var Queue
     */
    public $queueInstance;

    /**
     * 任务是否删除标识
     * @var bool
     */
    protected $deleted = false;

    /**
     * 任务是否released标识
     * @var bool
     */
    protected $released = false;

    /**
     * 获取任务已经尝试执行的次数
     * @return int
     */
    abstract public function getAttempts();

    /**
     * 获取任务的数据
     * @return string
     */
    abstract public function getPayload();

    /**
     * 检测任务是否被重新加入队列
     * @return bool
     */
    public function isReleased()
    {
        return $this->released;
    }

    /**
     * 检测任务是否被删除或者被released
     * @return bool
     */
    public function isDeletedOrReleased()
    {
        return $this->isDeleted() || $this->isReleased();
    }

    /**
     * 删除任务,子类需要实现具体的删除
     * @return void
     */
    public function delete()
    {
        $this->deleted = true;
    }

    /**
     * 判断任务是否被删除
     * @return bool
     */
    public function isDeleted()
    {
        return $this->deleted;
    }

    /**
     * 将任务重新加入队列
     * @param  int $delay
     * @return void
     */
    public function release($delay = 0)
    {
        $this->released = true;
    }

    /**
     * 执行任务
     * @return void
     */
    public function execute($callback)
    {
        $this->resolveAndFire($callback);
    }

    /**
     * @param $callback
     */
    protected function resolveAndFire($callback)
    {
        $payload = $this->getPayload();
        $payload = json_decode($payload, true);//以为数组的形式返回

        if($callback instanceof \Closure){
            try {
                $result =  $callback($this, $payload);
                //self::Log('任务执行结果=>[' . json_encode($result) . '] | 任务数据=>[' . json_encode($payload) . ']');
            } catch (Exception $e){
                $failReason = '任务执行异常=>' . $e->getMessage();
                $details = $e->getTraceAsString();
                self::Log($failReason . ': ' . $details);
                self::handleFailJob($payload['id'], $failReason, $details);
                throw $e;
            }
        }

        if (empty($result) || !isset($result['success'])) {
            self::Log('resolveAndFire(): ####### doJob返回结果未设置, 删除任务 #######');
            if (!$this->isDeletedOrReleased()) {
                $this->delete();
            }
        } else {
            // 任务执行成功
            if ($result['success']) {
                self::Log('resolveAndFire(): 任务执行成功, 从队列删除任务');
                if (!$this->isDeletedOrReleased()) {
                    $this->delete();
                }
            } else { // 任务执行失败, 写入数据库
                $failReason = '应用层任务回调返回错误';
                $details = $result['errcode'] . '=>' . $result['errmsg'];
                self::handleFailJob($payload['id'], $failReason, $details);
            }
        }
    }

    /**
     * 失败任务处理(记录到数据库方便分析原因)
     * @param $payloadId
     * @param $failReason
     * @param $details
     */
    public static function handleFailJob($payloadId, $failReason, $details)
    {
        $reasonModel = FailedJobsReasonModel::findOne(['payload_id' => $payloadId]);
        if ($reasonModel) {
            $reasonModel->reason = $failReason;
            $reasonModel->details = $details;
            $reasonModel->created_at = time();
            $reasonModel->save();
            self::Log('handleFailJob(): [更新]任务失败原因: ' . $failReason . ' | payload_id=' . $payloadId);
        }else {
            Yii::$app->db->createCommand()->insert('failed_jobs_reason', [
                'payload_id' => $payloadId,
                'reason'     => $failReason,
                'details'    => $details,
                'created_at' => time()
            ])->execute();
            self::Log('handleFailJob(): [添加]任务失败原因: ' . $failReason . ' | payload_id=' . $payloadId);
        }
    }

    /**
     * 任务执行失败后的处理方法(调用handler的failed方法)
     * @return boolean
     */
    public function failed()
    {
        if ($this->queueInstance->failed['logFail'] === true) {
            $failedProvider = Yii::createObject($this->queueInstance->failed['provider']);
            return $failedProvider->log($this->getQueueName(), $this->getPayload());
        }
        return false;
    }

    /**
     * 获取队列名称
     * @return string
     */
    public function getQueueName()
    {
        return $this->queueName;
    }

    /**
     * 设置队列名称
     * @param $queue
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;
    }

    /**
     * 获取Queue实例
     * @return mixed
     */
    public function getQueueInstance()
    {
        return $this->queueInstance;
    }

    /**
     * 设置Queue实例
     * @param $queueInstance
     */
    public function setQueueInstance($queueInstance)
    {
        $this->queueInstance = $queueInstance;
    }
}