RabbitMQ(01)——RabbitMQ的HelloWorld消息模型

RabbitMQ(01)——RabbitMQ的HelloWorld消息模型,第1张

RabbitMQ(01)——RabbitMQ的HelloWorld消息模型

文章目录

RabbitMQ——RabbitMQ的HelloWorld消息模型(直连)

1、HelloWorld消息模型之发布者发布消息2、HelloWorld消息模型之消费者消费消息3、连接工具类的封装

RabbitMQ——RabbitMQ的HelloWorld消息模型(直连)

AMPQ模型

AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。

AMPQ结构图为:

1、HelloWorld消息模型之发布者发布消息

HelloWorld消息模型结构

P:生产者,要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

新建一个maven项目,导入依赖


  com.rabbitmq
  amqp-client
  5.7.2

创建虚拟主机ems和用户ems,并把虚拟主机添加到用户中

消息生产者的开发

package com.cheng.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

//消息生产者,消息生产者需要与rabbitmq建立连接
public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接rabbitmq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接rabbitmq位于的主机
        connectionFactory.setHost("121.199.53.150");
        //设置端口号,应用访问端口
        connectionFactory.setPort(5672);
        //设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置可以访问虚拟主机用户的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //获取连接中的通道对象
        Channel channel = connection.createChannel();
        //通道对象要与消息队列建立连接,也可以说是绑定,绑定了才知道消息要发送到哪个队列
        //创建队列:参数1:队列名称,如果队列不存在会自动创建
        //参数二:队列是否要持久化,false不持久化,true持久化,如果不持久化,当rabbitmq重启服务后,队列会丢失,队列中未消费的消息自然也会丢失;如果持久化,当rabbitmq重启服务后,队列不会丢失,但队列中未消费的消息还会丢失
        //参数三:是否连接独占队列,false不独占
        //参数四:在消费完消息后是否自动删除队列
        //参数五:额外参数
        channel.queueDeclare("hello",true,false,false,null);

        //发布消息到队列queue中
        //参数一:交换机名称,直连模式没有交换机
        //参数二:队列名称
        //参数三:传递消息额外设置,MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化,当rabbitmq重启服务后,未消费的消息不会丢失
        //参数四:消息具体内容
        channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());

        //关闭通道和连接
        channel.close();
        connection.close();

    }
}

运行后,查看rabbitmq的管理控制页面:

Queues里面多了一个名为hello队列,里面有一条消息,HelloWorld消息模型消息发布成功。

2、HelloWorld消息模型之消费者消费消息

消息消费者的开发

package com.cheng.helloworld;

import com.rabbitmq.client.*;
import sun.rmi.transport.Endpoint;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接rabbitmq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接rabbitmq位于的主机
        connectionFactory.setHost("121.199.53.150");
        //设置端口号,应用访问端口
        connectionFactory.setPort(5672);
        //设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置可以访问虚拟主机用户的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //获取连接中的通道对象
        Channel channel = connection.createChannel();
        //通道对象要与消息队列建立连接,也可以说是绑定,绑定了才知道消息要从哪个队列获取
        //创建队列:参数1:队列名称,如果队列不存在会自动创建
        //参数二:定义队列名称是否要持久化,false不持久化
        //参数三:是否连接独占队列
        //参数四:在消费完消息后是否自动删除队列
        //参数五:额外参数
        channel.queueDeclare("hello",true,false,false,null);

        //发布消息到队列中
        //参数一:交换机名称,直连模式没有交换机
        //参数二:开启消息的自动确认机制
        //参数三:消费者消费时的回调接口
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body) = "+ new String(body));
            }
        });

       //不关闭通道和连接,让消息消费者一直监听队列
    }
}

运行,查看控制台输出:

消费者消费完消息后,回调方法成功执行。

并且hello队列里的消息数量变为0:

注意:这里消费者会消费队列里所有的消息。

3、连接工具类的封装
public class ConnectUtils {

    private static  ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
    }

    public static Connection getConnection(String host,Integer port,String virtualHost,String username,String pwd){
        try {
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(pwd);
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5701715.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存