RocketMQ入门级搭建

RocketMQ入门级搭建,第1张

RocketMQ入门级搭建

本文章的目的不在于讲解RocketMQ相关的基础知识点,此文章主要提供源码,帮助那些想要快速搭建一个入门级RocketMQ的初学者的同学。相关源码可参考如下:

项目结构:

建立父工程,pom文件导入依赖:


    4.0.0

    org.cainiao
    RocketDemo
    pom
    1.0-SNAPSHOT
    
        demo-api
        demo-producer
        demo-Consumer
        demo-Consumer-Slave
    

    
        org.springframework.boot
        spring-boot-starter-parent
        2.6.3
    
    
    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    


模块(demo-api):定义相关的pojo类和接口类 1)首先引入pom依赖(demo-api):


    
        RocketDemo
        org.cainiao
        1.0-SNAPSHOT
    
    4.0.0

    demo-api

    
        
            org.springframework.boot
            spring-boot-starter
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    


2)模块(demo-api)项目结构:

 3)ISender简单接口定义:
package com.cainiao.message;

import com.cainiao.pojo.Student;

public interface ISender {

    
    public String demoSender(Student student);

    
    public void templateSender(Student student);
}
4)Pojo类相关定义:
package com.cainiao.pojo;

import java.io.Serializable;

public class Student implements Serializable {

    private String name;
    private Integer age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

模块(demo-producer):消息生产者 1)首先引入pom依赖(demo-producer):


    
        RocketDemo
        org.cainiao
        1.0-SNAPSHOT
    
    4.0.0

    demo-producer

    
        
        
            org.cainiao
            demo-api
            1.0-SNAPSHOT
        
        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.2.1
        





        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
        
            org.projectlombok
            lombok
            provided
        

    


2)模块(demo-producer)项目结构:

 3)生产者启动类:
package com.cainiao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoProducer {

    public static void main(String[] args) {
        SpringApplication.run(DemoProducer.class, args);
    }
}
4)发送消息接口实现类:(分别采用默认的DefaultMQProducer和采用RocketMQTemplate进行发送消息,两种方式)
package com.cainiao.service;

import com.alibaba.fastjson.JSON;
import com.cainiao.message.ISender;
import com.cainiao.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class IMessageSender implements ISender {

    @Value("${mq.rocket.topic}")
    private String topic;

    @Value("${mq.rocket.tag}")
    private String tag;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public String demoSender(Student student) {
        
        try{
            DefaultMQProducer producer = new DefaultMQProducer("demo_producer");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            Message message = new Message("chenke_topic", "message", String.valueOf(System.currentTimeMillis()),
                    JSON.toJSonString(student).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = producer.send(message);
            log.info("消息发送成功: " + message);
            producer.shutdown();
        }catch (Exception e){
            log.error("消息发送失败");
        }

        return "消息发送成功";
    }

    @Override
    public void templateSender(Student student) {
        try{
            Message message = new Message(topic, "tag4", JSON.toJSonString(student).getBytes(RemotingHelper.DEFAULT_CHARSET));
            log.info("开始发送消息时间:" + System.currentTimeMillis());
            rocketMQTemplate.getProducer().send(message);
            log.info("消息发送成功时间:" + System.currentTimeMillis());
        }catch (Exception e){
            log.error("消息发送失败时间:" + System.currentTimeMillis());
        }

    }
}
5)配置文件application.properties
# RocketMQ配置
rocketmq.nameServer=127.0.0.1:9876
rocketmq.producer.group=demo_producer

# Rocket 相关主题和标签配置
mq.rocket.topic=chenke_topic
mq.rocket.tag=message
6)采用Junit5编写测试类进行功能测试:
package com.cainiao.service;

import com.cainiao.DemoProducer;
import com.cainiao.message.ISender;
import com.cainiao.pojo.Student;
import io.netty.handler.codec.compression.ZstdEncoder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest(classes = DemoProducer.class)
class IMessageSenderTest {

    @Autowired
    private ISender sender;

    @Test
    public void testIMessageSender(){
        Student student = new Student();
        student.setName("chenke");
        student.setAge(25);
        String res = sender.demoSender(student);
    }

    @Test
    public void testTemplateSender() {
        Student student = new Student();
        student.setName("test2");
        student.setAge(30);
        sender.templateSender(student);
    }
}

模块(demo-consumer):Master 主消息消费者 1)首先引入pom依赖(demo-consumer):


    
        RocketDemo
        org.cainiao
        1.0-SNAPSHOT
    
    4.0.0

    demo-Consumer

    
        
        
            org.cainiao
            demo-api
            1.0-SNAPSHOT
        
        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.2.1
        
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
        
            org.projectlombok
            lombok
            provided
        
    


2)整体模块结构:

 3)Master消费者启动类(此处)
package com.cainiao.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoConsumer {
    public static void main(String[] args) {
        SpringApplication.run(DemoConsumer.class, args);
    }
}
4)指定消费者对某一个Topic进行监听,并设置其消费者组:
package com.cainiao.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@RocketMQMessageListener(topic = "${mq.rocket.topic}",
                            consumerGroup = "${rocketmq.consumer.group}",
                            messageModel = MessageModel.BROADCASTING,
                            selectorexpression = "tag2 || tag4",
                            selectorType = SelectorType.TAG)
@Component
public class IMessageConsumer implements RocketMQListener {
    @Override
    public void onMessage(MessageExt messageExt) {
        String msgId = messageExt.getMsgId();
        String tags = messageExt.getTags();
        String keys = messageExt.getKeys();
        byte[] bodyB = messageExt.getBody();
        String body = new String(bodyB);
        log.info("Master已消费: " + body);
    }
}
5)配置文件设置application.properties
# RocketMQ配置 (Master)
rocketmq.nameServer=127.0.0.1:9876
rocketmq.consumer.group=consumer_group

# Rocket 相关主题和标签配置
mq.rocket.topic=chenke_topic
# mq.rocket.tag=message1

模块(demo-consumer):Slave 从消息消费者 1)首先引入pom依赖(demo-consumer-slave):


    
        RocketDemo
        org.cainiao
        1.0-SNAPSHOT
    
    4.0.0

    demo-Consumer-Slave

    
        
        
            org.cainiao
            demo-api
            1.0-SNAPSHOT
        
        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.2.1
        
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
        
            org.projectlombok
            lombok
            provided
        
    

2)整体模块结构:

 3)Slave 消费者启动类(此处)
package com.cainiao.slave.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoConsumerSlave {

    public static void main(String[] args) {
        SpringApplication.run(DemoConsumerSlave.class, args);
    }
}
4)指定Slave 从消费者对某一个Topic进行监听,并设置其消费者组:
package com.cainiao.slave.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.springframework.stereotype.Component;
import org.apache.rocketmq.spring.core.RocketMQListener;

@Slf4j
@RocketMQMessageListener(topic = "${mq.rocket.topic}",
                        consumerGroup = "${rocketmq.consumer.group}",
                        messageModel = MessageModel.BROADCASTING,
                        selectorexpression = "tag1 || tag3",
                        selectorType = SelectorType.TAG)
@Component
public class SlaverConsumer implements RocketMQListener{

    @Override
    public void onMessage(MessageExt messageExt) {
        String msgId = messageExt.getMsgId();
        String keys = messageExt.getKeys();
        String tags = messageExt.getTags();
        byte[] bodyB = messageExt.getBody();
        String body = new String(bodyB);
        log.info("Slave已消费: " + body);
    }
}
5)配置文件设置application.properties
# RocketMQ配置 (Slave)
rocketmq.nameServer=127.0.0.1:9876
rocketmq.consumer.group=consumer_group

# Rocket 相关主题和标签配置
mq.rocket.topic=chenke_topic
# mq.rocket.tag=message2

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716140.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存