提示:这里可以添加本文要记录的大概内容
目录 |
---|
GitHub - Apache RocketMQ. |
提示:这里可以添加本文要记录的大概内容
1.1 待定1
二、安装教程提示:这里可以添加本文要记录的大概内容
2.1 Linux部署前提条件是需要Java环境,关于JDK安装不做说明。
[root@myDemo ~]# java -version
openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)
[root@myDemo ~]#
从官网拉取压缩包,然后解压
## 安装解压软件
yum -y install unzip
## 下载包 解压
wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
unzip rocketmq-all-4.9.3-bin-release.zip
启动NameServer和Broker
setsid sh mqnamesrv
setsid sh mqbroker -n 0.0.0.0:9876
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-R0rwJNJV-1663424387253)(http://www.kaotop.com/file/tupian/20220919/rocketmq-Linux-第一次部署-1.png)]
2.2 Docker部署1
2.3 控制台部署目录 |
---|
GitHub - apache/rocketmq-dashboard |
从这里把代码拉下来,然后修改application.yml配置文件
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
server:
port: 18086
servlet:
encoding:
charset: UTF-8
enabled: true
force: true
## SSL setting
# ssl:
# key-store: classpath:rmqcngkeystore.jks
# key-store-password: rocketmq
# key-store-type: PKCS12
# key-alias: rmqcngkey
spring:
application:
name: rocketmq-dashboard
logging:
config: classpath:logback.xml
rocketmq:
config:
# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
# configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs:
- 192.168.247.184:9876
# - 127.0.0.2:9876
# if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
isVIPChannel:
# timeout for mqadminExt, default 5000ms
timeoutMillis:
# rocketmq-console's data path:dashboard/monitor /tmp/rocketmq-console/data
dataPath: tmp/rocketmq-console/data
# set it false if you don't want use dashboard.default true
enableDashBoardCollect: true
# set the message track trace topic if you don't want use the default one
msgTrackTopicName:
ticketKey: ticket
# must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
loginRequired: false
useTLS: false
# set the accessKey and secretKey if you used acl
accessKey: # if version > 4.4.0
secretKey: # if version > 4.4.0
threadpool:
config:
coreSize: 10
maxSize: 10
keepAliveTime: 3000
queueSize: 5000
主要修改namesrv的地址以及本地端口,开启后界面长这样
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hfp3VLNK-1663424387253)(http://www.kaotop.com/file/tupian/20220919/rocketmq-控制台界面-1.png)]
可以通过界面中的消息发送功能进行调试
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-N1SWuyqc-1663424387254)(http://www.kaotop.com/file/tupian/20220919/rocketmq-控制台-消息发送界面-1.png)]
点击消息发送,选择Topic和tag
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cBrxjoaK-1663424387254)(http://www.kaotop.com/file/tupian/20220919/rocketmq-控制台-消息发送界面-2.png)]
检查消费者是否有收到消息
三、开发实践提示:这里可以添加本文要记录的大概内容
3.1 SpringBoot入门开发示例pom依赖
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.2.2version>
dependency>
yaml配置文件修改
rocketmq:
nameServer: 192.168.247.184:9876
producer:
group: demo-group
send-message-timeout: 3000
server:
port: 16661
写上方法进行调用测试,首先是生产者
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class DemoController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/demo01")
public String demo01() {
rocketMQTemplate.syncSend("springboot-topic01","我是消息");
return "ok";
}
}
然后打开消费者
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* MessageModel:集群模式;广播模式
* ConsumeMode:顺序消费;无序消费
*/
@Component
@RocketMQMessageListener(topic = "springboot-topic01", consumerGroup = "consumer-group",
//selectorExpression = "tag1",selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("----------接收到rocketmq消息:" + message);
// rocketmq会自动捕获异常回滚 (官方默认会重复消费16次)
// int a = 1 / 0;
}
}
进行消息发送,第一个Demo示例完成
3.2 消息类型/发送分类消息的发送也有如下三种分类
同步消息
生产者收到消费者同步回应的ACK后才会发送下一条消息,可靠性高,效率低
;
异步消息
生产者无需同步收到消费者回应的ACK,直接发送下一条,随后用异步的方式接收消费者回应的ACK即可,可靠性适中,发送效率适中
;
单向发送
只发送消息,不处理消费者的ACK回应,斋发送,效率最高,可靠性最差
;
然后我们在Springboot RocketMQTemplate中查看消息发送的方法
//
void send(D destination, Message<?> message) throws MessagingException;
//
public SendResult syncSend(String destination, Object payload);
//
public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey);
//
public <T> T sendAndReceive(String destination, Message<?> message, Type type);
//
void convertAndSend(Object payload) throws MessagingException;
//
public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout);
//
public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback);
//
public TransactionSendResult sendMessageInTransaction(final String destination,
final Message<?> message, final Object arg) throws MessagingException;
//
public void sendOneWay(String destination, Message<?> message);
//
public void sendOneWayOrderly(String destination, Message<?> message, String hashKey);
综上,简单将消息划分为如下几个类型
普通消息顺序消息延时消息事务消息批量消息后续将在开发实战中进行演示
3.3 示例:多Topic消费场景采取内部类解决
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 消息消费
* @author 李家民
*/
public class MqMessageListener {
/**
* 停车模块
*/
@Component
@RocketMQMessageListener(topic = "CHD_Parking", consumerGroup = "consumer-group",
// selectorExpression = "spaceTotal",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public static class ParkingMessage implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("getTopic(= " + message.getTopic());
System.out.println("getTags(= " + message.getTags());
System.out.println("getMsgId(= " + message.getMsgId());
System.out.println("getBody(= " + new String(message.getBody()));
}
}
/**
* 仓库模块
*/
@Component
@RocketMQMessageListener(topic = "CHD_Storehouse", consumerGroup = "consumer-group",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public static class StorehouseMessage implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("getTopic(= " + message.getTopic());
System.out.println("getTags(= " + message.getTags());
System.out.println("getMsgId(= " + message.getMsgId());
System.out.println("getBody(= " + new String(message.getBody()));
}
}
/**
* 租赁模块
*/
@Component
@RocketMQMessageListener(topic = "CHD_Lease", consumerGroup = "consumer-group",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public static class LeaseMessage implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("getTopic(= " + message.getTopic());
System.out.println("getTags(= " + message.getTags());
System.out.println("getMsgId(= " + message.getMsgId());
System.out.println("getBody(= " + new String(message.getBody()));
}
}
/**
* 设施模块
*/
@Component
@RocketMQMessageListener(topic = "CHD_Facility", consumerGroup = "consumer-group",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public static class FacilityMessage implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println("getTopic(= " + message.getTopic());
System.out.println("getTags(= " + message.getTags());
System.out.println("getMsgId(= " + message.getMsgId());
System.out.println("getBody(= " + new String(message.getBody()));
}
}
}
3.4 消息过滤
1
3.5 消息发送重试1
3.6 消息消费重试1
3.7 死信队列1
四、原理解析提示:这里可以添加本文要记录的大概内容
4.1 待定1
总结提示:这里对文章进行总结:
例如:以上就是今天要讲的内容。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)