ThinkPHP8集成RabbitMQ

环境配置
# 添加Erlang仓库
wget -O- https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add -
echo "deb https://packages.erlang-solutions.com/ubuntu $(lsb_release -sc) contrib" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
# 更新包索引
sudo apt-get update
# 安装Erlang
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
# 添加RabbitMQ仓库
echo "deb https://dl.bintray.com/rabbitmq/debian $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
# 导入RabbitMQ公钥
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 安装 rabbitmq-server
sudo apt-get install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server  # 设置开机自启
# 安装librabbitmq开发库
sudo apt-get install -y librabbitmq-dev
# 安装php 扩展
pecl install amqp
# 安装php 客户端
composer require videlalvaro/php-amqplib
集成代码
//rabbitmq配置
<?php
return [
    'host' => env('RABBITMQ_HOST', 'localhost'),
    'port' => env('RABBITMQ_PORT', 5672),
    'user' => env('RABBITMQ_USER', 'guest'),
    'password' => env('RABBITMQ_PASSWORD', 'guest'),
    'vhost' => env('RABBITMQ_VHOST', '/'),
    'connection_timeout' => 3.0,
    'read_write_timeout' => 3.0,
    'heartbeat' => 0,
];
<?php
namespace app\service;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Config;

class RabbitMQService
{
    protected $connection;
    protected $channel;

    public function __construct()
    {
        $config = Config::get('rabbitmq');
        $this->connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost'],
            false,
            'AMQPLAIN',
            null,
            'en_US',
            $config['connection_timeout'],
            $config['read_write_timeout'],
            null,
            $config['heartbeat']
        );
        $this->channel = $this->connection->channel();
    }

    public function getChannel()
    {
        return $this->channel;
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
<?php
namespace app\service;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQProducer extends RabbitMQService
{
    public function publish($exchange, $routingKey, $message, $params = [])
    {
        // 声明交换机
        $this->channel->exchange_declare(
            $exchange,      // 交换机名称
            'direct',       // 交换机类型(根据需求修改)
            false,          // 是否被动声明(仅检查是否存在)
            true,           // 是否持久化
            false           // 是否自动删除
        );
        // 声明队列 直连模式
        $this->channel->queue_declare(
            'my_queue', // 队列名称
            false, // 是否被动声明(仅检查是否存在)
            true, // 是否持久化
            false, // 是否排他(仅对当前连接可见)
            false // 是否自动删除
        );
        $msg = new AMQPMessage(
            $message,
            array_merge([
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ], $params)
        );

        $this->channel->basic_publish($msg, $exchange, $routingKey);
    }
}
<?php
namespace app\service;
class RabbitMQConsumer extends RabbitMQService
{
    public function consume($queue, $callback, $noAck = false)
    {
        // 声明交换机(如果不存在则创建)
        $this->channel->exchange_declare(
            'my_exchange',  // 交换机名称
            'direct',       // 交换机类型(根据需求修改)
            false,          // 是否被动声明
            true,           // 是否持久化
            false           // 是否自动删除
        );
        // 声明队列(如果不存在则创建)
        $this->channel->queue_declare($queue, false, true, false, false);
        // 将队列绑定到交换机
        $this->channel->queue_bind($queue, 'my_exchange', 'my_routing_key');
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            $noAck,
            false,
            false,
            $callback
        );
        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }
}
<?php
declare (strict_types = 1);
namespace app\controller;
use app\BaseController;
use app\service\RabbitMQProducer;
class Rabbitmq extends BaseController
{
    public function sendMessage()
    {
        $producer = new RabbitMQProducer();
        $producer->publish(
            'my_exchange',      // 交换机名称
            'my_routing_key',   // 路由键
            json_encode([       // 消息内容
                'user_id' => 1234,
                'action' => 'login',
                'time' => time()
            ])
        );
        $producer->close();
        return json(['code' => 200, 'msg' => '消息发送成功']);
    }
}
<?php
declare (strict_types = 1);

namespace app\command;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use app\service\RabbitMQConsumer;

class ConsumeMessage extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('rabbitmq:consume')
             ->setDescription('消费RabbitMQ消息');
    }

    protected function execute(Input $input, Output $output)
    {
        $consumer = new RabbitMQConsumer();
        $consumer->consume('my_queue', function ($message) use ($output) {
            $data = json_decode($message->body, true);
            $output->writeln("收到消息: " . print_r($data, true));
            
            // 处理完消息后确认
            $message->ack();
        });
        $consumer->close();
    }
}
测试
http://www.thinkphp8.test/rabbitmq/sendMessage
~ php think rabbitmq:consume #可以看到投递的消息 可以supervisor 管理
消息类型

常见消息模式

  • 直连模式:消息根据路由键(routing key)直接路由到队列。生产者和消费者都需要声明队列,确保队列存在。生产者通过交换机和路由键发送消息,消费者从队列接收消息。
  • 扇形模式:消息广播到所有绑定的队列,忽略路由键。每个消费者需要声明自己的队列,并绑定到扇形交换机。生产者只需向交换机发送消息,无需关心队列。
  • 主题模式:消息根据路由键的模式(如*.error)路由到多个队列。消费者声明队列并绑定到主题交换机,指定绑定键(如order.#)。生产者根据业务逻辑设置路由键。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注