创建完成后记录下集群ID(clusterId);
1.2 创建命名空间创建好集群后,在命名空间中新建命名空间,命名空间名称可以根据实际业务场景进行区分,比如这里创建可以根据测试环境、预发布环境、生产环境等进行区分创建。
新建命名空间
1.3、创建好命名空间后,新建个`topic`主题。创建topic
以上信息创建好后,我们在集群中可以看到集群的访问地址,如下:
查看接入地址
在创建tdmq集群时我们需要申请外网访问,这个需要找腾讯的客服开通。
至此,我们开发的基础环境已经准备完成。
二、编写生产者、消费者代码 2.1、创建工程在idea中新建个工程,工程名称为spring-boot-starter-tdmq
创建工程
工程名称和包路径可以根据实际情况进行自定义。
2.2、引入相关依赖包org.springframework.boot spring-boot-autoconfigureorg.apache.pulsar pulsar-client${pulsar.version}
这里使用的是腾讯云tdmq-pulsar版,这里需要引入pulsar-client.
2.3、创建生产者首先我们在项目中创建一个config的包路径,新建一个tdmq的配置类TdmqProperties。
@Data @ConfigurationProperties(prefix = "tdmq") public class TdmqProperties { private boolean enable = false; private String serviceUrl; private String token; private String clusterId; private String environmentId; }
这里主要配置下tdmq集群相关的信息。我们先将配置信息写在配置类中,后续优化我们在使用外部配置进行初始化 *** 作。
创建PulsarClient对象,生产者消费者都是基于PulsarClient对象进行创建。
这里我们通过SpringBoot的自动装配功能来装配PulsarClient。
这里我们创建一个自动化配置类TdmqAutoConfiguration。内容如下:
@Data @EnableConfigurationProperties({TdmqProperties.class}) public class TdmqAutoConfiguration { @Bean @ConditionalOnMissingBean(PulsarClient.class) @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true") public PulsarClient pulsarClient(TdmqProperties mqProperties) throws PulsarClientException { return PulsarClient.builder() .serviceUrl(mqProperties.getServiceUrl()) .authentication(AuthenticationFactory.token(mqProperties.getToken())) .build(); } }
这里增加了两个条件注解@ConditionalOnProperty和@ConditionalOnMissingBean(PulsarClient.class)这里分别表示如果没有找到PulsarClient对象和配置文件中启用了tdmq功能,我们才实例化PulsarClient对象。
有了PulsarClient对象,我们可以继续编写生产者代码啦。
在工程中创建producer包路径,并在该路径下创建TdmqProucer.class,内容如下:
@Service public class TdmqProucer { @Autowired private PulsarClient pulsarClient; public MessageId sendMsg(String message) throws PulsarClientException { Producer2.4、创建消息消费者producer = pulsarClient.newProducer().topic("test").create(); MessageId messageId = producer.newMessage().value(message.getBytes()).send(); producer.close(); return messageId; } }
在工程中创建consumer包路径,并在该路径下创建TdmqConsumer类,内容如下:
@Slf4j @Service public class TdmqConsumer { @Autowired private PulsarClient pulsarClient; private Consumerconsumer; @PostConstruct public void initConsumer() throws PulsarClientException { log.info("MessageLoggingListener is start"); consumer = pulsarClient.newConsumer().topic("自己的topic").subscriptionName("自己的subscriptionName").subscriptionType(SubscriptionType.Exclusive) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); Executors.newSingleThreadExecutor().submit(this::consumer); } @PreDestroy public void destroy() throws PulsarClientException { consumer.close(); pulsarClient.close(); } public void consumer() { while (true) { Message message = null; try { //使用while(true)无问题,该处会发生阻塞释放CPU资源 message = consumer.receive(); String json = new String(message.getData()); if (StringUtils.isNotEmpty(json)) { log.info("获取消息数据内容:{}", json); } consumer.acknowledge(message); } catch (PulsarClientException e) { log.error("数据消费失败", e); } } } }
2.5、测试生产者和消费者
编写单元测试类
@Slf4j @SpringBootTest class SpringBootStarterTdmqApplicationTests { @Autowired private TdmqProucer proucer; @Test public void producer() throws PulsarClientException { MessageId messageId = proucer.sendMsg("自己的topic", "发送消息测试"); log.info("send msg is success Id = {}", messageId); } }
我们看下测试结果:
测试结果
最简单的生产消费已经完成了。接下来我们需要对代码进一步封装,这里我们参考amqp的方式,分别封装proucerTemplate和TdmqConsumer注解。实现消费者功能达到一下情况:
@TdmqConsumer(topic = TopicConstant.MESSAGE_LOGGING_TOPIC, clazz = CreateMsgBean.class, subscriptionName = "subscriptionName") void consume(JddMessagemsg) { log.info("------------{}", JSONUtil.toJsonStr(msg)); }
OK,基本使用先到这里,我们开始后续内容的封装优化。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)