MetaQ

MetaQ,第1张

MetaQ

一、原理

 1、简介

        metaQ(全称metamorphosis)是一款分布式、队列模型的消息中间件。基于发布订阅模式,有Push和Pull两种消费方式,支持严格的消息顺序,亿级别的堆积能力,支持消息回溯和多个维度的消息查询。

 2、架构

 metaQ架构图-1(更具体)

metaQ架构图-2(更抽象) 

metaQ架构由四个部分组成:

Producer:消息生产者,Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递。

  • Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。
  • 并向提供Topic服务的Broker的Master建立长连接,且定时向Master发送心跳。
  • Producer向Master发布消息。

Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制。

  • Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。
  • 并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。
  • Consumer既可以从Master订阅消息,也可以从Slave订阅消息,具体订阅Master还是Slave由Broker决定。

NameServer:NameServer是一个Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现,维护了Topic与Broker的对应关系,并与每一个Broker都保持心跳连接。在Producer和Consumer需要发布或者消费消息时,向NameServer发出请求来获得需要连接的Broker信息。主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。
  • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker向每一台NameServer注册自己的路由信息,所以每个NameServer实例都保存一份完整的路由信息。当某个NameServer下线了,其它NameServer仍能向Consumer,Producer提供服务。

补充:metaQ是一个分布式消息中间件,关于高可用的实现(NameServer),mq没有使用业界最常用的zookeeper,而是自己开发了一个轻量级的configserver。这个configserver在内部里面很多中间件用到。               

configserver是一个无状态的节点(无状态节点就保证了可以随意横向扩展),多个节点之间并不通讯,但是任意一个NameServer的节点都保存了所需要的所有配置信息。集群内部的broker启动的时候会向他配置文件里面所有的NameServer节点汇报信息(Commitlog、Topic等信息),这样就保证了NameServer的单点问题,NameServer启动的时候也会从指定的配置文件里面加载配置。

Broker Cluster:Broker的集群,主要负责消息的存储、投递和查询以及服务高可用保证。Broker集群中Broker是分为Master-Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。

当消息发送到Master时候,Slave会定时获取此消息记录,并存储在自己的Store实现上;一旦Master记录成功,就返回成功,不用等待在slave上是否记录;正因如此,slave和master还有稍微一点的时间差异,在Master出故障那一瞬间,或许有最新产生的消息,就无法同步到slave;另外Slave可以作为Consumer的服务提供者,意思就是如果要写入必须通过Master,消费时候可以从Slave上直接获取。

Master和Slave都需要注册到nameserver上,一旦Master无法使用,客户端可使用与之对应的Slave。

为什么不用Zookeeper?

其实metaQ一开始的注册中心是ZooKeeper的,后续选择NameServer的原因,原因可能如下:

  • 根据CAP理论,最多只能满足两点特性,而ZooKeeper满足的是CP, 也就是说ZooKeeper无法保证可用性,这个主要是当ZooKeeper发生Master选举时,周期可能太长(30 ~ 120s),期间整个集群对外不可用,这对应一个注册中心来说是难以接受的
  • metaQ上Broker的节点角色Master和Slave是区分明确的,Slave不能升级成Master,如果一个Master宕机了,直接写到另一台Master就行了,也就不需要ZooKeeper的主从选举功能了。因此才选择设计一个轻量级的注册中心NameServer,满足集群部署需求,NameServer节点之间无任何信息同步,轻量级的设计也有更高的稳定性,占用的系统资源也更小。
  • metaQ集群,需要处理一些通用的数据,如Broker的列表,刷新时间等;如果选择ZooKeeper作为注册中心,复杂数据的逻辑解析 *** 作也需要一个ZooKeeper客户端去工作,因此直接设计一个注册中心反而简化了系统。

  3、存储模型

 4、消费模式

 1)集群模式

概念:集群消费模式是同一个消费者集群会消费topic的所有消息,就集群本身而言,它对于该topic下的每条消息只会消费一次,不会存在一个集群下两台机器重复消费同一条消息。或者说是一个ConsumerGroup中的Consumer实例平均分摊消费消息;

例子:某个Topic有9条消息,其中一个ConsumerGroup有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。

2)广播模式

概念:广播消费指的是一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次。所以说,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

例子:当一个consumer集群内有3个consumer实例(假设为consumer1、consumer2、consumer3)时,一条消息投递过来,会被consumer1、consumer2、consumer3都消费一次,该topic下的所有消息都会被每个消费者消费一次。

区别:与集群消费不同的是,consumer的消费进度是存储在各个consumer实例上,这就容易造成消息重复。还有很重要的一点,对于广播消费来说,是不会进行消费失败重投的,所以在consumer端消费逻辑处理时,需要额外关注消费失败的情况。

注意:虽然广播消费能保证集群内每个consumer实例都能消费消息,但是消费进度的维护、不具备消息重投的机制大大影响了实际的使用。因从,在实际使用中,更推荐使用集群消费,因为集群消费不仅拥有消费进度存储的可靠行,还具有消息重投的机制。而且,我们通过集群消费也可以达到广播消费的效果。

5、订阅模式

1)pull(拉模式)

获取消息的过程需要我们自己实现,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue,使用过程中需注意:

如果要使用pull方式的话,关键在于我们如何设置拉取消息的频率,如果生产者产生消息的频率是大致固定的,那么消费者也可以按照这个频率去拉取消息那是没有问题的。但是生产者生产消息的频率可能并非固定的,这就会导致我们不是很好设置pull的频率。

  • 如果每次pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;
  • 若每次pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的pull请求的RPC开销,影响MQ整体的网络性能。

2)push(推模式)

push的实现其实是将consumer把轮询过程进行封装,注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对我们而言,会感觉消息是被推送过来的,本质上还是消费者主动去获取的消息,这应该是我们最常用的一种方式,比较如果使用pull的话,感觉会和RPC很像了。更准确得说,metaQ使用的是长轮询来实现push效果的,相比较于轮询而言,长轮询是指服务端在没有数据时会hold住来自消费者的请求,也就是不返回响应结果,那么本次请求暂时就不会结束,直到服务端有相关数据,或者等待一定时间超时才会返回。返回后,客户端又会立即再次发起下一次,这个阻塞的时间是可以设置的。“长轮询”的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer,所以,不会存在由于生产者生产消息的能力远远大于消费者的处理能力,导致消费者承受不住之类的问题。但是显而易见的,这种长轮询其实是会阻塞住线程的,消费者这边的话,线程被阻塞住应该干不了其他事了;

为什么要使用这种方式来实现数据的推送呢?有如下几点:

  1. pull模式的频率设置无法很好的适应消息生产的频率,pull频率太小将导致消息推挤在Broer中,如果频率太大的话则会可能某些时间段都无法获取到数据,增加Broker负担;
  2. 长轮询的方式可以保证当Broker存在数据时才进行推送,并且消息获取的主动权在消费者手中,可以处理消息时才会获取,这点是可以控制的。

6、过滤

1)TGA过滤

发送消息时,可以指定tags属性:

com.alibaba.rocketmq.common.message.Message
	#Message(java.lang.String topic, 
             java.lang.String tags, 
             java.lang.String keys, 
             byte[] body)

示例:

Message msg = new Message("labor_test_demo",// topic
                        "tagTestA",// tag
                        "LaborID00" + i,// key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。
                        ("Hello metaQ " + i).getBytes());

订阅消息时,可以指定tags进行消息过滤:

com.taobao.metaq.client.metaPushConsumer
	#subscribe(java.lang.String topic, 
               java.lang.String subexpression)

subexpression属性支持多种表达式:

  • * 代表所有接收消息
  • tagA 代表过滤tags为tagA的消息
  • tagA || tagB代表过滤tagA或tagB的消息

示例:

consumer.subscribe("labor_test_demo", "tagTestA || tagTestB");

 2)SQL过滤

生产者端,通过设置用户自定义的属性:

for (int i = 0; i < 10; i++) {
   Message msg = new Message("labor_test_demo",// topic
                        "tagTestA",// tag
                        "LaborID00" + i, // key
                        ("Hello metaQ " + i).getBytes()); // body
   msg.putUserProperty("id", String.valueOf(i));
   SendResult sendResult = producer.send(msg);
}

消费者端,通过SQL过滤符合条件的自定义属性:  

metaMessageSelector selector = metaMessageSelector.bySql("id >= 1 and id <= 3");
consumer.subscribe("labor_test_demo", selector);

最终消费者会过滤得到id从1到3的3个消息进行消费 

理解:无论是TAG过滤还是SQL过滤都是消费者端的过滤,即该topic下的所有消息从Broker拉取到了消费者端,符合消费者过滤的消息则进行用户定义的进一步处理,不符合过滤条件的也当作被消费者消费了。

7、关键词概念

  • Topic:一级分类,对消息进行的分类
  • Tag:二级分类,对消息进行的分类
  • Message:信息载体
  • MessageID:消息唯一标志
  • MessageKey:由生产者配置的服务逻辑的唯一标志

produceId和produceGroup是一个意思,叫法不同。

comsumerId和comsumerGroup是一个意思,叫法不同。

二、实践

1、准备工作

1)创建topic

打开metaQ的控制台:http://ons-api.taobao.net/#/home/topic

发布订阅管理->Topic管理->发布Topic,填写Topic、应用名和备注信息,完成Topic创建。

2)订阅topic

搜索 dev_test_demo 这个topic,在 *** 作栏 中点击 申请订阅 进行创建: 

 此处创建两个Topic订阅分组,分别为 CID-consumer-test-demo 和 CID-consumer-test-demo2。

为啥需要创建订阅分组?

  1. 当多个消费者使用同一个订阅分组,可以负载均衡
  2. 实现容错更容易

Consumer Group:一组消费者集合,同一个group下的消费者以负载均衡的方式消费同一个topic下的消息。不同group之间可以认为是不同的消费者,如果消费的是同一个topic下的消息,那消费是互不干扰的,进度是不同的。

所以不同业务线想订阅同一个topic的消息,但又不想影响到各自的消息处理,可创建不同的订阅分组,每个订阅分组具有自己的消息消费进度,这样业务线之间的消息消费不受影响。

2 、普通maven工程

1)添加依赖

    
	com.taobao.metaq    
  metaq-client     
  3.0.0-SNAPSHOT 

2)消息生产者示例

发送普通消息:

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.taobao.metaq.client.metaProducer;


public class ProducerPlain {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // ProducerGroup这个概念发送普通的消息时,作用不大(可随意填写),但是发送分布式事务消息时,比较关键
        metaProducer producer = new metaProducer("your_producer_group_name");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Message msg = new Message(Constant.TOPIC,// topic
                        Constant.TAG,// tag
                        "OrderID001",// key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。
                        ("Hello metaQ").getBytes());// body
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

发送顺序消息:

如何理解顺序消息?

metaQ的顺序消息指的是保证消息的局部有序,即发往单个队列中的消息有序。对于乱序消息采用轮询的方式分配到各个队列中,而对于顺序消息,在发布时我们可以实现一个select将需要保证顺序的消息都发往一个队列中(例如图中根据OrderId将同一个订单下的创建、付款、完成三个消息分到同一个队列中)这样做可以使得消费端在拉取到消费队列消费时按照规定的消息顺序进行消费,从而保证了消息的有序性。

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.taobao.metaq.client.metaProducer;

import java.util.List;

public class ProducerOrder {
    public static void main(String[] args) {
        try {
            metaProducer producer = new metaProducer("your_producer_group_name");
            producer.start();

            // 创建队列选择器规则,即定义消息发往到哪个队列中
            MessageQueueSelector selector = new MessageQueueSelector() {
                @Override
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            };

            int[] orderIds = new int[] {1001, 1002}; // 2个订单
            for (int i = 0; i < orderIds.length; i++) {
                int orderId = orderIds[i]; // 获取订单ID
                // 订单ID相同的消息要有序
                // 1.生成订单创建
                Message orderCreateMsg = new Message("dev_test_demo", "OrderCreateTag",
                        "OrderCreate:" + orderId, ("Order Content").getBytes());

                SendResult sendResult = producer.send(orderCreateMsg, selector, orderId);
                System.out.println(sendResult);
                // 2.生成订单付款
                Message orderPaymentMsg = new Message("dev_test_demo", "OrderPaymentTag",
                        "OrderPayment:" + orderId, ("Payment Content").getBytes());
                sendResult = producer.send(orderPaymentMsg, selector, orderId);
                System.out.println(sendResult);
                // 3.生成订单完成
                Message orderFinishMsg = new Message("dev_test_demo", "OrderFinishTag",
                        "OrderFinish:" + orderId, ("Finish Content").getBytes());
                sendResult = producer.send(orderFinishMsg, selector, orderId);
                System.out.println(sendResult);
            }
            producer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 在消息发送完成后,可以进入到metaQ的控制台进行查看消息的堆积情况(发布订阅管理->订阅管理->查询 dev_test_demo -> 选择CID-consumer-test-demo->消费进度查询),6个消息堆积在2个队列中,每个队列保存着与该订单ID一致的订单创建、订单付款、订单完成信息。

3)消息消费者示例

订阅普通消息:

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.taobao.metaq.client.metaPushConsumer;

import java.util.List;


public class PushConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 设置consumerGroup的值
        metaPushConsumer consumer = new metaPushConsumer("CID-consumer-test-demo");
        // 订阅指定topic下tags分别等于TagA或TagC或TagD
        consumer.subscribe("dev_test_demo", "TagA || TagC || TagD");
        // 订阅指定topic下所有消息,一个consumer对象可以订阅多个topic
        consumer.subscribe("dev_test_demo2", "*");
        consumer.setConsumeMessageBatchMaxSize(3); // 设置批量消费时的单批最大消息数量,默认为1,当设置为的值>1,即为批量获取消费方式
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                System.out.println("Current Message Size: " + msgs.size());
                // 循环迭代msgs中的消息,即单次批量获取的消息
                // 尽管设置了最大单批次获取的最大消息数据为3,但是单次批量获取的消息数量是不确定的,可能是1、2、3
                // 如果设置为批量消费方式,要么都成功,要么都失败
                for (MessageExt msg: msgs) {
                    System.out.println("Tags: " + msg.getTags());
                    System.out.println("Body: " + new String(msg.getBody()));
                }
                //
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

订阅顺序消息:

import com.alibaba.rocketmq.client.consumer.listener.*;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.taobao.metaq.client.metaPushConsumer;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;


public class PushConsumerOrder {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 设置consumerGroup的值
        metaPushConsumer consumer = new metaPushConsumer("CID-consumer-test-demo");
        // 订阅指定topic下tags分别等于TagA或TagC或TagD
        consumer.subscribe("dev_test_demo", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                this.consumeTimes.incrementAndGet();
                for (MessageExt msg:msgs) {
                    System.out.println(new String(msg.getBody()));
                    System.out.println(this.consumeTimes.get());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

主动pull:

主动pull拉消息时,只要队列中存在消息,那么都可以获取到,本质是通过offset获取的,即便消息被push模式消费过,依旧可以通过offset再次进行消费,只要不修改offset的值,那么pull模式下不会对服务器上的消息产生任何变动。 

import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.taobao.metaq.client.metaPullConsumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;


public class PullConsumer {
    private static final Map offseTable = new HashMap();


    public static void main(String[] args) throws MQClientException {
        metaPullConsumer consumer = new metaPullConsumer("CID-consumer-test-demo");
        consumer.start();
        Set mqs = consumer.fetchSubscribeMessageQueues("dev_test_demo");
        System.out.println("msg size:" + mqs.size());
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            PullResult pullResult;
            try {
                pullResult = consumer.pullBlockIfNotFound(mq, "*", getMessageQueueOffset(mq), 32);
                System.out.println(pullResult);
                putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // TODO
                        List msgs = pullResult.getMsgFoundList();
                        for (MessageExt msg:msgs) {
                            System.out.println(new String(msg.getBody()));
                        }
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }

}

3、与Pandora Boot集成

在阿里集团内部,几乎所有的应用都用到了各式各样的中间件,比如HSF、TDDL、Diamond等等。本身中间件之间可能就有版本依赖的问题,比如你的应用HSF和Diamond分别依赖了同名jar包的不同版本,maven只会引入其中一个版本。同样的中间件和应用之间也存在同样的Jar包依赖的问题,出于要解决这些依赖冲突的问题,阿里就开发了Pandora ,中文名潘多拉,简单的来说就是一个类隔离容器。Pandora Boot是运行在Spring Boot上的,完全兼容,主要集成了各种阿里中间件,比如现在用的metaq,Pandora Boot会让metaq的使用变的非常简单。

1)创建工程

先生成脚手架工程:http://mw.alibaba-inc.com/bootstrap.html

没有metaq中间件可以勾选,没关系,等会手动添加即可。

2)添加meavn依赖

在root工程中添加pom管理(已存在):


    
        
            com.taobao.pandora
            pandora-boot-starter-bom
            2020-07-release
            pom
            import
        
    

在service服务模块中添加metaq依赖:


  com.alibaba.boot
  pandora-metaq-spring-boot-starter

3)生产者配置及使用 

在application.properties中配置单个消息发送者:

spring.metaq.producer.producer-group=CID-consumer-test-demo

在服务中使用生产者发送消息:

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.taobao.metaq.client.metaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl {

    @Autowired
    metaProducer metaProducer;

    public void createOrder() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        int orderId = 1001;
        String msgBody = "create order: " + orderId;

        Message msg = new Message("dev_test_demo", // topic
                "CreateOrderTag", // tag
                "OrderID1001", // key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。
                msgBody.getBytes());// body
        SendResult sendResult = metaProducer.send(msg);
        System.out.println(sendResult);
        SendStatus status = sendResult.getSendStatus();
        if (!status.equals(SendStatus.SEND_OK)) {
            System.out.println("error to send message");
            // message insert into database and send an alarm to admin
        }
    }
}

测试用例:

import com.alibaba.learn.Application;
import com.taobao.pandora.boot.test.junit4.DelegateTo;
import com.taobao.pandora.boot.test.junit4.PandoraBootRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(PandoraBootRunner.class)
@DelegateTo(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = { Application.class })
public class OrderServiceTest {

    @Autowired
    private OrderServiceImpl orderService;

    @Test
    public void testCreateOrder() {
        try {
            orderService.createOrder();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

进入到metaQ控制台,发现已经有了一条消息。

4)消费者配置及使用

消费者需要指定一个messageListener: 

其中需要实现MessageListenerConcurrently,重写consumeMessage方法,其中List就是消费的消息,可以通过getBody()方法获取到消息内容

@Service(value = "metaqMessageListener")
public class metaqMessageListener implements MessageListenerConcurrently {
    public static final Charset UTF_8 = Charset.forName("UTF-8");
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.err.println("receive: " + msg);
            String receivedMsg = new String(msg.getBody(), UTF_8);
            System.err.println("receivedMsg: " + receivedMsg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

在application.properties 里指定这个Listener的ref:

spring.metaq.consumer.consumer-group=CID-consumer-test-demo
spring.metaq.consumer.topic=dev_test_demo
spring.metaq.consumer.sub-expression=TagB || TagA
spring.metaq.consumer.message-listener-ref=metaqMessageListener

结语:学习过程很漫长,使用很简单,努力学习每一天,加油!!!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5433444.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存