- mq的结构图,如下:
同理,consumer也是先向nameserver获取从哪一个broker获取消息,再得到nameserver的响应之后,才开始获取消息,consumer会一直监听消息的发送.
异步与同步的区别:
4 发送同步,异步消息:
同步: 指的是消息的发送者在发送消息后,处于阻塞状态,等待消息的消费方发送确认后返回.
Message msg = new Message(“springboot-mq”, “Tag1”, (“Hello World” + i).getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
//消息id(每条消息都有一个id)
String msgId = result.getMsgId();
//消息队列id
int queueId = result.getMessageQueue().getQueueId();
System.out.println(“发送结果:” + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
producer.shutdown();
}
}
异步: 消息发送到之后,不会去等待mq回传发送结果,异步消 息的可靠性没有同步消息高,异步消息的发送可以通过回调函数的方式接受消息的处理.
应用场景为对消息发送性能比较高的场景.不能让消息延迟
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“group1”);
//2.指定Nameserver地址
producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 Message msg = new Message("base", "Tag2", ("Hello World" + i).getBytes()); //5.发送异步消息 producer.send(msg, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.println("发送结果:" + sendResult); } public void onException(Throwable e) { System.out.println("发送异常:" + e); } }); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown();
}
}
5 单向消息发送: 只管发,不管结果(例如日志发送)
Message msg = new Message(“base”, “Tag3”, (“Hello World,单向消息” + i).getBytes());
//5.发送单向消息
producer.sendoneway(msg);
//线程睡1秒
TimeUnit.SECONDS.sleep(5);
}
//6.关闭生产者producer
producer.shutdown();
}
}
6 消费消息的流程: } 9.2 消息的顺序消费(采用单线程确保消费的顺序性) public class Consumer { } 12 过滤消息的方式: 当消息生产者将消息发送至broker后,consumer可以根据一定的规则对消息进行过滤,选择需要消费的消息 } (2) Sql:复杂场景推荐使sql过滤. } 欢迎分享,转载请注明来源:内存溢出
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List
10 延迟消息: 在发送消息的时候,设置消息被延迟消费的时间
延迟可选如下:
11 批量消息发送:
批量发送消息能显著提高消息传递的性能,这一批消息的总大小不应超过4M.超过范围则需要进行分割.需要手写一个分割算法,计算消息的大小.
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“group1”);
//2.指定Nameserver地址
producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.启动producer
producer.start();
List msgs = new ArrayList();
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg1 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 1).getBytes());
Message msg2 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 2).getBytes());
Message msg3 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 3).getBytes());
msgs.add(msg1);
msgs.add(msg2);
msgs.add(msg3);
//5.发送消息
SendResult result = producer.send(msgs);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println(“发送结果:” + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
//6.关闭生产者producer
producer.shutdown();
}
}
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
//2.指定Nameserver地址
consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.订阅主题Topic和Tag
consumer.subscribe(“BatchTopic”, “*”);
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
(1) tag过滤,consumer将接受包含TAG的消息.但是限制是一个消息只能有一个标签,不适用与一些复杂的场景.
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“group1”);
//2.指定Nameserver地址
producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.启动producer
producer.start();
for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message(“FilterTagTopic”, “Tag2”, (“Hello World” + i).getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println(“发送结果:” + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(1);
}
//6.关闭生产者producer
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
//2.指定Nameserver地址
consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.订阅主题Topic和Tag
consumer.subscribe(“FilterTagTopic”, "Tag1 || Tag2 ");
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“group1”);
//2.指定Nameserver地址
producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message(“FilterSQLTopic”, “Tag1”, (“Hello World” + i).getBytes());
msg.putUserProperty(“i”, String.valueOf(i));//添加自定义的属性用于过滤
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println(“发送结果:” + result);
//线程睡1秒
TimeUnit.SECONDS.sleep(2);
}
//6.关闭生产者producer
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
//2.指定Nameserver地址
consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
//3.订阅主题Topic和Tag
consumer.subscribe(“FilterSQLTopic”, MessageSelector.bySql(“i>5”));//过滤条件
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List
评论列表(0条)