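This article does not explain RocketMQ fundamentals. It mainly provides source code for beginners who want to set up an entry-level RocketMQ project quickly. The source code is as follows: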
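Project structure: [Figure: overall project structure]

Create the parent project. Its pom file declares the modules and imports the shared dependencies:

    <project>
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.cainiao</groupId>
        <artifactId>RocketDemo</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>

        <modules>
            <module>demo-api</module>
            <module>demo-producer</module>
            <module>demo-Consumer</module>
            <module>demo-Consumer-Slave</module>
        </modules>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.6.3</version>
        </parent>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>

Module (demo-api): defines the POJO and interface classes
1) First, add the pom dependencies (demo-api):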
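    <project>
        <parent>
            <artifactId>RocketDemo</artifactId>
            <groupId>org.cainiao</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-api</artifactId>

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>

2) Project structure of the demo-api module: [Figure: demo-api module structure]
3) Definition of the simple ISender interface: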
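    package com.cainiao.message;

    import com.cainiao.pojo.Student;

    public interface ISender {

        // Send using a hand-built DefaultMQProducer; returns a status string.
        public String demoSender(Student student);

        // Send through the Spring-managed RocketMQTemplate.
        public void templateSender(Student student);
    }

4) POJO class definition: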
    package com.cainiao.pojo;

    import java.io.Serializable;

    // Simple payload object that is serialized to JSON and used as the message body.
    public class Student implements Serializable {

        private String name;
        private Integer age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }
    }

Module (demo-producer): message producer
1) First, add the pom dependencies (demo-producer):
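    <project>
        <parent>
            <artifactId>RocketDemo</artifactId>
            <groupId>org.cainiao</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-producer</artifactId>

        <dependencies>
            <dependency>
                <groupId>org.cainiao</groupId>
                <artifactId>demo-api</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </project>

2) Project structure of the demo-producer module: [Figure: demo-producer module structure]
3) Producer main class: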
    package com.cainiao;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class DemoProducer {

        public static void main(String[] args) {
            SpringApplication.run(DemoProducer.class, args);
        }
    }

4) Implementation of the message-sending interface (two approaches: sending with a plain DefaultMQProducer, and sending through RocketMQTemplate):
    package com.cainiao.service;

    import com.alibaba.fastjson.JSON;
    import com.cainiao.message.ISender;
    import com.cainiao.pojo.Student;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    @Slf4j
    @Component
    public class IMessageSender implements ISender {

        @Value("${mq.rocket.topic}")
        private String topic;

        @Value("${mq.rocket.tag}")
        private String tag;

        @Autowired
        private RocketMQTemplate rocketMQTemplate;

        // Approach 1: create, start and shut down a DefaultMQProducer by hand.
        @Override
        public String demoSender(Student student) {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("demo_producer");
                producer.setNamesrvAddr("127.0.0.1:9876");
                producer.start();
                // Arguments: topic, tag, key, body. Note that the tag "message" is not
                // listed in either consumer's selectorExpression shown later in this article.
                Message message = new Message("chenke_topic", "message",
                        String.valueOf(System.currentTimeMillis()),
                        JSON.toJSONString(student).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult result = producer.send(message);
                log.info("Message sent successfully: " + result);
                producer.shutdown();
            } catch (Exception e) {
                log.error("Failed to send message", e);
                return "Failed to send message";
            }
            return "Message sent successfully";
        }

        // Approach 2: reuse the producer managed by RocketMQTemplate.
        // The tag "tag4" is hard-coded here; the injected tag field is not used.
        @Override
        public void templateSender(Student student) {
            try {
                Message message = new Message(topic, "tag4",
                        JSON.toJSONString(student).getBytes(RemotingHelper.DEFAULT_CHARSET));
                log.info("Send started at: " + System.currentTimeMillis());
                rocketMQTemplate.getProducer().send(message);
                log.info("Send succeeded at: " + System.currentTimeMillis());
            } catch (Exception e) {
                log.error("Send failed at: " + System.currentTimeMillis(), e);
            }
        }
    }

5) Configuration file application.properties:
    # RocketMQ configuration
    rocketmq.nameServer=127.0.0.1:9876
    rocketmq.producer.group=demo_producer

    # RocketMQ topic and tag settings
    mq.rocket.topic=chenke_topic
    mq.rocket.tag=message

6) Functional test written with JUnit 5:
    package com.cainiao.service;

    import com.cainiao.DemoProducer;
    import com.cainiao.message.ISender;
    import com.cainiao.pojo.Student;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    import static org.junit.jupiter.api.Assertions.*;

    // Both tests need a NameServer and broker reachable at the configured address.
    @SpringBootTest(classes = DemoProducer.class)
    class IMessageSenderTest {

        @Autowired
        private ISender sender;

        @Test
        public void testIMessageSender() {
            Student student = new Student();
            student.setName("chenke");
            student.setAge(25);
            String res = sender.demoSender(student);
            assertNotNull(res);
        }

        @Test
        public void testTemplateSender() {
            Student student = new Student();
            student.setName("test2");
            student.setAge(30);
            sender.templateSender(student);
        }
    }

Module (demo-consumer): Master (primary) message consumer
1) First, add the pom dependencies (demo-consumer):
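    <project>
        <parent>
            <artifactId>RocketDemo</artifactId>
            <groupId>org.cainiao</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-Consumer</artifactId>

        <dependencies>
            <dependency>
                <groupId>org.cainiao</groupId>
                <artifactId>demo-api</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </project>

2) Overall module structure: [Figure: demo-consumer module structure]
3) Master consumer main class: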
    package com.cainiao.consumer;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class DemoConsumer {

        public static void main(String[] args) {
            SpringApplication.run(DemoConsumer.class, args);
        }
    }

4) Make the consumer listen on a given topic and set its consumer group:
    package com.cainiao.consumer;

    import java.nio.charset.StandardCharsets;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.annotation.SelectorType;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Slf4j
    @RocketMQMessageListener(topic = "${mq.rocket.topic}",
            consumerGroup = "${rocketmq.consumer.group}",
            messageModel = MessageModel.BROADCASTING,
            selectorExpression = "tag2 || tag4",
            selectorType = SelectorType.TAG)
    @Component
    public class IMessageConsumer implements RocketMQListener<MessageExt> {

        @Override
        public void onMessage(MessageExt messageExt) {
            String msgId = messageExt.getMsgId();
            String tags = messageExt.getTags();
            String keys = messageExt.getKeys();
            byte[] bodyB = messageExt.getBody();
            // The producer encodes the body as UTF-8, so decode it the same way.
            String body = new String(bodyB, StandardCharsets.UTF_8);
            log.info("Master consumed: msgId={}, tags={}, keys={}, body={}", msgId, tags, keys, body);
        }
    }

5) Configuration file application.properties:
    # RocketMQ configuration (Master)
    rocketmq.nameServer=127.0.0.1:9876
    rocketmq.consumer.group=consumer_group

    # RocketMQ topic and tag settings
    mq.rocket.topic=chenke_topic
    # mq.rocket.tag=message1

Module (demo-consumer-slave): Slave (secondary) message consumer
1) First, add the pom dependencies (demo-consumer-slave):
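    <project>
        <parent>
            <artifactId>RocketDemo</artifactId>
            <groupId>org.cainiao</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-Consumer-Slave</artifactId>

        <dependencies>
            <dependency>
                <groupId>org.cainiao</groupId>
                <artifactId>demo-api</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </project>

2) Overall module structure: [Figure: demo-consumer-slave module structure]
3) Slave consumer main class: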
    package com.cainiao.slave.consumer;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class DemoConsumerSlave {

        public static void main(String[] args) {
            SpringApplication.run(DemoConsumerSlave.class, args);
        }
    }

4) Make the Slave consumer listen on a given topic and set its consumer group:
    package com.cainiao.slave.consumer;

    import java.nio.charset.StandardCharsets;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.MessageModel;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.annotation.SelectorType;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Slf4j
    @RocketMQMessageListener(topic = "${mq.rocket.topic}",
            consumerGroup = "${rocketmq.consumer.group}",
            messageModel = MessageModel.BROADCASTING,
            selectorExpression = "tag1 || tag3",
            selectorType = SelectorType.TAG)
    @Component
    public class SlaverConsumer implements RocketMQListener<MessageExt> {

        @Override
        public void onMessage(MessageExt messageExt) {
            String msgId = messageExt.getMsgId();
            String keys = messageExt.getKeys();
            String tags = messageExt.getTags();
            byte[] bodyB = messageExt.getBody();
            // The producer encodes the body as UTF-8, so decode it the same way.
            String body = new String(bodyB, StandardCharsets.UTF_8);
            log.info("Slave consumed: msgId={}, tags={}, keys={}, body={}", msgId, tags, keys, body);
        }
    }

5) Configuration file application.properties:
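    # RocketMQ configuration (Slave)
    rocketmq.nameServer=127.0.0.1:9876
    rocketmq.consumer.group=consumer_group

    # RocketMQ topic and tag settings
    mq.rocket.topic=chenke_topic
    # mq.rocket.tag=message2

Note that the Master and Slave consumers share the consumer group consumer_group while subscribing to different tags. RocketMQ's documentation asks that all consumers in one group keep identical subscriptions, so for anything beyond a demo it is safer to give each consumer its own group.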
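The two listeners select messages by tag ("tag2 || tag4" for the Master, "tag1 || tag3" for the Slave), while the producer sends with the tags "message" (demoSender) and "tag4" (templateSender). To make it easier to see which consumer should receive which message, here is a minimal plain-JDK sketch of how such a tag expression can be evaluated. TagSelectorSketch and its methods are hypothetical names introduced only for illustration; this is not RocketMQ's own filtering code, just an approximation of the "||"-separated tag list and the "*" wildcard.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Illustrative only: mimics how a TAG selector expression such as
 * "tag2 || tag4" picks messages. Hypothetical class, not part of RocketMQ.
 */
final class TagSelectorSketch {
    private TagSelectorSketch() {}

    /** Split an expression like "tag2 || tag4" into the set {tag2, tag4}. */
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .collect(Collectors.toSet());
    }

    /** "*" accepts every tag; otherwise the message tag must be listed. */
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression.trim())) {
            return true;
        }
        return messageTag != null && parse(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        String master = "tag2 || tag4"; // expression used by IMessageConsumer
        String slave = "tag1 || tag3";  // expression used by SlaverConsumer
        for (String tag : new String[] {"tag4", "message"}) {
            System.out.println(tag + " -> master=" + matches(master, tag)
                    + ", slave=" + matches(slave, tag));
        }
    }
}
```

Under these assumptions, a message tagged "tag4" matches only the Master's expression, and a message tagged "message" matches neither. If the message sent by demoSender never shows up in a consumer log, the tag is the first thing to check.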