RocketMQ接入指南

RocketMQ接入指南,第1张

SpringBoot对接RocketMQ接入指南

本篇主要介绍在SpringBoot项目中接入RocketMQ,对于RocketMQ的原理概念请自行了解。

RocketMQ 基本概念可参考:

  • 【https://help.aliyun.com/document_detail/29533.html】
  • 【https://blog.csdn.net/qq_21040559/article/details/122703715】
1 对接自己搭建的RocketMQ环境

1、引入maven依赖

<dependency>
    <groupId>org.apache.rocketmqgroupId>
    <artifactId>rocketmq-spring-boot-starterartifactId>
    <version>2.2.0version>
dependency>

2、配置aplication.properties

rocketmq.name-server=192.168.1.206:9876
rocketmq.business.topic.name=test
rocketmq.producer.group=producer

3、生产者端

@Component
public class RocketMQUtils {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送RocketMQ消息
     *
     * @param topicName 主题
     * @param jsonMessage 消息
     */
    public void send(String topicName, String jsonMessage) {
        rocketMQTemplate.convertAndSend(topicName, jsonMessage);
    }
}

4、消费者端

@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.business.topic.name}", consumerGroup = "${rocketmq.producer.group}")
public class Receiver implements RocketMQListener<String> {

    private static final String LOG_BIZ_TYPE = "Receiver#onMessage";

    @Override
    public void onMessage(String message) {
		log.info("message: {}", message)
         //业务处理...
    }
2 对接阿里云的消息队列RocketMQ版

前提:对接阿里云的消息队列RocketMQ版需要提前在控制台创建Topic、Group相关资源

1、引入maven依赖


<dependency>
    <groupId>com.aliyun.openservicesgroupId>
    <artifactId>ons-clientartifactId>
    <version>1.8.8.Finalversion>
dependency>

2、配置aplication.yml

rocketmq:
  # 阿里云身份验证 AccessKey ID 和 AccessKey Secret
  accessKey: ************************
  secretKey: ******************************
  # TCP协议接入域名
  nameSrvAddr: http://**********************.mq-internet-access.mq-internet.aliyuncs.com:80
  topic: test-topic
  groupId: GID-test
  tag: '*'
参数名参数说明
nameSrvAddr设置TCP协议接入点,从消息队列RocketMQ版控制台的实例详情页面获取。
accessKey您在阿里云账号管理控制台中创建的AccessKey ID,用于身份认证。
secretKey您在阿里云账号管理控制台中创建的AccessKey Secret,用于身份认证。

其中nameSrvAddr来源于实例详情的接入点,这里采用的是TCP协议客户端接入的方式。

3、生产者端

MQ配置类

package com.swan.geese.support.config.rocketmq;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * 阿里云版RocketMQ配置类
 **/
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String topic;
    private String groupId;
    private String tag;

    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }

    public String getAccessKey() {
        return accessKey;
    }

    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public String getNameSrvAddr() {
        return nameSrvAddr;
    }

    public void setNameSrvAddr(String nameSrvAddr) {
        this.nameSrvAddr = nameSrvAddr;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic= topic;
    }

    public String getLogTopic() {
        return logTopic;
    }

    public void setLogTopic(String logTopic) {
        this.logTopic = logTopic;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }
}

生产者配置类

package com.swan.geese.support.config.rocketmq;

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 生产者配置信息
 **/
@Configuration
public class ProducerClient {

    @Autowired
    private MqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }

}

生产消息工具类

package com.swan.geese.support.utils;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.swan.geese.support.config.rocketmq.MqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * 生产者工具类
 **/
@Component
@Slf4j
public class RocketMQUtils {

    @Autowired
    private ProducerBean producer;

    @Autowired
    private MqConfig mqConfig;

    /**
     * 发送RocketMQ消息
     * @param topicName 主题
     * @param jsonMessage 消息
     */
    public void send(String topicName, String jsonMessage) {
        Message msg = new Message(topicName, mqConfig.getTag(), jsonMessage.getBytes(StandardCharsets.UTF_8));
        try {
            // 发送消息,只要不抛异常就是成功
            producer.send(msg);
        } catch (ONSClientException e) {
            log.error("message send failed : ", e);
        }
    }

}

4、消费者端

配置类和生产者端公用

package com.swan.geese.handler.receiver;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.swan.geese.common.domain.AnchorInfo;
import com.swan.geese.common.domain.LogParam;
import com.swan.geese.common.domain.TaskInfo;
import com.swan.geese.common.enums.AnchorState;
import com.swan.geese.handler.pending.Task;
import com.swan.geese.handler.pending.TaskPendingHolder;
import com.swan.geese.handler.utils.GroupIdMappingUtils;
import com.swan.geese.support.config.rocketmq.MqConfig;
import com.swan.geese.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * 配置messageBusiness topic的消费者信息
 **/
@Configuration
public class ConsumerClient {

    @Autowired
    private MqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        //订阅messageBusiness
        subscription.setTopic(mqConfig.getBusinessTopic());
        subscription.setExpression(mqConfig.getTag());
        subscriptionTable.put(subscription, new MessageListener() {
            //监听者内部处理消息
            @Override
            public Action consume(Message message, ConsumeContext consumeContext) {

                String msg = new String(message.getBody(), StandardCharsets.UTF_8);

                //业务处理...
                
                return Action.CommitMessage;
            }
        });
        //订阅多个topic如上面设置

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/727967.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存