<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.0.3version>
dependency>
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-clientartifactId>
<version>4.6.0version>
dependency>
配置
#RocketMQ相关配置
#消费者只需要配置mq的server地址即可,生产者也要配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=${spring.application.name}
# 发送时间超时时间
rocketmq.producer.send-message-timeout=300000
#异步消息发送失败重试次数
rocketmq.producer.retry-times-when-send-async-failed=0
#消息发送失败后的最大重试次数
rocketmq.producer.retry-times-when-send-failed=2
#消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
rocketmq.producer.compress-message-body-threshold=4096
#消息最大容量
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-next-server=true
消息发送代码
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
/**
* 消息发送
*
* @author zhangzengxiu
* @date 2022/4/24
*/
@Service
public class ProduceMessage {
private static final Logger logger = LoggerFactory.getLogger(ProduceMessage.class);
@Autowired
private RocketMQTemplate rocketMQTemplate;
public Boolean sendMsg(String msg) {
try {
rocketMQTemplate.convertAndSend("xxtopic", msg);
} catch (MessagingException e) {
logger.error("send msg error,msg=" + msg);
return false;
}
return true;
}
}
消息消费代码
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* 消费消息
*
* @author zhangzengxiu
* @date 2022/4/24
*/
@Component
@RocketMQMessageListener(topic = "xxtopic", //监听指定topic和消息的生产者是同一个topic
consumerGroup = "yyyGroup", //group:不用和生产者group相同 ( 在RocketMQ中消费者和发送者组没有关系 )
selectorExpression = "*", // *表示所有 默认也是*
selectorType = SelectorType.TAG, //是按照tag还是SQL92,SQL92需要在RocketMQ机器开启配置
messageModel = MessageModel.CLUSTERING //消费模式:默认 CLUSTERING ( CLUSTERING:负载均衡 )( BROADCASTING:广播机制 )
)
public class ConsumerMessage implements RocketMQListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMessage.class);
/**
* 监听消息
*
* @param s
*/
@Override
public void onMessage(String s) {
//消费消息代码
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)