MultiExec.php 9.09 KB
<?php namespace common\components\redis\transaction;

use common\components\redis\IConnection;
use common\components\redis\PHPRedisConnection;
use common\components\redis\YIIRedisConnection;

/**
 * Class MultiExec
 * 注意:该类原型来自 \Predis\Transaction\MultiExec
 * @package common\components\redis\transaction
 */
class MultiExec
{
    private $state;

    protected $connection;
    protected $commands = [];
    protected $attempts = 0;
    protected $watchKeys = [];
    protected $modeCAS = false;

    /**
     * @param IRedisConnection $connection  Connection instance used by the transaction.
     * @param array $options Initialization options.
     */
    public function __construct(IConnection $connection, array $options = [])
    {
        $this->connection = clone $connection;
        $this->state = new MultiExecState();

        $this->configure($options ?: []);
        $this->reset();
    }

    /**
     * Configures the transaction using the provided options.
     *
     * @param IRedisConnection $connection  Underlying connection instance.
     * @param array           $options Array of options for the transaction.
     **/
    protected function configure(array $options)
    {
        if (isset($options['cas'])) {
            $this->modeCAS = (bool) $options['cas'];
        }

        if (isset($options['watch']) && $keys = $options['watch']) {
            $this->watchKeys = $keys;
        }

        if (isset($options['retry'])) {
            $this->attempts = (int) $options['retry'];
        }
    }

    /**
     * Resets the state of the transaction.
     * 重置事务的状态
     */
    protected function reset()
    {
        $this->state->reset();
        $this->commands = [];
    }

    /**
     * 初始化事务的上下文环境
     */
    protected function initialize()
    {
        if ($this->state->isInitialized()) {
            return;
        }

        if ($this->modeCAS) {
            $this->state->flag(MultiExecState::CAS);
        }

        if ($this->watchKeys) {
            $this->watch($this->watchKeys);
        }

        $cas = $this->state->isCAS();
        $discarded = $this->state->isDiscarded();

        if (!$cas || ($cas && $discarded)) {
            $this->call('MULTI');
            if ($discarded) {
                $this->state->unflag(MultiExecState::CAS);
            }
        }

        $this->state->unflag(MultiExecState::DISCARDED);
        $this->state->flag(MultiExecState::INITIALIZED);
    }

    /**
     * 代理 Connection 的 指令和方法,把REDIS操作捆绑到 MultiExec(事务类)
     * $tr = new MultiExec();
     * $tr->lpop();//lpop是REDIS命令
     *
     * 注意:调用该方法的命令会认为是事务的一部分,会被统计到$this->commands中去
     *
     * @param $method
     * @param $arguments
     * @return mixed
     */
    public function __call($method, $arguments)
    {
        return $this->executeCommand($method, $arguments);
    }

    /**
     * 执行REDIS命令
     * 不调用初始化,提供给本类调用REDIS命令。
     * @param string $method Command.
     * @param array  $arguments Arguments for the command.
     *
     * @return mixed
     */
    protected function call($method, array $arguments = [])
    {
        return call_user_func_array([$this->connection, $method], $arguments);
    }

    /**
     * 执行REDIS命令
     * 调用初始化程序
     * @param string $method Command.
     * @param string $arguments 参数数组.
     * @throws \Exception
     * @return $this|mixed
     */
    public function executeCommand($method, $arguments)
    {
        $this->initialize();

        $response = call_user_func_array([$this->connection, $method], $arguments);

        if ($this->state->isCAS()) {
            return $response;
        }

        /**
         * 开启 $tx->multi()之后,PHP REDI和YII REDIS在执行某条指令的时候返回的结果不同,
         * YII REDIS是字符串之类的,PHP REDIS返回的是\Redis的实例。
         */
        if($this->connection instanceof PHPRedisConnection && $response instanceof \Redis){
            $this->commands[] = [
                'method' => $method,
                'arguments' => $arguments
            ];
        } elseif($this->connection instanceof YIIRedisConnection && $response == 'QUEUED') {
            $this->commands[] = [
                'method' => $method,
                'arguments' => $arguments
            ];
        } else {
            throw new \Exception('The server did not return a valid status response.');
        }
    }

    /**
     * Executes WATCH against one or more keys.
     *
     * @param string|array $keys One or more keys.
     * @throws \Exception
     * @return mixed
     */
    public function watch($keys)
    {
        if ($this->state->isWatchAllowed()) {
            throw new \Exception('Sending WATCH after MULTI is not allowed.');
        }

        $response = $this->call('WATCH', is_array($keys) ? $keys : array($keys));
        $this->state->flag(MultiExecState::WATCH);

        return $response;
    }

    /**
     * Finalizes the transaction by executing MULTI on the server.
     * @return MultiExec
     */
    public function multi()
    {
        if ($this->state->check(MultiExecState::INITIALIZED | MultiExecState::CAS)) {
            $this->state->unflag(MultiExecState::CAS);
            $this->call('MULTI');
        } else {
            $this->initialize();
        }

        return $this;
    }

    /**
     * Executes UNWATCH.
     * @throws \Exception
     * @return MultiExec
     */
    public function unwatch()
    {
        $this->state->unflag(MultiExecState::WATCH);
        $this->__call('UNWATCH', []);

        return $this;
    }

    /**
     * Resets the transaction by UNWATCH-ing the keys that are being WATCHed and
     * DISCARD-ing pending commands that have been already sent to the server.
     * @return MultiExec
     */
    public function discard()
    {
        if ($this->state->isInitialized()) {
            $this->call($this->state->isCAS() ? 'UNWATCH' : 'DISCARD');

            $this->reset();
            $this->state->flag(MultiExecState::DISCARDED);
        }

        return $this;
    }

    /**
     * Executes the whole transaction.
     * @return mixed
     */
    public function exec()
    {
        return $this->execute();
    }

    /**
     * Checks the state of the transaction before execution.
     *
     * @param mixed $callable Callback for execution.
     * @throws \InvalidArgumentException
     * @throws \Exception
     */
    private function checkBeforeExecution($callable)
    {
        if ($this->state->isExecuting()) {
            throw new \Exception('Cannot invoke "execute" or "exec" inside an active transaction context.');
        }

        if ($callable) {
            if (!is_callable($callable)) {
                throw new \InvalidArgumentException('The argument must be a callable object.');
            }
            if (!empty($this->commands)) {
                $this->discard();
                throw new \Exception('Cannot execute a transaction block after using fluent interface.');
            }
        } elseif ($this->attempts) {
            $this->discard();
            throw new \Exception('Automatic retries are supported only when a callable block is provided.');
        }
    }

    /**
     * Handles the actual execution of the whole transaction.
     *
     * @param mixed $callable Optional callback for execution.
     * @throws AbortedMultiExecException
     * @return array
     */
    public function execute($callable = null)
    {
        $this->checkBeforeExecution($callable);

        $execResponse = null;
        $attempts = $this->attempts;

        do {
            if ($callable) {
                $this->executeTransactionBlock($callable);
            }

            if (empty($this->commands)) {
                if ($this->state->isWatching()) {
                    $this->discard();
                }
                return;
            }

            $execResponse = $this->call('EXEC');

            if ($execResponse === null) {
                if ($attempts === 0) {
                    throw new AbortedMultiExecException($this, 'The current transaction has been aborted by the server.');
                }

                $this->reset();

                continue;
            }
            break;
        } while ($attempts-- > 0);

        $size = count($execResponse);
        if ($size !== count($this->commands)) {
            throw new \Exception('EXEC returned an unexpected number of response items.');
        }

        return $execResponse;
    }

    /**
     * Passes the current transaction object to a callable block for execution.
     * @param mixed $callable Callback.
     * @throws null
     */
    protected function executeTransactionBlock($callable)
    {
        $exception = null;
        $this->state->flag(MultiExecState::INSIDEBLOCK);

        try {
            call_user_func($callable, $this);
        } catch (\Exception $exception) {
            $this->discard();
        }

        $this->state->unflag(MultiExecState::INSIDEBLOCK);

        if ($exception) {
            throw $exception;
        }
    }
}