本篇主要介绍在SpringBoot项目中接入RocketMQ,对于RocketMQ的原理概念请自行了解。
RocketMQ 基本概念可参考:
- 【https://help.aliyun.com/document_detail/29533.html】
- 【https://blog.csdn.net/qq_21040559/article/details/122703715】
1、引入maven依赖
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.2.0version>
dependency>
2、配置aplication.properties
rocketmq.name-server=192.168.1.206:9876
rocketmq.business.topic.name=test
rocketmq.producer.group=producer
3、生产者端
@Component
public class RocketMQUtils {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送RocketMQ消息
*
* @param topicName 主题
* @param jsonMessage 消息
*/
public void send(String topicName, String jsonMessage) {
rocketMQTemplate.convertAndSend(topicName, jsonMessage);
}
}
4、消费者端
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.business.topic.name}", consumerGroup = "${rocketmq.producer.group}")
public class Receiver implements RocketMQListener<String> {
private static final String LOG_BIZ_TYPE = "Receiver#onMessage";
@Override
public void onMessage(String message) {
log.info("message: {}", message)
//业务处理...
}
2 对接阿里云的消息队列RocketMQ版
前提:对接阿里云的消息队列RocketMQ版需要提前在控制台创建Topic、Group相关资源
1、引入maven依赖
<dependency>
<groupId>com.aliyun.openservicesgroupId>
<artifactId>ons-clientartifactId>
<version>1.8.8.Finalversion>
dependency>
2、配置aplication.yml
rocketmq:
# 阿里云身份验证 AccessKey ID 和 AccessKey Secret
accessKey: ************************
secretKey: ******************************
# TCP协议接入域名
nameSrvAddr: http://**********************.mq-internet-access.mq-internet.aliyuncs.com:80
topic: test-topic
groupId: GID-test
tag: '*'
参数名 | 参数说明 |
---|---|
nameSrvAddr | 设置TCP协议接入点,从消息队列RocketMQ版控制台的实例详情页面获取。 |
accessKey | 您在阿里云账号管理控制台中创建的AccessKey ID,用于身份认证。 |
secretKey | 您在阿里云账号管理控制台中创建的AccessKey Secret,用于身份认证。 |
其中nameSrvAddr来源于实例详情的接入点,这里采用的是TCP协议客户端接入的方式。
3、生产者端
MQ配置类
package com.swan.geese.support.config.rocketmq;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* 阿里云版RocketMQ配置类
**/
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {
private String accessKey;
private String secretKey;
private String nameSrvAddr;
private String topic;
private String groupId;
private String tag;
public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
return properties;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getNameSrvAddr() {
return nameSrvAddr;
}
public void setNameSrvAddr(String nameSrvAddr) {
this.nameSrvAddr = nameSrvAddr;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic= topic;
}
public String getLogTopic() {
return logTopic;
}
public void setLogTopic(String logTopic) {
this.logTopic = logTopic;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
}
生产者配置类
package com.swan.geese.support.config.rocketmq;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 生产者配置信息
**/
@Configuration
public class ProducerClient {
@Autowired
private MqConfig mqConfig;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(mqConfig.getMqPropertie());
return producer;
}
}
生产消息工具类
package com.swan.geese.support.utils;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.swan.geese.support.config.rocketmq.MqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* 生产者工具类
**/
@Component
@Slf4j
public class RocketMQUtils {
@Autowired
private ProducerBean producer;
@Autowired
private MqConfig mqConfig;
/**
* 发送RocketMQ消息
* @param topicName 主题
* @param jsonMessage 消息
*/
public void send(String topicName, String jsonMessage) {
Message msg = new Message(topicName, mqConfig.getTag(), jsonMessage.getBytes(StandardCharsets.UTF_8));
try {
// 发送消息,只要不抛异常就是成功
producer.send(msg);
} catch (ONSClientException e) {
log.error("message send failed : ", e);
}
}
}
4、消费者端
配置类和生产者端公用
package com.swan.geese.handler.receiver;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.swan.geese.common.domain.AnchorInfo;
import com.swan.geese.common.domain.LogParam;
import com.swan.geese.common.domain.TaskInfo;
import com.swan.geese.common.enums.AnchorState;
import com.swan.geese.handler.pending.Task;
import com.swan.geese.handler.pending.TaskPendingHolder;
import com.swan.geese.handler.utils.GroupIdMappingUtils;
import com.swan.geese.support.config.rocketmq.MqConfig;
import com.swan.geese.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* 配置messageBusiness topic的消费者信息
**/
@Configuration
public class ConsumerClient {
@Autowired
private MqConfig mqConfig;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = mqConfig.getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
//将消费者线程数固定为20个 20为默认值
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
consumerBean.setProperties(properties);
//订阅关系
Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
Subscription subscription = new Subscription();
//订阅messageBusiness
subscription.setTopic(mqConfig.getBusinessTopic());
subscription.setExpression(mqConfig.getTag());
subscriptionTable.put(subscription, new MessageListener() {
//监听者内部处理消息
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
//业务处理...
return Action.CommitMessage;
}
});
//订阅多个topic如上面设置
consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)