[TOC] #### 1. 前言 --- 前面几讲我们装好了 RabbitMQ,也逛了管理界面,这一讲,终于要写代码了 本文我们用 PHP 连接 RabbitMQ,完成最基础的操作:发一条消息,再把它消费掉 #### 2. 安装扩展 --- PHP 连接 RabbitMQ 用的是 AMQP 协议,需要安装一个扩展: + 前提是你已经装好了 Composer,PHP 版本 >= 7.1 + 它是一个 Composer 包,不需要编译 PHP 扩展,开箱即用 ```bash composer require php-amqplib/php-amqplib ``` #### 3. 先看整体结构 --- 我们一共需要两个文件: + producer.php —— 生产者,发消息 + consumer.php —— 消费者,取消息 它们都会连接 RabbitMQ,但做的事情不一样 #### 4. 生产者:发消息 --- ```php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 1. 建立连接 $connection = new AMQPStreamConnection('192.168.3.20', 5672, 'admin', 'admin123'); $channel = $connection->channel(); // 2. 声明队列(如果不存在就创建) $channel->queue_declare('hello_queue', false, false, false, false); // 3. 创建消息 $message = new AMQPMessage('Hello RabbitMQ ' . date('i:s')); // 4. 发布消息到队列 $channel->basic_publish($message, '', 'hello_queue'); echo "[x] 消息已发送 " . date('i:s') . "\n"; // 5. 关闭连接 $channel->close(); $connection->close(); ``` 运行: ```bash php producer.php ``` 输出: ```plaintext [x] 消息已发送 37:20 ``` 打开管理界面,进入 Queues 页面,你会看到 `hello_queue` 的 `Ready` 变成了 1,消息已经在队列里了 #### 5. 消费者:取消息 --- ```php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 1. 建立连接 $connection = new AMQPStreamConnection('192.168.3.20', 5672, 'admin', 'admin123'); $channel = $connection->channel(); // 2. 声明队列(和生产者保持一致) $channel->queue_declare('hello_queue', false, false, false, false); echo "[*] 等待消息中...\n"; // 3. 定义回调函数:收到消息后怎么处理 $callback = function ($msg) { echo "[x] 收到:{$msg->body}\n"; }; // 4. 开始消费 $channel->basic_consume('hello_queue', '', false, true, false, false, $callback); // 5. 保持运行,等待消息 while ($channel->is_consuming()) { $channel->wait(); } ``` 运行: ```bash php consumer.php ``` 输出: ```plaintext [*] 等待消息中... [x] 收到:Hello RabbitMQ 37:20 ``` 现在消息被消费了。再看管理界面,`hello_queue` 的 `Ready` 变回了 0 #### 6. 逐行解释代码 --- 建立连接 ```php // 四个参数:地址、端口、用户名、密码(和在管理界面登录一样) $connection = new AMQPStreamConnection('192.168.3.20', 5672, 'admin', 'admin123'); ``` 声明队列 ```php // 这行的意思是:如果 hello_queue 不存在,就创建它;如果已经存在,什么都不做 // 生产者和消费者都要声明队列,这样不管谁先启动,队列都不会出问题 $channel->queue_declare('hello_queue', false, false, false, false); ``` 发布消息 ```php $message = new AMQPMessage('Hello RabbitMQ!'); $channel->basic_publish($message, '', 'hello_queue'); ``` | 参数 | 含义 | | ------------- | -------------------------------------------------- | | $message | 消息内容 | | `''` | Exchange 名字(空字符串表示用默认 Exchange)| | `hello_queue` | routing key(这里直接写队列名) | 消费消息 ```php // basic_consume 的参数比较多,现在先不用纠结每个参数是什么意思,先知道它的作用是「从指定队列持续消费消息」 // 后面的参数我们在消费者章节再详细讲 $channel->basic_consume('hello_queue', '', false, true, false, false, $callback); ``` 回调函数 ```php // 每收到一条消息,就会调用这个函数,$msg->body 就是消息的内容 $callback = function ($msg) { echo " [x] 收到:{$msg->body}\n"; }; ``` 保持运行 ```php while ($channel->is_consuming()) { $channel->wait(); } ``` 消费者是一个 `常驻进程`,它不会退出,一直在等消息。 如果有新消息进来,就调用回调函数处理;如果没有消息,就阻塞等待。 刚接触 RabbitMQ 容易误以为消费者执行完一次就结束了,实际上消费者通常会一直运行,持续等待新的消息到来。 这就是为什么消费者通常要放在后台运行(用 Supervisor 或者 systemd 管理)。 #### 7. 完整流程回顾 --- ```plaintext 1. producer.php 连接 RabbitMQ,声明队列,发送消息 2. 消息进入 hello_queue 3. consumer.php 连接 RabbitMQ,声明队列,开始消费 4. 收到消息,执行回调函数 5. 消息被消费,队列清空 ``` #### 8. 本文小结 --- PHP 连接 RabbitMQ 就五步: + Composer 安装 php-amqplib + 建立连接,获取 channel + 声明队列 + 发消息(basic_publish)或取消息(basic_consume) + 关闭连接(消费者不用关,一直运行) 你已经完成了第一个 RabbitMQ 程序,虽然简单,但这就是所有 RabbitMQ 应用的基础模式 后面的章节会在这个基础上加东西:Exchange、路由、ACK、持久化 …… 一步一步来,不着急,下一讲,我们来深入了解一下 Queue 队列。