connection = clone $connection; $this->state = new MultiExecState(); $this->configure($options ?: []); $this->reset(); }

    /**
     * Applies the caller-supplied options to this transaction.
     *
     * Recognised keys: `cas` (cast to bool), `watch` (stored only when
     * truthy) and `retry` (cast to int). Unknown keys are ignored.
     *
     * @param array $options Array of options for the transaction.
     */
    protected function configure(array $options)
    {
        if (isset($options['cas'])) {
            $this->modeCAS = (bool) $options['cas'];
        }

        // Only a set, truthy value replaces the keys to watch.
        if (!empty($options['watch'])) {
            $this->watchKeys = $options['watch'];
        }

        if (isset($options['retry'])) {
            $this->attempts = (int) $options['retry'];
        }
    }

    /**
     * Resets the state of the transaction and clears the recorded commands.
     */
    protected function reset()
    {
        $this->state->reset();
        $this->commands = [];
    }

    /**
     * Initializes the transaction context.
     *
     * Does nothing when the state is already flagged as initialized.
     * Otherwise it applies the CAS flag and the configured WATCH keys, sends
     * MULTI unless the transaction is in CAS mode and was not discarded, and
     * finally marks the state as initialized.
     */
    protected function initialize()
    {
        $state = $this->state;

        if ($state->isInitialized()) {
            return;
        }

        if ($this->modeCAS) {
            $state->flag(MultiExecState::CAS);
        }

        if ($this->watchKeys) {
            $this->watch($this->watchKeys);
        }

        $inCas = $state->isCAS();
        $wasDiscarded = $state->isDiscarded();

        // MULTI is sent right away outside CAS mode, or when resuming after a discard.
        if (!$inCas || $wasDiscarded) {
            $this->call('MULTI');

            if ($wasDiscarded) {
                $state->unflag(MultiExecState::CAS);
            }
        }

        $state->unflag(MultiExecState::DISCARDED);
        $state->flag(MultiExecState::INITIALIZED);
    }

    /**
     * Proxies Redis commands to the connection as part of this transaction.
     *
     *     $tr = new MultiExec();
     *     $tr->lpop(); // lpop is a Redis command
     *
     * Every call made this way is handled by executeCommand().
     *
     * @param string $method    Command name.
     * @param array  $arguments Arguments for the command.
     * @return mixed
     */
    public function __call($method, $arguments)
    {
        $result = $this->executeCommand($method, $arguments);

        return $result;
    }

    /**
     * Sends a Redis command straight to the connection.
     *
     * Does not run initialize(); intended for commands issued by this class
     * itself.
     *
     * @param string $method    Command.
     * @param array  $arguments Arguments for the command.
     *
     * @return mixed
     */
    protected function call($method, array $arguments = [])
    {
        $target = [$this->connection, $method];

        return call_user_func_array($target, $arguments);
    }

    /**
     * Executes a Redis command through the transaction.
     * Runs the initialization routine first.
     * @param string $method Command.
     * @param array $arguments Argument array.
* @throws \Exception When the reply is not the queueing acknowledgement expected from the driver.
     * @return $this|mixed The raw reply while in CAS mode, otherwise this transaction (for chaining).
     */
    public function executeCommand($method, $arguments)
    {
        $this->initialize();

        $response = call_user_func_array([$this->connection, $method], $arguments);

        // In CAS mode the reply is returned as-is and the command is not recorded.
        if ($this->state->isCAS()) {
            return $response;
        }

        // After MULTI the two supported drivers acknowledge a command
        // differently: the phpredis-backed connection returns a \Redis
        // instance, the Yii-backed connection returns the string "QUEUED".
        $queued = ($this->connection instanceof PHPRedisConnection && $response instanceof \Redis)
            || ($this->connection instanceof YIIRedisConnection && $response == 'QUEUED');

        if (!$queued) {
            throw new \Exception('The server did not return a valid status response.');
        }

        $this->commands[] = [
            'method' => $method,
            'arguments' => $arguments,
        ];

        // Return the transaction so calls can be chained, as the docblock promises.
        return $this;
    }

    /**
     * Executes WATCH against one or more keys.
     *
     * @param string|array $keys One or more keys.
     * @throws \Exception When the state reports that WATCH can no longer be sent.
     * @return mixed Reply of the WATCH command.
     */
    public function watch($keys)
    {
        // NOTE(review): despite its name, a true result from isWatchAllowed()
        // is treated here as "WATCH is NOT allowed" - confirm against MultiExecState.
        if ($this->state->isWatchAllowed()) {
            throw new \Exception('Sending WATCH after MULTI is not allowed.');
        }

        $response = $this->call('WATCH', is_array($keys) ? $keys : [$keys]);
        $this->state->flag(MultiExecState::WATCH);

        return $response;
    }

    /**
     * Switches the transaction to MULTI mode.
     *
     * When the state check for INITIALIZED | CAS succeeds, the CAS flag is
     * dropped and MULTI is sent; otherwise the transaction is initialized.
     *
     * @return MultiExec
     */
    public function multi()
    {
        if ($this->state->check(MultiExecState::INITIALIZED | MultiExecState::CAS)) {
            $this->state->unflag(MultiExecState::CAS);
            $this->call('MULTI');
        } else {
            $this->initialize();
        }

        return $this;
    }

    /**
     * Executes UNWATCH.
     *
     * The command goes through __call(), so it is handled like any other
     * proxied command.
     *
     * @throws \Exception
     * @return MultiExec
     */
    public function unwatch()
    {
        $this->state->unflag(MultiExecState::WATCH);
        $this->__call('UNWATCH', []);

        return $this;
    }

    /**
     * Resets the transaction by UNWATCH-ing the keys that are being WATCHed and
     * DISCARD-ing pending commands that have been already sent to the server.
     *
     * Nothing is sent when the transaction has not been initialized.
     *
     * @return MultiExec
     */
    public function discard()
    {
        if ($this->state->isInitialized()) {
            // In CAS mode UNWATCH is sent; otherwise DISCARD.
            $this->call($this->state->isCAS() ? 'UNWATCH' : 'DISCARD');

            $this->reset();
            $this->state->flag(MultiExecState::DISCARDED);
        }

        return $this;
    }

    /**
     * Executes the whole transaction.
     *
     * @return mixed Result of execute() called without a callback.
     */
    public function exec()
    {
        return $this->execute();
    }

    /**
     * Checks the state of the transaction before execution.
     *
     * @param mixed $callable Callback for execution.
     * @throws \InvalidArgumentException When a non-callable value is supplied.
     * @throws \Exception When already executing, when commands were recorded
     *                    before a callback was supplied, or when retries are
     *                    configured without a callback.
     */
    private function checkBeforeExecution($callable)
    {
        if ($this->state->isExecuting()) {
            throw new \Exception('Cannot invoke "execute" or "exec" inside an active transaction context.');
        }

        if ($callable) {
            if (!is_callable($callable)) {
                throw new \InvalidArgumentException('The argument must be a callable object.');
            }

            if (!empty($this->commands)) {
                $this->discard();

                throw new \Exception('Cannot execute a transaction block after using fluent interface.');
            }
        } elseif ($this->attempts) {
            $this->discard();

            throw new \Exception('Automatic retries are supported only when a callable block is provided.');
        }
    }

    /**
     * Handles the actual execution of the whole transaction.
     *
     * @param mixed $callable Optional callback for execution.
     * @throws AbortedMultiExecException When EXEC reports an aborted
     *                                   transaction and no retries remain.
     * @throws \Exception When EXEC returns a different number of items than
     *                    the number of recorded commands.
     * @return array|null The EXEC reply, or null when no command was recorded.
     */
    public function execute($callable = null)
    {
        $this->checkBeforeExecution($callable);

        $execResponse = null;
        $attempts = $this->attempts;

        do {
            if ($callable) {
                $this->executeTransactionBlock($callable);
            }

            if (empty($this->commands)) {
                // Nothing to run; release any WATCH that is still active.
                if ($this->state->isWatching()) {
                    $this->discard();
                }

                return null;
            }

            $execResponse = $this->call('EXEC');

            // A null reply means the transaction was aborted. phpredis reports
            // the same condition as false.
            // NOTE(review): confirm PHPRedisConnection does not already normalise false to null.
            if ($execResponse === null || $execResponse === false) {
                // "<=" rather than "===" so a negative retry budget cannot
                // leave the loop with an unusable reply.
                if ($attempts <= 0) {
                    throw new AbortedMultiExecException($this, 'The current transaction has been aborted by the server.');
                }

                $this->reset();

                continue;
            }

            break;
        } while ($attempts-- > 0);

        if (count($execResponse) !== count($this->commands)) {
            throw new \Exception('EXEC returned an unexpected number of response items.');
        }

        return $execResponse;
    }

    /**
     * Passes the current transaction object to a callable block for execution.
* The INSIDEBLOCK flag is set for the duration of the callback.
     *
     * @param mixed $callable Callback, invoked with this transaction as its only argument.
     * @throws \Exception Any exception raised by the callback, re-thrown after discard().
     */
    protected function executeTransactionBlock($callable)
    {
        $this->state->flag(MultiExecState::INSIDEBLOCK);

        try {
            call_user_func($callable, $this);
        } catch (\Exception $failure) {
            // Roll back what was sent so far, leave the block, then surface the error.
            $this->discard();
            $this->state->unflag(MultiExecState::INSIDEBLOCK);

            throw $failure;
        }

        $this->state->unflag(MultiExecState::INSIDEBLOCK);
    }
}