PHP *** 作Rabbitmq

PHP *** 作Rabbitmq,第1张

PHP *** 作Rabbitmq lumen 框架安装rabbitmq依赖包
	composer require php-amqplib/php-amqplib
推送消息队列
$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin');

//创建通道
$chan = $connection->channel();

$chan->/confirm/i_select(); // 发布确认模式
//推送成功
$chan->set_ack_handler(
   function (AMQPMessage $message) {
       var_dump($message);
   }
);

//推送失败
$chan->set_nack_handler(
   function (AMQPMessage $message) {
       var_dump($message);
   }
);


//创建交换机
$exchangeName = 'test_exchange';//交换机名称
$chan->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);

//创建队列
$queueName = 'test_queue';
$chan->queue_declare($queueName, false, true, false, false);

//绑定队列与交换机
$routeKey = 'test_route';//路由名称
$chan->queue_bind($queueName,$exchangeName,$routeKey);

//发布消息
$data = [
   'msg_id'=>1,
   'data'=>'测试入队'
];


$msg = new AMQPMessage(json_encode($data),['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$chan->basic_publish($msg, 'test_exchange_topic', 'test_route');

$chan->/confirm/i_select_ok();
// 监听成功或失败返回结束 成功/失败 => set_ack_handler/set_nack_handler
$chan->close();
$connection->close();
消费消息
$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin');

//创建通道
$chan = $connection->channel();

//创建交换机
$exchangeName = 'test_exchange';//交换机名称
$chan->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);

//创建队列
$queueName = 'test_queue';
$chan->queue_declare($queueName, false, true, false, false);

//绑定队列与交换机
$routeKey = 'test_route';
$chan->queue_bind($queueName,$exchangeName,$routeKey);

//获取队列中的消息
$chan->basic_consume($queueName, 'test', false, true, false, false, function($message){
    var_dump($message->body);
});

while($chan->is_consuming()){
    $chan->wait();
}
命令行运行该脚本,就会阻塞获取rabbitmq中的消息。

# PUSH延时队列
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin');
$channel = $connection->channel();
$channel->/confirm/i_select(); // 发布确认模式
//推送成功
$channel->set_ack_handler(
    function (AMQPMessage $message) {
        var_dump($message);
    }
);

//推送失败
$channel->set_nack_handler(
    function (AMQPMessage $message) {
        var_dump($message);
    }
);
//申明两个路由
$delayExchangeRoute = 'delay_exchange_route';//需要延迟交换机绑定路由
$workExchangeRoute = 'work_exchange_route';//消费交换机绑定路由

//给delay_exchange交换机push 延时消息,时间到后转发到work_exchange交换机处理,
//声明两个交换机
$delayExchange = 'delay_exchange';
$workExchange = 'work_exchange';
$channel->exchange_declare($delayExchange, 'direct',false,false,false);
$channel->exchange_declare($workExchange, 'direct',false,false,false);

$tale = new AMQPTable();
$tale->set('x-dead-letter-exchange', $workExchange);//很关键  表示过期后由哪个work_exchange处理
$tale->set('x-dead-letter-routing-key',$delayExchangeRoute);//很关键  表示过期后根据什么路由策略转发到work_exchange
// $tale->set('x-message-ttl',16000);  //存活时长   下面的过期时间不能超过

//绑定需要延迟的队列与交换机
$delayQueue = 'delay_queue';
$channel->queue_declare($delayQueue,false,true,false,false,false,$tale);
$channel->queue_bind($delayQueue, $delayExchange,$delayExchangeRoute);

//绑定真正消费的对列与交换机
$workQueue = 'work_queue';
$channel->queue_declare($workQueue,false,true,false,false,false,$tale);
$channel->queue_bind($workQueue, $workExchange,$workExchangeRoute);



$data = [
    'msg_id'=>2,
    'data'=>'ceshi延迟队列'
];
$msg = new AMQPMessage(json_encode($data,256), array(
    'expiration' => 50000,//延迟50秒
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

));

$channel->basic_publish($msg,$delayExchange,$delayExchangeRoute);
$channel->/confirm/i_select_ok();
$channel->close();
$connection->close();

执行脚本,就会将消息push到delay_queue 队列,50秒后就会转发到work_queue队列,监听了work_queue的进程就会立马消费。

创建交换机参数详解
  • exchange: 交换机名称。
  • type: 交换机类型。
    1、direct:完整的路由匹配。
    2、topic:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
    3、 fanout:不处理路由,消息发送到交换机,交换机会转发到所有绑定到该交换机的队列。
    4、headers:不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue
    与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑
    定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
    headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,
    topic 的路由键都需要要字符串形式的。
  • passive: 设为false,交换机不存在就创建,并相信成功;设为true,交换机不存酒就会报错。
  • durable:是否持久化消息。
  • auto_delete:没有队列绑定到该交换机后,自动删除该exchange。
  • internal:当前Exchange是否用于RabbitMQ内部使用,默认为false,即不允许使用客户端推送消息。
  • nowait:如果为true则表示不等待服务器回执信息,函数将返回null,可以提高访问速度。
  • arguments: 扩展参数,定制化的需求,比如优先级等。
  • ticket:
创建队列参数详解
  • queue:队列名称。
  • passive:如果为true,如果存在这个队列,则会返回队列的信息,如果不存在这个队列,则会抛异常,设为fasle,不存在就会自动创建。
  • durable:持久化。
  • exclusive:设为true,该队列只有当前这个进程可以消费,其他客户端不能读取到这个队列。如果当进程断开连接,这个队列也会被销毁,不管是否设置了持久化或者自动删除。
  • auto_delete:没有消息时候,自动删除该队列。
  • nowait:不等待处理结果。
  • arguments:额外参数。
  • ticket:
延迟队列插件

上面的延迟队列是通过 DLX + TTL 方式实现的,他的原理是先将消息路由到一个正常的队列,当消息过期时间到后,交换机会自动路由到死信队列消费。这样需要两个队列来实现,并且由于队列先进先出原理,会阻塞后面推送的非延迟消息或者延迟时间更短的消息。因此rabbit官方提供了延迟插件解决类似问题。

  1. 安装插件

    下载地址:https://www.rabbitmq.com/community-plugins.html
    复制 rabbitmq_delayed_message_exchange-3.8.0.ez 到rabbitmq的plugins目录

  2. 启用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchang

    查看已安装的插件

    可以通过rabbitmq-plugins list 查看已安装的插件

  3. 实现过程
$channel->exchange_declare('delayed_message_exchange', 'x-delayed-message',false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
$channel->queue_declare('delay_message_queue',false,true,false,false,false);
$channel->queue_bind('delay_message_queue', 'delayed_message_exchange','delayed_message_exchange_route');

$data = [
    'msg_id'=>18,
    'time'=>time(),
    'desc'=>'测试延迟18s'
];

$msg = new AMQPMessage(json_encode($data,256), array(
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
));

$msg->set('application_headers', new AMQPTable([
    'x-delay' => 18000 // 延迟时间,单位毫秒
]));

$channel->basic_publish($msg,'delayed_message_exchange','delayed_message_exchange_route');
消息的可靠投递

事务机制

$chan->tx_select();
$chan->tx_commit();
$chan->tx_rollback();

事务机制能保证消息的可靠投递,但会带来性能的损耗,rabbitmq官方提供了更轻量级的实现方案:发送、确认机制。

发送/确认机制

//推送成功
$chan->set_ack_handler(
 	function (AMQPMessage $message) {
 		echo('投递成功:'.$message->body . PHP_EOL);
 	}
);
//推送失败
$chan->set_nack_handler(
	function (AMQPMessage $message) {
    	echo('投递失败:'.$message->body . PHP_EOL);
  	}
);
$chan->/confirm/i_select(); // 发布确认模式

//消息push之后调用 wait_for_pending_acks 方法
$chan->wait_for_pending_acks();

注:消息只能保证被成功推送到交换机,如果交换机不能正确路由到queue,消息也会丢失。因为交换机投递消息到queue是异步的。

其他补充方案配合使用
复杂情况下,我们需要记录消息的整个历程,消息push之前,将消息入库,记录状态0,投递成功记录状态1,消息被消费之后记录状态2。业务场景根据需要,可以通过定时检查入库的消息,没有消费或者没有投递成功的重新投递,同时为了保证消费者不重复消费,需要做好消费端的幂等性。

消息的可靠消费

消息的消费确认机制
rabbitmq的消息消费确认机制是通过消费者发送给MQ服务端的,分为自动发送和手动发送。即basic_consume方法的 no_ack 参数。该参数为ture,表示自动发送确认消费机制,即消费者接收到消息,就告诉rabbitmq,我已经消费消息了。设为false:即手动发送确认消费机制,需要在消费逻辑处理完成后加上以下代码:

$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);

演示可靠消费
这里假设有两个消费者(consumer_01和consumer_02)监听同一个队列,通过模拟consumer_01在处理消息时候出现异常退出,来确认rabbitmq会自动将消息发送给consumer_02消费。
生产者代码:

$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin');
//创建通道
$chan = $connection->channel();
//推送成功
$chan->set_ack_handler(
    function (AMQPMessage $message) {
        echo('投递成功:'.$message->body . PHP_EOL);
    }
);
//推送失败
$chan->set_nack_handler(
    function (AMQPMessage $message) {
        echo('投递失败:'.$message->body . PHP_EOL);
    }
);
$chan->/confirm/i_select(); // 发布确认模式

//创建交换机
$exchangeName = 'test_exchange';//交换机名称  AMQPExchangeType::DIRECT
$res = $chan->exchange_declare($exchangeName,AMQPExchangeType::DIRECT , false, true, true, false, false);

//创建队列
$queueName = 'test_queue';
$chan->queue_declare($queueName, false, true, false, false);

//绑定队列与交换机
$routeKey = 'test_route';//路由名称
$chan->queue_bind($queueName,$exchangeName,$routeKey);

//发布消息
$data = [
    'msg_id'=>1,
    'time'=>date('Y-m-d H:i:s'),
    'data'=>'测试入队'
];
$msg = new AMQPMessage(json_encode($data,256),['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$chan->basic_publish($msg, 'test_exchange', 'test_route');
$chan->wait_for_pending_acks();
$chan->close();
$connection->close();

消息投递后在rabbitmq 可视化后台可以看到如下信息:

注意图中红框参数,Ready值为1,Total值为1。下面模拟消费者:

consumer_01代码:

$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin');
$chan = $connection->channel();
$chan->basic_consume('test_queue', 'consumer_01', false, false, false, false, function($message){
    echo('consumer_01接收到消息:' . $message->body . '==时间' . date('Y-m-d H:i:s') . PHP_EOL);
    echo('consumer_01消息处理中:' . PHP_EOL);
    sleep(60);//睡眠60秒模拟消息处理逻辑
    echo('consumer_01消息处理完成:' .  '==时间' . date('Y-m-d H:i:s') . PHP_EOL);
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    echo('------------------------------'.PHP_EOL);

});
while($chan->is_consuming()){
    $chan->wait();
}

consumer_02代码:

$connection = new AMQPStreamConnection('127.0.0.1','5672','admin','admin');
$chan = $connection->channel();
$queueName = 'test_queue';
$chan->basic_consume($queueName, 'test', false, false, false, false, function($message){
    echo('consumer_02接收到消息:' . $message->body . '==时间' . date('Y-m-d H:i:s') . PHP_EOL);
    echo('consumer_02消息处理中:' . PHP_EOL);
    sleep(5);//睡眠5秒模拟消息处理
    echo('consumer_02消息处理完成:' .  '==时间' . date('Y-m-d H:i:s') . PHP_EOL);
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    echo('------------------------------'.PHP_EOL);
});
while($chan->is_consuming()){
    $chan->wait();
}

启动consumer_01:如下图:

此时将该进程杀死,然后rabbitmq后台消息状态如图:出现Unacked 值为1,即未确认消息数量为1。

然后启动consumer_02,rabbitmq就会自动将消息发送给consumer_02消费。如下图所示:


当多个消费者监听同一个队列的时候,MQ会采用简单轮询机制发送消息给消费者的,即队列中的第一条消息发给consumer_01,第二条消息发给consumer_02,然后第三条发给consumer_01,这样依次反复。
但真实场景可能consumer_01处理消息的能力比consumer_02慢很多,比如consumer_01处理一条消息要10秒,consumer_02处理一条消息只需要5秒。这样就可能导致consumer_01的消息堆积,因此rabbitmq提供了basic_qos 方法,将prefetch_count参数设为1,即告诉rabbitmq,如果我有消息正在处理,还没有返回消费确认信息,则不要给我发消息。MQ就会将消息发送给其他监听了该队列并且空闲的消费者。

$chan->basic_qos(0, 1, false);

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5154822.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-18
下一篇 2022-11-18

发表评论

登录后才能评论

评论列表(0条)

保存