环境配置
# 添加Erlang仓库 wget -O- https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add - echo "deb https://packages.erlang-solutions.com/ubuntu $(lsb_release -sc) contrib" | sudo tee /etc/apt/sources.list.d/rabbitmq.list # 更新包索引 sudo apt-get update # 安装Erlang sudo apt-get install -y erlang-base \ erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \ erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \ erlang-runtime-tools erlang-snmp erlang-ssl \ erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl # 添加RabbitMQ仓库 echo "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list # 导入RabbitMQ公钥 wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - # 安装 rabbitmq-server sudo apt-get install rabbitmq-server sudo systemctl start rabbitmq-server sudo systemctl enable rabbitmq-server # 设置开机自启 # 安装librabbitmq开发库 sudo apt-get install -y librabbitmq-dev # 安装php 扩展 pecl install amqp # 安装php 客户端 composer require videlalvaro/php-amqplib
集成代码
//rabbitmq配置 <?php return [ 'host' => env('RABBITMQ_HOST', 'localhost'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), 'connection_timeout' => 3.0, 'read_write_timeout' => 3.0, 'heartbeat' => 0, ]; <?php namespace app\service; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; use think\facade\Config; class RabbitMQService { protected $connection; protected $channel; public function __construct() { $config = Config::get('rabbitmq'); $this->connection = new AMQPStreamConnection( $config['host'], $config['port'], $config['user'], $config['password'], $config['vhost'], false, 'AMQPLAIN', null, 'en_US', $config['connection_timeout'], $config['read_write_timeout'], null, $config['heartbeat'] ); $this->channel = $this->connection->channel(); } public function getChannel() { return $this->channel; } public function close() { $this->channel->close(); $this->connection->close(); } } <?php namespace app\service; use PhpAmqpLib\Message\AMQPMessage; class RabbitMQProducer extends RabbitMQService { public function publish($exchange, $routingKey, $message, $params = []) { // 声明交换机 $this->channel->exchange_declare( $exchange, // 交换机名称 'direct', // 交换机类型(根据需求修改) false, // 是否被动声明(仅检查是否存在) true, // 是否持久化 false // 是否自动删除 ); // 声明队列 直连模式 $this->channel->queue_declare( 'my_queue', // 队列名称 false, // 是否被动声明(仅检查是否存在) true, // 是否持久化 false, // 是否排他(仅对当前连接可见) false // 是否自动删除 ); $msg = new AMQPMessage( $message, array_merge([ 'content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ], $params) ); $this->channel->basic_publish($msg, $exchange, $routingKey); } } <?php namespace app\service; class RabbitMQConsumer extends RabbitMQService { public function consume($queue, $callback, $noAck = false) { // 声明交换机(如果不存在则创建) $this->channel->exchange_declare( 'my_exchange', // 交换机名称 'direct', // 交换机类型(根据需求修改) false, // 是否被动声明 true, // 是否持久化 false // 是否自动删除 ); // 声明队列(如果不存在则创建) $this->channel->queue_declare($queue, false, true, false, false); // 将队列绑定到交换机 $this->channel->queue_bind($queue, 'my_exchange', 'my_routing_key'); $this->channel->basic_consume( $queue, '', false, $noAck, false, false, $callback ); while ($this->channel->is_open()) { $this->channel->wait(); } } } <?php declare (strict_types = 1); namespace app\controller; use app\BaseController; use app\service\RabbitMQProducer; class Rabbitmq extends BaseController { public function sendMessage() { $producer = new RabbitMQProducer(); $producer->publish( 'my_exchange', // 交换机名称 'my_routing_key', // 路由键 json_encode([ // 消息内容 'user_id' => 1234, 'action' => 'login', 'time' => time() ]) ); $producer->close(); return json(['code' => 200, 'msg' => '消息发送成功']); } } <?php declare (strict_types = 1); namespace app\command; use think\console\Command; use think\console\Input; use think\console\input\Argument; use think\console\input\Option; use think\console\Output; use app\service\RabbitMQConsumer; class ConsumeMessage extends Command { protected function configure() { // 指令配置 $this->setName('rabbitmq:consume') ->setDescription('消费RabbitMQ消息'); } protected function execute(Input $input, Output $output) { $consumer = new RabbitMQConsumer(); $consumer->consume('my_queue', function ($message) use ($output) { $data = json_decode($message->body, true); $output->writeln("收到消息: " . print_r($data, true)); // 处理完消息后确认 $message->ack(); }); $consumer->close(); } }
测试
http://www.thinkphp8.test/rabbitmq/sendMessage ~ php think rabbitmq:consume #可以看到投递的消息 可以supervisor 管理
消息类型
常见消息模式
- 直连模式:消息根据路由键(routing key)直接路由到队列。生产者和消费者都需要声明队列,确保队列存在。生产者通过交换机和路由键发送消息,消费者从队列接收消息。
- 扇形模式:消息广播到所有绑定的队列,忽略路由键。每个消费者需要声明自己的队列,并绑定到扇形交换机。生产者只需向交换机发送消息,无需关心队列。
- 主题模式:消息根据路由键的模式(如
*.error
)路由到多个队列。消费者声明队列并绑定到主题交换机,指定绑定键(如order.#
)。生产者根据业务逻辑设置路由键。