一、RabbitMQ的定义
RabbitMQ是使用erlang语言开发的开源消息队列系统,完整的实现了AMPQ(高级抽象层消息通信协议)。
二、Mac下RabbitMQ安装
使用Homebrew安装
1$ brew install rabbitmq修改 ~/.bash_profile 配置环境变量:
12# RabbitMQ Configexport PATH=$PATH:/usr/local/sbin重启配置
1$ source ~/.bash_profile启动mq服务(后台启动为rabbitmq-server -detached)
1$ rabbitmq-server登录管理界面 http://127.0.0.1:15672 账号密码为:guest
三、客户端
RabbitMQ官方提供了三种PHP可用的扩展:php-amqp,php-rabbit,php-amqplib
php-amqplib 安装
php的客户端现在常用的是php-amqplib
(1)直接拉取github上面的代码
|
|
(2)composer安装(官网提供)
将composer.json文件添加到您的项目中
{ “require”:{ “php-amqplib / php-amqplib”:“> = 2.6.1” } }
下载依赖
1$ composer install
四、概念
Virtual vhosts
virtual vhosts是一个命名空间,可以存在多个exchange和queue。实现了环境(用户,用户组,exchange,queue)隔离,是权限控制的最小粒度。默认的virtual host为/。
Exchange(交换机)
接受producer发送的消息,并根据binding绑定规则转发到对应的队列。默认是无名交换使用空字符串标识。exchange type(交换机类型)包含四种类型:direct,topic,headers,fanout
(1)direct
转发消息到routigKey指定的队列
(2)topic
类似于direct类型,只不过routigKey为一个句点号“.”分隔的字符串
- 可以替代一个字。
# 可以替换零个或多个单词。
(3)headers
根据发送的消息内容中的headers属性进行匹配。
(4)fanout
将所有收到的消息广播到所有已知的队列。
Queue(消息队列)
queue是mq内部对象,用于存储未被customer消费的消息。相同属性的queue可以重复定义,每个消息都会被投入到一个或多个队列。
Binding(绑定)
binding是将exchange和queue按照路由规则绑定起来。可以理解为binding是exchange和queue之间的关系
Connection(连接)
消息tcp连接
Channel(信道)
每个connection里,可建立多个channel,每个channel代表一个会话任务。做到尽量共用connection
五、RabbitMQ使用示例
send.php:
12345678910111213141516171819202122require_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPStreamConnection;use PhpAmqpLib\Message\AMQPMessage;// 创建连接$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');// 创建channel,多个channel可以共用连接$channel = $connection->channel();// 创建交换机以及队列(如果已经存在,不需要重新再次创建并且绑定)// 创建直连的交换机$channel->exchange_declare('direct_logs', 'direct', false, false, false);// 创建队列$channel->queue_declare('hello', false, false, false, false);// 交换机跟队列的绑定,$channel->queue_bind('hello', 'direct_logs', 'routigKey');// 设置消息bady传送字符串logs(消息只能为字符串,建议消息均json格式)$msg = new AMQPMessage('logs');// 发送数据到对应的交换机direct_logs并设置对应的routigKey$channel->basic_publish($msg, 'direct_logs', 'routigKey');receive.php:
1234567891011121314151617181920212223242526272829require_once __DIR__ . '/vendor/autoload.php';use PhpAmqpLib\Connection\AMQPStreamConnection;// 创建连接$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');// 创建channel,多个channel可以共用连接$channel = $connection->channel();// 可能会在数据发布之前启动消费者,所以我们要确保队列存在,然后再尝试从中消费消息。// 创建直连的交换机$channel->exchange_declare('direct_logs', 'direct', false, false, false);// 创建队列$channel->queue_declare('hello', false, false, false, false);// 交换机跟队列的绑定,$channel->queue_bind('hello', 'direct_logs', 'routigKey');// 回调函数$callback = function ($msg) {echo $msg->body;};// 启动队列消费者$channel->basic_consume('hello3', '', false, true, false, false, $callback);// 判断是否存在回调函数while(count($channel->callbacks)) {// 此处为执行回调函数$channel->wait();}
六、RabbitMQ备注
- 非持久化会导致,队列重启,数据丢失
- exchange持久化,在声明durable时参数指定为true
- queue持久化,在声明durable时参数指定true
- 消息持久化,实例化AMQPMessage类时指定delivery_mode为2
- exchange和queue是否持久化需要一致才能绑定
- 消费者设置手动ack,在声明no_ack参数时指定false
- 队列消息异常需要将消息删除并再次发送同样的消息置于末尾并手动记录日志
(完)
- 本文作者:Cindy
- 本文标题:Create Custom Demain Name Of Github Pages
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享4.0许可证)