- 1.rabbit mq和mqtt
- 1.1mq
- 1.2mqtt
- 2.集成rabbitmq收发消息
- 2.1引入pom文件增加配置文件
- 2.2参数说明
- 2.2.1交换机
- 2.2.2队列
- 2.2.3消息
- 2.3发送消息
- 2.4接收消息
- 3.集成mqtt收发消息
- 3.1引入pom文件增加配置文件
- 3.2发送消息
- 3.3接收消息
- 4.github源码地址
之前介绍过rabbitmq的安装和底层实现有兴趣可以查阅之前的博客。
重要的说明:
- 生产者 producer:生产者向队列发送消息
- 消费者 consumer:消费者从队列获取消息
- 交换机 exchange:生产者将消息发给交换机,由交换机分发消息
- 通道 channel:信道,用于通信
- 队列 queue:储存消息的地方
- 路由键 RoutingKey:指定当前消息被谁接受
- 绑定key BindingKey:指定当前Exchange下,什么样的路由键会被下派到当前绑定的Queue中
今天抽象型的说一下mq各部分的运行机制:
交换机: 队列与交换机绑定,然后指定路由键,这样消息发送到交换机,交换机根据路由键去投递到匹配的队列中。
- Direct Exchange 直连交换机
需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 - Fanout Exchange 扇形交换机
一种不使用路由键的交换机,收到的消息直接投递到绑定的全部队列上,一个队列一份。 - Topic Exchange 主题交换机
路由键匹配,他的路由键支持通配符比如"test/",可以匹配到多个路由键,假设交换机收到两条消息路由键是“test/1”“test/2”,那么交换机会发送给路由键是"test/"的交换机一份。
“#”会匹配之后多级的目录,“”只能匹配后一级目录。
test/ : test/1
test/# : test/1/2/3/4/5/6/7/8/9/10
队列:
- 简单队列:Hello World
一个生产者对应一个消费者没有中间商赚差价(交换机)。 - work模式:Work queus
一个生产者多个消费者,消息只会被一个消费者处理,同一条消息不会被重复消费,也没有中间商赚差价(交换机)。 - 订阅模式:Publish/Subscribe
类似于mqtt协议的主题订阅,消息先到交换机,交换机再将消息分配给已经绑定的消费者。 - 路由模式:Routing
和订阅模式极度相同,队列可以设置多个key绑定交换机任何一个匹配上了都可以接收消息,比如key为 test/get、test/put,test/get和test/put任何一个来消息了路由器都会投递到队列中。 - 通配符模式:Topics
和交换机的同通配符差不多,只是这次通配符用在了队列绑定交换机的key上 - RPC(没啥用)
MQTT是当前物联网使用比较多的一个协议,他是以主题订阅的形式进行工作,类似于八婆传瞎话,某一个八婆在某个小团体中说别人的坏话,然后你不在它的小团里不行,它说别人坏话的时候你不在也听不见。
(mqttclient订阅某一个主题,然后所有订阅这个主题的用户都可以收发消息,某个客户端发送消息其他所有在线用户会同步收到,即时消息不会存储,不在线收不到消息,即使下线又上线之前不在线的消息也不会收到)
注意:一定要现有队列才可以发送消息,发送消息并不会创建交换机和队列,可以先让消费者启动也就是RabbitListener他会根据配置创建。
2.1引入pom文件增加配置文件POM文件
org.springframework.boot spring-boot-starter-amqp2.1.6.RELEASE
porperties
spring: #rabbitmq rabbitmq: port: 5672 username: 账号 password: 密码 virtual-host: / host: ip listener: simple: ## auto表示自动完成这次消费,manual表示需要手动告知完成消费 acknowledge-mode: manual ## listener开几个线程处理数据 concurrency: 1 ## linstener 最大开几个线程 max-concurrency: 1 ## 一次拿几条数据 prefetch: 1 # 开启重试,重试5次 间隔1秒 retry: # 开启消费者(程序出现异常)重试机制,默认开启并一直重试 enabled: true # 最大重试次数 max-attempts: 5 # 重试间隔时间(毫秒) initial-interval: 1000 # 是否进入死信队列 true是 false不是 default-requeue-rejected: false2.2参数说明 2.2.1交换机
@AliasFor("name") String value() default ""; // 交换机名称(两个都是) @AliasFor("value") String name() default ""; // 交换机名称(两个都是) String type() default "direct"; // 交换机类型 String durable() default "true"; // 是否是持久化的,即使rabbitmq重启,交换机是否存在 String autoDelete() default "false"; // 当没有队列绑定交换机自动销毁 String internal() default "false"; // 是否为内部交换机,内部交换机只能路由交换机到交换机 String ignoreDeclarationExceptions() default "false"; // 忽略声明异常 String delayed() default "false"; // 是否开启延迟消息,需要使用延迟消息插件 Argument[] arguments() default {}; // 结构化参数,发送消息的时候,额外设置消息的参数(也就是header信息) String declare() default "true"; // 是否有管理员 String[] admins() default {}; // 返回应该声明此组件的管理bean名称列表。默认情况下,所有管理员都将声明它2.2.2队列
@AliasFor("name") String value() default ""; // 队列名称(两个都是) @AliasFor("value") String name() default ""; // 队列名称(两个都是) String durable() default ""; // 是否是持久化的,即使rabbitmq重启,队列是否存在 String exclusive() default ""; // 是否是排他队列,是否只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除 String autoDelete() default ""; // 当没有消费者自动销毁队列 String ignoreDeclarationExceptions() default "false"; // 忽略声明异常 Argument[] arguments() default {}; // 结构化参数,发送消息的时候,额外设置消息的参数(也就是header信息) String declare() default "true"; // 是否有管理员 String[] admins() default {}; // 返回应该声明此组件的管理bean名称列表。默认情况下,所有管理员都将声明它2.2.3消息
- 设置exchange为持久化之后,并不能保证消息不丢失,因为此时发送往exchange中的消息并不是持久化的,需要配置delivery_mode=2指明message为持久的。FanoutExchange 发送的消息默认就是持久化
@Component public class SendMessage implements CommandLineRunner { private int index = 0; private RabbitTemplate rabbitTemplate; public SendMessage(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @Override public void run(String... args) throws InterruptedException { //测试使用代码 Boolean where = true; while (where) { System.out.println("Sending message..."); try { rabbitTemplate.convertAndSend("test_exchange", "", "发送消息" + index); } catch (Exception e) { e.printStackTrace(); } index++; Thread.sleep(1000); } } }2.4接收消息
@Component public class ReceiveMessage { @Autowired private MqttClient mqttClient; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "test_queue", durable = "true"), exchange = @Exchange(name = "test_exchange", type = "fanout"), key = "")) @RabbitHandler private void receive1(Message message, Channel channel) { //消费者 *** 作 try { receiveMessage(new String(message.getBody())); //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 正常消费队列,第二个参数的意思是小于该ack的消息是否等待全部完成后再一次性签收 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 将消息重新放回队列 } catch (Exception e) { e.printStackTrace(); } } public void receiveMessage(String message) { // 1.接收AMQP消息 Thread t = Thread.currentThread(); String name = t.getName(); System.out.println("name=" + name); System.out.println("-------------------接收消息-------------------"); System.out.println(message); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 2.发送mqtt 消息 String mqttTopic = "test"; MqttMessage msg = new MqttMessage(); String msgStr = message; msg.setPayload(msgStr.getBytes());//设置消息内容 msg.setQos(0);//设置消息发送质量,可为0,1,2. msg.setRetained(false);//服务器是否保存最后一条消息,若保存,client再次上线时,将再次受到上次发送的最后一条消息。 try { mqttClient.publish(mqttTopic, msg);//设置消息的topic,并发送 } catch (MqttException e) { e.printStackTrace(); } } }3.集成mqtt收发消息 3.1引入pom文件增加配置文件
org.eclipse.paho org.eclipse.paho.client.mqttv3${mqttv3.version}
#mqtt mqtt: clientid: mqtt-system host: 182.92.101.122 username: root pwd: hydf@8888 completionTimeout: 30000
@Configuration @RefreshScope public class MqttConfig { @Value("${mqtt.host}") private String mqttHost; @Value("${mqtt.username}") private String mqttUserName; @Value("${mqtt.clientid}") private String clientId; @Value("${mqtt.pwd}") private String mqttPwd; @Value("${mqtt.completionTimeout}") private Integer completionTimeout; @Bean public MqttClient mqttClient() throws MqttException { //1.初始化mqtt参数 MqttConnectOptions mOptions = new MqttConnectOptions(); mOptions.setAutomaticReconnect(true);//断开后,是否自动连接 mOptions.setCleanSession(false);//是否清空客户端的连接记录。若为true,则断开后,broker将自动清除该客户端连接信息 mOptions.setConnectionTimeout(completionTimeout);//设置超时时间,单位为秒 mOptions.setUserName(mqttUserName);//设置用户名。跟Client ID不同。用户名可以看做权限等级 mOptions.setPassword(mqttPwd.toCharArray());//设置登录密码 mOptions.setKeepAliveInterval(60);//心跳时间,单位为秒。即多长时间确认一次Client端是否在线 mOptions.setMaxInflight(10);//允许同时发送几条消息(未收到broker确认信息) //2.创建mqtt客户端 MqttClient client = null; try { client = new MqttClient("tcp://"+mqttHost, clientId,null); client.connect(mOptions);//连接broker client.setCallback(mqttCallback);//设置回调 } catch (MqttException e) { e.printStackTrace(); } // 订阅主题 client.subscribe("test"); return client; } static MqttCallback mqttCallback = new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out.println("MQTT Lost"); } @Override public void messageArrived(String topic, MqttMessage message){ System.out.println("收到消息!"); System.out.println(new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("MQTT delivery Complete "); } }; }3.2发送消息
// 2.发送mqtt 消息 String mqttTopic = "test"; MqttMessage msg = new MqttMessage(); String msgStr = message; msg.setPayload(msgStr.getBytes());//设置消息内容 msg.setQos(0);//设置消息发送质量,可为0,1,2. msg.setRetained(false);//服务器是否保存最后一条消息,若保存,client再次上线时,将再次受到上次发送的最后一条消息。 try { mqttClient.publish(mqttTopic, msg);//设置消息的topic,并发送 } catch (MqttException e) { e.printStackTrace(); }3.3接收消息
接收消息代码,在3.1中初始化client的时候绑定的回调
4.github源码地址https://github.com/1142235090/frame
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)