* To use this module with Composer you need "php-amqplib/php-amqplib": "~2.4" package. * * * ## Config * * * host: localhost - host to connect * * username: guest - username to connect * * password: guest - password to connect * * vhost: '/' - vhost to connect * * cleanup: true - defined queues will be purged before running every test. * * queues: [mail, twitter] - queues to cleanup * * ### Example * * modules: * enabled: * - AMQP: * host: 'localhost' * port: '5672' * username: 'guest' * password: 'guest' * vhost: '/' * queues: [queue1, queue2] * * ## Public Properties * * * connection - AMQPStreamConnection - current connection */ class AMQP extends CodeceptionModule implements RequiresPackage { protected $config = [ 'host' => 'localhost', 'username' => 'guest', 'password' => 'guest', 'port' => '5672', 'vhost' => '/', 'cleanup' => true, ]; /** * @var AMQPStreamConnection */ public $connection; /** * @var AMQPChannel */ protected $channel; protected $requiredFields = ['host', 'username', 'password', 'vhost']; public function _requires() { return ['PhpAmqpLib\Connection\AMQPStreamConnection' => '"php-amqplib/php-amqplib": "~2.4"']; } public function _initialize() { $host = $this->config['host']; $port = $this->config['port']; $username = $this->config['username']; $password = $this->config['password']; $vhost = $this->config['vhost']; try { $this->connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost); } catch (Exception $e) { throw new ModuleException(__CLASS__, $e->getMessage() . ' while establishing connection to MQ server'); } } public function _before(TestInterface $test) { if ($this->config['cleanup']) { $this->cleanup(); } } /** * Sends message to exchange by sending exchange name, message * and (optionally) a routing key * * ``` php * pushToExchange('exchange.emails', 'thanks'); * $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!')); * $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!'), 'severity'); * ?> * ``` * * @param string $exchange * @param string|AMQPMessage $message * @param string $routing_key */ public function pushToExchange($exchange, $message, $routing_key = null) { $message = $message instanceof AMQPMessage ? $message : new AMQPMessage($message); $this->connection->channel()->basic_publish($message, $exchange, $routing_key); } /** * Sends message to queue * * ``` php * pushToQueue('queue.jobs', 'create user'); * $I->pushToQueue('queue.jobs', new AMQPMessage('create')); * ?> * ``` * * @param string $queue * @param string|AMQPMessage $message */ public function pushToQueue($queue, $message) { $message = $message instanceof AMQPMessage ? $message : new AMQPMessage($message); $this->connection->channel()->queue_declare($queue); $this->connection->channel()->basic_publish($message, '', $queue); } /** * Declares an exchange * * This is an alias of method `exchange_declare` of `PhpAmqpLib\Channel\AMQPChannel`. * * ```php * declareExchange( * 'nameOfMyExchange', // exchange name * 'topic' // exchange type * ) * ``` * * @param string $exchange * @param string $type * @param bool $passive * @param bool $durable * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ public function declareExchange( $exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = null, $ticket = null ) { return $this->connection->channel()->exchange_declare( $exchange, $type, $passive, $durable, $auto_delete, $internal, $nowait, $arguments, $ticket ); } /** * Declares queue, creates if needed * * This is an alias of method `queue_declare` of `PhpAmqpLib\Channel\AMQPChannel`. * * ```php * declareQueue( * 'nameOfMyQueue', // exchange name * ) * ``` * * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ public function declareQueue( $queue = '', $passive = false, $durable = false, $exclusive = false, $auto_delete = true, $nowait = false, $arguments = null, $ticket = null ) { return $this->connection->channel()->queue_declare( $queue, $passive, $durable, $exclusive, $auto_delete, $nowait, $arguments, $ticket ); } /** * Binds a queue to an exchange * * This is an alias of method `queue_bind` of `PhpAmqpLib\Channel\AMQPChannel`. * * ```php * bindQueueToExchange( * 'nameOfMyQueueToBind', // name of the queue * 'transactionTracking.transaction', // exchange name to bind to * 'your.routing.key' // Optionally, provide a binding key * ) * ``` * * @param string $queue * @param string $exchange * @param string $routing_key * @param bool $nowait * @param array $arguments * @param int $ticket * @return mixed|null */ public function bindQueueToExchange( $queue, $exchange, $routing_key = '', $nowait = false, $arguments = null, $ticket = null ) { return $this->connection->channel()->queue_bind( $queue, $exchange, $routing_key, $nowait, $arguments, $ticket ); } /** * Checks if message containing text received. * * **This method drops message from queue** * **This method will wait for message. If none is sent the script will stuck**. * * ``` php * pushToQueue('queue.emails', 'Hello, davert'); * $I->seeMessageInQueueContainsText('queue.emails','davert'); * ?> * ``` * * @param string $queue * @param string $text */ public function seeMessageInQueueContainsText($queue, $text) { $msg = $this->connection->channel()->basic_get($queue); if (!$msg) { $this->fail("Message was not received"); } if (!$msg instanceof AMQPMessage) { $this->fail("Received message is not format of AMQPMessage"); } $this->debugSection("Message", $msg->body); $this->assertContains($text, $msg->body); } /** * Takes last message from queue. * * ``` php * grabMessageFromQueue('queue.emails'); * ?> * ``` * * @param string $queue * @return AMQPMessage */ public function grabMessageFromQueue($queue) { $message = $this->connection->channel()->basic_get($queue); return $message; } /** * Purge a specific queue defined in config. * * ``` php * purgeQueue('queue.emails'); * ?> * ``` * * @param string $queueName */ public function purgeQueue($queueName = '') { if (! in_array($queueName, $this->config['queues'])) { throw new ModuleException(__CLASS__, "'$queueName' doesn't exist in queues config list"); } $this->connection->channel()->queue_purge($queueName, true); } /** * Purge all queues defined in config. * * ``` php * purgeAllQueues(); * ?> * ``` */ public function purgeAllQueues() { $this->cleanup(); } protected function cleanup() { if (!isset($this->config['queues'])) { throw new ModuleException(__CLASS__, "please set queues for cleanup"); } if (!$this->connection) { return; } foreach ($this->config['queues'] as $queue) { try { $this->connection->channel()->queue_purge($queue); } catch (AMQPProtocolChannelException $e) { // ignore if exchange/queue doesn't exist and rethrow exception if it's something else if ($e->getCode() !== 404) { throw $e; } } } } }