环境配置
# 添加Erlang仓库
wget -O- https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add -
echo "deb https://packages.erlang-solutions.com/ubuntu $(lsb_release -sc) contrib" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
# 更新包索引
sudo apt-get update
# 安装Erlang
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
# 添加RabbitMQ仓库
echo "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
# 导入RabbitMQ公钥
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 安装 rabbitmq-server
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server # 设置开机自启
# 安装librabbitmq开发库
sudo apt-get install -y librabbitmq-dev
# 安装php 扩展
pecl install amqp
# 安装php 客户端
composer require videlalvaro/php-amqplib
集成代码
//rabbitmq配置
<?php
return [
'host' => env('RABBITMQ_HOST', 'localhost'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
'connection_timeout' => 3.0,
'read_write_timeout' => 3.0,
'heartbeat' => 0,
];
<?php
namespace app\service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Config;
class RabbitMQService
{
protected $connection;
protected $channel;
public function __construct()
{
$config = Config::get('rabbitmq');
$this->connection = new AMQPStreamConnection(
$config['host'],
$config['port'],
$config['user'],
$config['password'],
$config['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
$config['connection_timeout'],
$config['read_write_timeout'],
null,
$config['heartbeat']
);
$this->channel = $this->connection->channel();
}
public function getChannel()
{
return $this->channel;
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}
<?php
namespace app\service;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitMQProducer extends RabbitMQService
{
public function publish($exchange, $routingKey, $message, $params = [])
{
// 声明交换机
$this->channel->exchange_declare(
$exchange, // 交换机名称
'direct', // 交换机类型(根据需求修改)
false, // 是否被动声明(仅检查是否存在)
true, // 是否持久化
false // 是否自动删除
);
// 声明队列 直连模式
$this->channel->queue_declare(
'my_queue', // 队列名称
false, // 是否被动声明(仅检查是否存在)
true, // 是否持久化
false, // 是否排他(仅对当前连接可见)
false // 是否自动删除
);
$msg = new AMQPMessage(
$message,
array_merge([
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
], $params)
);
$this->channel->basic_publish($msg, $exchange, $routingKey);
}
}
<?php
namespace app\service;
class RabbitMQConsumer extends RabbitMQService
{
public function consume($queue, $callback, $noAck = false)
{
// 声明交换机(如果不存在则创建)
$this->channel->exchange_declare(
'my_exchange', // 交换机名称
'direct', // 交换机类型(根据需求修改)
false, // 是否被动声明
true, // 是否持久化
false // 是否自动删除
);
// 声明队列(如果不存在则创建)
$this->channel->queue_declare($queue, false, true, false, false);
// 将队列绑定到交换机
$this->channel->queue_bind($queue, 'my_exchange', 'my_routing_key');
$this->channel->basic_consume(
$queue,
'',
false,
$noAck,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
}
<?php
declare (strict_types = 1);
namespace app\controller;
use app\BaseController;
use app\service\RabbitMQProducer;
class Rabbitmq extends BaseController
{
public function sendMessage()
{
$producer = new RabbitMQProducer();
$producer->publish(
'my_exchange', // 交换机名称
'my_routing_key', // 路由键
json_encode([ // 消息内容
'user_id' => 1234,
'action' => 'login',
'time' => time()
])
);
$producer->close();
return json(['code' => 200, 'msg' => '消息发送成功']);
}
}
<?php
declare (strict_types = 1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use app\service\RabbitMQConsumer;
class ConsumeMessage extends Command
{
protected function configure()
{
// 指令配置
$this->setName('rabbitmq:consume')
->setDescription('消费RabbitMQ消息');
}
protected function execute(Input $input, Output $output)
{
$consumer = new RabbitMQConsumer();
$consumer->consume('my_queue', function ($message) use ($output) {
$data = json_decode($message->body, true);
$output->writeln("收到消息: " . print_r($data, true));
// 处理完消息后确认
$message->ack();
});
$consumer->close();
}
}
测试
http://www.thinkphp8.test/rabbitmq/sendMessage ~ php think rabbitmq:consume #可以看到投递的消息 可以supervisor 管理
消息类型
常见消息模式
- 直连模式:消息根据路由键(routing key)直接路由到队列。生产者和消费者都需要声明队列,确保队列存在。生产者通过交换机和路由键发送消息,消费者从队列接收消息。
- 扇形模式:消息广播到所有绑定的队列,忽略路由键。每个消费者需要声明自己的队列,并绑定到扇形交换机。生产者只需向交换机发送消息,无需关心队列。
- 主题模式:消息根据路由键的模式(如
*.error)路由到多个队列。消费者声明队列并绑定到主题交换机,指定绑定键(如order.#)。生产者根据业务逻辑设置路由键。