RocketMQ - Java 20220917

RocketMQ - Java 20220917,第1张

文章目录 前言一、概述1.1 待定 二、安装教程2.1 Linux部署2.2 Docker部署2.3 控制台部署 三、开发实践3.1 SpringBoot入门开发示例3.2 消息类型/发送分类3.3 示例:多Topic消费场景3.4 消息过滤3.5 消息发送重试3.6 消息消费重试3.7 死信队列 四、原理解析4.1 待定 总结

前言

提示:这里可以添加本文要记录的大概内容

目录
GitHub - Apache RocketMQ.
一、概述

提示:这里可以添加本文要记录的大概内容

1.1 待定

1

二、安装教程

提示:这里可以添加本文要记录的大概内容

2.1 Linux部署

前提条件是需要Java环境,关于JDK安装不做说明。

[root@myDemo ~]# java -version
openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)
[root@myDemo ~]# 

从官网拉取压缩包,然后解压

## 安装解压软件
yum -y install unzip

## 下载包 解压
wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip

unzip rocketmq-all-4.9.3-bin-release.zip

启动NameServer和Broker

setsid sh mqnamesrv
setsid sh mqbroker -n 0.0.0.0:9876

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-R0rwJNJV-1663424387253)(http://www.kaotop.com/file/tupian/20220919/rocketmq-Linux-第一次部署-1.png)]

2.2 Docker部署

1

2.3 控制台部署
目录
GitHub - apache/rocketmq-dashboard

从这里把代码拉下来,然后修改application.yml配置文件

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

server:
  port: 18086
  servlet:
    encoding:
      charset: UTF-8
      enabled: true
      force: true
## SSL setting
#  ssl:
#    key-store: classpath:rmqcngkeystore.jks
#    key-store-password: rocketmq
#    key-store-type: PKCS12
#    key-alias: rmqcngkey

spring:
  application:
    name: rocketmq-dashboard

logging:
  config: classpath:logback.xml

rocketmq:
  config:
    # if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, default localhost:9876
    # configure multiple namesrv addresses to manage multiple different clusters
    namesrvAddrs:
      - 192.168.247.184:9876
    #      - 127.0.0.2:9876
    # if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    isVIPChannel:
    # timeout for mqadminExt, default 5000ms
    timeoutMillis:
    # rocketmq-console's data path:dashboard/monitor    /tmp/rocketmq-console/data
    dataPath: tmp/rocketmq-console/data
    # set it false if you don't want use dashboard.default true
    enableDashBoardCollect: true
    # set the message track trace topic if you don't want use the default one
    msgTrackTopicName:
    ticketKey: ticket
    # must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
    loginRequired: false
    useTLS: false
    # set the accessKey and secretKey if you used acl
    accessKey: # if version > 4.4.0
    secretKey: # if version > 4.4.0

threadpool:
  config:
    coreSize: 10
    maxSize: 10
    keepAliveTime: 3000
    queueSize: 5000

主要修改namesrv的地址以及本地端口,开启后界面长这样

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hfp3VLNK-1663424387253)(http://www.kaotop.com/file/tupian/20220919/rocketmq-控制台界面-1.png)]

可以通过界面中的消息发送功能进行调试

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N1SWuyqc-1663424387254)(http://www.kaotop.com/file/tupian/20220919/rocketmq-控制台-消息发送界面-1.png)]

点击消息发送,选择Topic和tag

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cBrxjoaK-1663424387254)(http://www.kaotop.com/file/tupian/20220919/rocketmq-控制台-消息发送界面-2.png)]

检查消费者是否有收到消息

三、开发实践

提示:这里可以添加本文要记录的大概内容

3.1 SpringBoot入门开发示例

pom依赖

<dependency>
            <groupId>org.apache.rocketmqgroupId>
            <artifactId>rocketmq-spring-boot-starterartifactId>
            <version>2.2.2version>
        dependency>

yaml配置文件修改

rocketmq:
  nameServer: 192.168.247.184:9876
  producer:
    group: demo-group
    send-message-timeout: 3000
server:
  port: 16661

写上方法进行调用测试,首先是生产者

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

@RestController
public class DemoController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/demo01")
    public String demo01() {
       rocketMQTemplate.syncSend("springboot-topic01","我是消息");
        return "ok";
    }
}

然后打开消费者

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * MessageModel:集群模式;广播模式
 * ConsumeMode:顺序消费;无序消费
 */
@Component
@RocketMQMessageListener(topic = "springboot-topic01", consumerGroup = "consumer-group",
        //selectorExpression = "tag1",selectorType = SelectorType.TAG,
        messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("----------接收到rocketmq消息:" + message);
        // rocketmq会自动捕获异常回滚  (官方默认会重复消费16次)
        // int a = 1 / 0;
    }
}

进行消息发送,第一个Demo示例完成

3.2 消息类型/发送分类

消息的发送也有如下三种分类

同步消息

生产者收到消费者同步回应的ACK后才会发送下一条消息,可靠性高,效率低

异步消息

生产者无需同步收到消费者回应的ACK,直接发送下一条,随后用异步的方式接收消费者回应的ACK即可,可靠性适中,发送效率适中

单向发送

只发送消息,不处理消费者的ACK回应,斋发送,效率最高,可靠性最差;

然后我们在Springboot RocketMQTemplate中查看消息发送的方法

// 
void send(D destination, Message<?> message) throws MessagingException;
// 
public SendResult syncSend(String destination, Object payload);
// 
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey);
//
public <T> T sendAndReceive(String destination, Message<?> message, Type type);
// 
void convertAndSend(Object payload) throws MessagingException;
// 
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout);
// 
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback);
//
public TransactionSendResult sendMessageInTransaction(final String destination,
        final Message<?> message, final Object arg) throws MessagingException;
//
public void sendOneWay(String destination, Message<?> message);
//
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey);

综上,简单将消息划分为如下几个类型

普通消息顺序消息延时消息事务消息批量消息

后续将在开发实战中进行演示

3.3 示例:多Topic消费场景

采取内部类解决

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 消息消费
 * @author 李家民
 */
public class MqMessageListener {

	/**
	 * 停车模块
	 */
	@Component
	@RocketMQMessageListener(topic = "CHD_Parking", consumerGroup = "consumer-group",
		// selectorExpression = "spaceTotal",
		selectorType = SelectorType.TAG,
		messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
	public static class ParkingMessage implements RocketMQListener<MessageExt> {
		@Override
		public void onMessage(MessageExt message) {
			System.out.println("getTopic(= " + message.getTopic());
			System.out.println("getTags(= " + message.getTags());
			System.out.println("getMsgId(= " + message.getMsgId());
			System.out.println("getBody(= " + new String(message.getBody()));
		}
	}

	/**
	 * 仓库模块
	 */
	@Component
	@RocketMQMessageListener(topic = "CHD_Storehouse", consumerGroup = "consumer-group",
		selectorType = SelectorType.TAG,
		messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
	public static class StorehouseMessage implements RocketMQListener<MessageExt> {
		@Override
		public void onMessage(MessageExt message) {
			System.out.println("getTopic(= " + message.getTopic());
			System.out.println("getTags(= " + message.getTags());
			System.out.println("getMsgId(= " + message.getMsgId());
			System.out.println("getBody(= " + new String(message.getBody()));
		}
	}


	/**
	 * 租赁模块
	 */
	@Component
	@RocketMQMessageListener(topic = "CHD_Lease", consumerGroup = "consumer-group",
		selectorType = SelectorType.TAG,
		messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
	public static class LeaseMessage implements RocketMQListener<MessageExt> {
		@Override
		public void onMessage(MessageExt message) {
			System.out.println("getTopic(= " + message.getTopic());
			System.out.println("getTags(= " + message.getTags());
			System.out.println("getMsgId(= " + message.getMsgId());
			System.out.println("getBody(= " + new String(message.getBody()));
		}
	}


	/**
	 * 设施模块
	 */
	@Component
	@RocketMQMessageListener(topic = "CHD_Facility", consumerGroup = "consumer-group",
		selectorType = SelectorType.TAG,
		messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
	public static class FacilityMessage implements RocketMQListener<MessageExt> {
		@Override
		public void onMessage(MessageExt message) {
			System.out.println("getTopic(= " + message.getTopic());
			System.out.println("getTags(= " + message.getTags());
			System.out.println("getMsgId(= " + message.getMsgId());
			System.out.println("getBody(= " + new String(message.getBody()));
		}
	}

}
3.4 消息过滤

1

3.5 消息发送重试

1

3.6 消息消费重试

1

3.7 死信队列

1

四、原理解析

提示:这里可以添加本文要记录的大概内容

4.1 待定

1

总结

提示:这里对文章进行总结:
例如:以上就是今天要讲的内容。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/2990213.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-09-23
下一篇 2022-09-23

发表评论

登录后才能评论

评论列表(0条)

保存