RocketMQ 主从搭建

RocketMQ 主从搭建,第1张

RocketMQ 主从搭建 1、多种集群模式讲解

1.1、单节点
优点:本地开发测试,配置简单,同步刷盘消息一条都不会丢
缺点:不可靠,如果宕机,会导致服务不可用

1.2、主从(异步、同步双写) :
优点:同步双写消息不丢失(把消息写入master和slave), 异步复制存在少量丢失 (把数据写入master,master把数据复制到slave),主节点宕机,从节点可以对外提供消息的消费,但是不支持写入
缺点:主备有短暂消息延迟,毫秒级,目前不支持自动切换,需要脚本或者其他程序进行检测然后进行停止broker,重启让从节点成为主节点

1.3、双主:
优点:配置简单, 可以靠配置RAID磁盘阵列保证消息可靠,异步刷盘丢失少量消息
缺点: master机器宕机期间,未被消费的消息在机器恢复之前不可消费,实时性会受到影响

1.4、 双主双从,多主多从模式(异步复制)
优点:磁盘损坏,消息丢失的非常少,消息实时性不会受影响,Master 宕机后,消费者仍然可以从Slave消费
缺点:主备有短暂消息延迟,毫秒级,如果Master宕机,磁盘损坏情况,会丢失少量消息

1.5、 双主双从,多主多从模式(同步双写)
优点:同步双写方式,主备都写成功,向应用才返回成功,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,主宕机后,备机不能自动切换为主机

推荐:1.2、1.4、1.5

2、同步刷盘、异步刷盘

同步刷盘:数据到了应用程序内存,消息立马写入磁盘中,数据安全性高。


异步刷盘:性能高,不定时的从应用程序内存把数据写入磁盘中。

3、主从搭建 异步复制

3.1、环境准备(RocketMQ 安装请看 RocketMQ 入门、安装、广控台)

192.168.1.38 主节点  光控台
192.168.1.39 从节点

主:


从:


3.2、修改配置文件
主节点:

cd /usr/local/rocketmq/distribution/target/apache-rocketmq/conf/2m-2s-async

#修改配置文件
vim broker-a.properties

#NameServer 地址
namesrvAddr=192.168.1.38:9876;192.168.1.39:9876
#集群名称,主从要一样
brokerClusterName=DefaultCluster
#broker名称,主从要一样,根据BrokerRole来确定谁是主谁是从
brokerName=broker-a
#brokerID 0是主,>0是从
brokerId=0
#删除日志时间为凌晨4点
deleteWhen=04
#文件保留时间48小时
fileReservedTime=48
#主从关系,异步复制
brokerRole=ASYNC_MASTER
#刷盘模式,异步刷盘
flushDiskType=ASYNC_FLUSH

从节点:

cd /usr/local/rocketmq/distribution/target/apache-rocketmq/conf/2m-2s-async

#修改配置文件
vim broker-a-s.properties

namesrvAddr=192.168.1.38:9876;192.168.1.39:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
#ID 大于0是从节点
brokerId=1
deleteWhen=04
fileReservedTime=48
#角色是从节点
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

3.3、启动主节点、从节点

cd /usr/local/rocketmq/distribution/target/apache-rocketmq/bin
#启动NameServer
nohup sh ./mqnamesrv  >/dev/null 2>&1 &
#启动主Broker
nohup sh  ./mqbroker  -c  ../conf/2m-2s-async/broker-a.properties  >/dev/null  2>&1  &
#启动从Broker
nohup sh  ./mqbroker  -c  ../conf/2m-2s-async/broker-a-s.properties  >/dev/null  2>&1  &

3.4、搭建广控台(具体搭建请看第一章节)
上传光控制台:

#修改配置文件
cd /usr/local/rocketmq-console/src/main/resources
vim application.properties


打包运行

cd /usr/local/rocketmq-console
mvn clean package -Dmaven.test.skip=true
cd /usr/local/rocketmq-console/target
nohup java -jar  rocketmq-console-ng-1.0.1.jar   1>/dev/null  2>&1 &
lsof -i :8080

4、测试故障转移

4.1、pom文件

    
        org.apache.rocketmq
        rocketmq-client
        4.4.0
    

4.2、Consts

public class RocketMQConsts {
    //=========================Hello RocketMQ==========================
    public static final String HELLO_TOPIC = "helloTopic";
    public static  final String HELLO_GROUP_PROVIDER = "helloGroupProvider";
    public static final String HELLO_TAG = "helloTag";
    public static final String HELLO_NAMESERVER="192.168.1.38:192.168.1.39:9876";
    public static final String HELLO_GROUP_CUSTOMER="helloGroupCustomer";
}

4.3、Producer

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class RocketMQProducer {

    public static void main(String[] args) throws Exception {
        //创建producer,设置NameServer地址, 关闭rocketmq 虚拟IP
        DefaultMQProducer producer = new DefaultMQProducer(RocketMQConsts.HELLO_GROUP_PROVIDER);
        producer.setNamesrvAddr(RocketMQConsts.HELLO_NAMESERVER);
        producer.setVipChannelEnabled(false);
        producer.start();
        //发送消息
        for (int i = 1; i <= 10; i++) {
            //主题、TAG、消息体
            Message msg = new Message(RocketMQConsts.HELLO_TOPIC, RocketMQConsts.HELLO_TAG, ("Hello RocketMQ " + i).getBytes());
            producer.send(msg);
            System.out.println("发送消息" + msg);
        }
        //关闭
        producer.shutdown();
    }

}

4.4、Customer

public class RocketMQCustomer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        //配置组名 IP
        consumer.setNamesrvAddr(RocketMQConsts.HELLO_NAMESERVER);
        consumer.setConsumerGroup(RocketMQConsts.HELLO_GROUP_CUSTOMER);
        //最后偏移量读取消息
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        try {
            //订阅主题,监听主题下哪些标签
            consumer.subscribe(RocketMQConsts.HELLO_TOPIC, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                try {
                    //默认一条一条消息消费
                    Message msg = msgs.get(0);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                    //返回成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    //返回稍后进行投递
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            //开始
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

4.5、启动消费者、启动生产者


4.6、关闭RocketMQ Master、启动生产者

#关闭Master Broker
cd /usr/local/rocketmq/distribution/target/apache-rocketmq/bin
sh ./mqshutdown broker


启动生产者发送消息


4.7、启动消费者

4.8、在启动主节点,发现消费者没有数据可以消费

5、总结

5.1、nameserver
nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

5.2、broker与nameserver关系
broker:单个broker和所有nameserver保持长连接,每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
nameserver:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。

5.3、provider与nameserver关系
单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。

5.4、provider与broker关系
单个生产者和该生产者关联的所有broker保持长连接。默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。

5.5、cunstomer与nameserver关系
单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。

5.6、cunstomer与broker关系
单个消费者和该消费者关联的所有broker保持长连接。默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。

5.7、主从
只有master才能进行写入 *** 作,slave不允许写入只能同步,同步策略取决于master的配置。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5708985.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存