php+rabbitmq消息延时队列

php+rabbitmq消息延时队列,第1张

生产端
$connection = new AMQPStreamConnection('192.168.16.111', 5672, 'user', '123456', '/', true);
        $channel    = $connection->channel();
        
        $exchange = 'delayed_exchange_test1';
        //交换机类型
        $args       = new AMQPTable(['x-delayed-type' => 'fanout']);
        //声明交换机
        $channel->exchange_declare($exchange, 'x-delayed-message', false, true, false, false, false, $args);
        
        $data    = 'Hello World at ' . date('Y-m-d H:i:s');
        //'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT消息持久化
        $message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        $delay   = 10000;
        //延时设置
        $headers = new AMQPTable(['x-delay' => $delay]);
        $message->set('application_headers', $headers);
        //发送消息
        $channel->basic_publish($message, $exchange);
        //关闭信道
        $channel->close();
        //关闭连接
        $connection->close();

消费端
$exchange = 'delayed_exchange_test1';
        
        $connection = new AMQPStreamConnection('192.168.16.209', 5672, 'opfan', 'opfan1688','/',true);
        
        $channel    = $connection->channel();;
    
        $queue = 'delayed_queue_test';
        //消息延迟队列
        $args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']);
        //队列声明
        $channel->queue_declare($queue, false, true, false, false, false, $args);
    
        //绑定队列,可在消费端绑定
        $channel->queue_bind($queue, $exchange);
        
        $callback = function (AMQPMessage $message) {
            printf(' [x] Message received: %s', $message->body);
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        };
        
        $channel->basic_consume('delayed_queue_test', '', false, false, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/790755.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存