03 rocketmq术语解释

03 rocketmq术语解释,第1张

03 rocketmq术语解释
  1. mq的结构图,如下:


    同理,consumer也是先向nameserver获取从哪一个broker获取消息,再得到nameserver的响应之后,才开始获取消息,consumer会一直监听消息的发送.



    异步与同步的区别:



    4 发送同步,异步消息:
    同步: 指的是消息的发送者在发送消息后,处于阻塞状态,等待消息的消费方发送确认后返回.

    Message msg = new Message(“springboot-mq”, “Tag1”, (“Hello World” + i).getBytes());
    //5.发送消息
    SendResult result = producer.send(msg);
    //发送状态
    SendStatus status = result.getSendStatus();
    //消息id(每条消息都有一个id)
    String msgId = result.getMsgId();
    //消息队列id
    int queueId = result.getMessageQueue().getQueueId();
    System.out.println(“发送结果:” + result);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(1);
    }
    //6.关闭生产者producer
    producer.shutdown();
    }
    }
  2. 异步: 消息发送到之后,不会去等待mq回传发送结果,异步消 息的可靠性没有同步消息高,异步消息的发送可以通过回调函数的方式接受消息的处理.
    应用场景为对消息发送性能比较高的场景.不能让消息延迟

    public class AsyncProducer {

    public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer(“group1”);
    //2.指定Nameserver地址
    producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.启动producer
    producer.start();

     for (int i = 0; i < 10; i++) {
         //4.创建消息对象,指定主题Topic、Tag和消息体
         
         Message msg = new Message("base", "Tag2", ("Hello World" + i).getBytes());
         //5.发送异步消息
         producer.send(msg, new SendCallback() {
             
             public void onSuccess(SendResult sendResult) {
                 System.out.println("发送结果:" + sendResult);
             }
             
             public void onException(Throwable e) {
                 System.out.println("发送异常:" + e);
             }
         });
         //线程睡1秒
         TimeUnit.SECONDS.sleep(1);
     }
     //6.关闭生产者producer
     producer.shutdown();
    

    }
    }

    5 单向消息发送: 只管发,不管结果(例如日志发送)

    Message msg = new Message(“base”, “Tag3”, (“Hello World,单向消息” + i).getBytes());
    //5.发送单向消息
    producer.sendoneway(msg);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(5);
    }
    //6.关闭生产者producer
    producer.shutdown();
    }
    }

    6 消费消息的流程:

    SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { long orderId = (long) arg; long index = orderId % mqs.size();//根据订单id选择队列(保证同一个订单在同一个队列中) return mqs.get((int) index);//根据取模的结果确定发送的队列 } }, orderSteps.get(i).getOrderId()); System.out.println("发送结果:" + sendResult); } producer.shutdown(); }

    }

    9.2 消息的顺序消费(采用单线程确保消费的顺序性)

    10 延迟消息: 在发送消息的时候,设置消息被延迟消费的时间
    延迟可选如下:

    11 批量消息发送:
    批量发送消息能显著提高消息传递的性能,这一批消息的总大小不应超过4M.超过范围则需要进行分割.需要手写一个分割算法,计算消息的大小.
    public class Producer {
    public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer(“group1”);
    //2.指定Nameserver地址
    producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.启动producer
    producer.start();
    List msgs = new ArrayList();
    //4.创建消息对象,指定主题Topic、Tag和消息体

    Message msg1 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 1).getBytes());
    Message msg2 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 2).getBytes());
    Message msg3 = new Message(“BatchTopic”, “Tag1”, (“Hello World” + 3).getBytes());
    msgs.add(msg1);
    msgs.add(msg2);
    msgs.add(msg3);
    //5.发送消息
    SendResult result = producer.send(msgs);
    //发送状态
    SendStatus status = result.getSendStatus();
    System.out.println(“发送结果:” + result);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(1);
    //6.关闭生产者producer
    producer.shutdown();
    }
    }

    public class Consumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者Consumer,制定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
    //2.指定Nameserver地址
    consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.订阅主题Topic和Tag
    consumer.subscribe(“BatchTopic”, “*”);
    //4.设置回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
    

    }

    12 过滤消息的方式: 当消息生产者将消息发送至broker后,consumer可以根据一定的规则对消息进行过滤,选择需要消费的消息
    (1) tag过滤,consumer将接受包含TAG的消息.但是限制是一个消息只能有一个标签,不适用与一些复杂的场景.
    public class Producer {
    public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer(“group1”);
    //2.指定Nameserver地址
    producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.启动producer
    producer.start();
    for (int i = 0; i < 3; i++) {
    //4.创建消息对象,指定主题Topic、Tag和消息体

    Message msg = new Message(“FilterTagTopic”, “Tag2”, (“Hello World” + i).getBytes());
    //5.发送消息
    SendResult result = producer.send(msg);
    //发送状态
    SendStatus status = result.getSendStatus();
    System.out.println(“发送结果:” + result);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(1);
    }
    //6.关闭生产者producer
    producer.shutdown();
    }
    }
    public class Consumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者Consumer,制定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
    //2.指定Nameserver地址
    consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.订阅主题Topic和Tag
    consumer.subscribe(“FilterTagTopic”, "Tag1 || Tag2 ");
    //4.设置回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
    

    }

    (2) Sql:复杂场景推荐使sql过滤.
    public class Producer {
    public static void main(String[] args) throws Exception {
    //1.创建消息生产者producer,并制定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer(“group1”);
    //2.指定Nameserver地址
    producer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.启动producer
    producer.start();
    for (int i = 0; i < 10; i++) {
    //4.创建消息对象,指定主题Topic、Tag和消息体

    Message msg = new Message(“FilterSQLTopic”, “Tag1”, (“Hello World” + i).getBytes());
    msg.putUserProperty(“i”, String.valueOf(i));//添加自定义的属性用于过滤
    //5.发送消息
    SendResult result = producer.send(msg);
    //发送状态
    SendStatus status = result.getSendStatus();
    System.out.println(“发送结果:” + result);
    //线程睡1秒
    TimeUnit.SECONDS.sleep(2);
    }
    //6.关闭生产者producer
    producer.shutdown();
    }
    }
    public class Consumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者Consumer,制定消费者组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“group1”);
    //2.指定Nameserver地址
    consumer.setNamesrvAddr(“192.168.25.135:9876;192.168.25.138:9876”);
    //3.订阅主题Topic和Tag
    consumer.subscribe(“FilterSQLTopic”, MessageSelector.bySql(“i>5”));//过滤条件
    //4.设置回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动");
    }
    

    }

    欢迎分享,转载请注明来源:内存溢出

    原文地址: http://outofmemory.cn/zaji/5669776.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存