1.1、单节点
优点:本地开发测试,配置简单,同步刷盘消息一条都不会丢
缺点:不可靠,如果宕机,会导致服务不可用
1.2、主从(异步、同步双写) :
优点:同步双写消息不丢失(把消息写入master和slave), 异步复制存在少量丢失 (把数据写入master,master把数据复制到slave),主节点宕机,从节点可以对外提供消息的消费,但是不支持写入
缺点:主备有短暂消息延迟,毫秒级,目前不支持自动切换,需要脚本或者其他程序进行检测然后进行停止broker,重启让从节点成为主节点
1.3、双主:
优点:配置简单, 可以靠配置RAID磁盘阵列保证消息可靠,异步刷盘丢失少量消息
缺点: master机器宕机期间,未被消费的消息在机器恢复之前不可消费,实时性会受到影响
1.4、 双主双从,多主多从模式(异步复制)
优点:磁盘损坏,消息丢失的非常少,消息实时性不会受影响,Master 宕机后,消费者仍然可以从Slave消费
缺点:主备有短暂消息延迟,毫秒级,如果Master宕机,磁盘损坏情况,会丢失少量消息
1.5、 双主双从,多主多从模式(同步双写)
优点:同步双写方式,主备都写成功,向应用才返回成功,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,主宕机后,备机不能自动切换为主机
推荐:1.2、1.4、1.5
2、同步刷盘、异步刷盘同步刷盘:数据到了应用程序内存,消息立马写入磁盘中,数据安全性高。
异步刷盘:性能高,不定时的从应用程序内存把数据写入磁盘中。
3.1、环境准备(RocketMQ 安装请看 RocketMQ 入门、安装、广控台)
192.168.1.38 主节点 光控台 192.168.1.39 从节点
主:
从:
3.2、修改配置文件
主节点:
cd /usr/local/rocketmq/distribution/target/apache-rocketmq/conf/2m-2s-async #修改配置文件 vim broker-a.properties #NameServer 地址 namesrvAddr=192.168.1.38:9876;192.168.1.39:9876 #集群名称,主从要一样 brokerClusterName=DefaultCluster #broker名称,主从要一样,根据BrokerRole来确定谁是主谁是从 brokerName=broker-a #brokerID 0是主,>0是从 brokerId=0 #删除日志时间为凌晨4点 deleteWhen=04 #文件保留时间48小时 fileReservedTime=48 #主从关系,异步复制 brokerRole=ASYNC_MASTER #刷盘模式,异步刷盘 flushDiskType=ASYNC_FLUSH
从节点:
cd /usr/local/rocketmq/distribution/target/apache-rocketmq/conf/2m-2s-async #修改配置文件 vim broker-a-s.properties namesrvAddr=192.168.1.38:9876;192.168.1.39:9876 brokerClusterName=DefaultCluster brokerName=broker-a #ID 大于0是从节点 brokerId=1 deleteWhen=04 fileReservedTime=48 #角色是从节点 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH
3.3、启动主节点、从节点
cd /usr/local/rocketmq/distribution/target/apache-rocketmq/bin #启动NameServer nohup sh ./mqnamesrv >/dev/null 2>&1 & #启动主Broker nohup sh ./mqbroker -c ../conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 & #启动从Broker nohup sh ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
3.4、搭建广控台(具体搭建请看第一章节)
上传光控制台:
#修改配置文件 cd /usr/local/rocketmq-console/src/main/resources vim application.properties
打包运行
cd /usr/local/rocketmq-console mvn clean package -Dmaven.test.skip=true cd /usr/local/rocketmq-console/target nohup java -jar rocketmq-console-ng-1.0.1.jar 1>/dev/null 2>&1 & lsof -i :80804、测试故障转移
4.1、pom文件
org.apache.rocketmq rocketmq-client 4.4.0
4.2、Consts
public class RocketMQConsts { //=========================Hello RocketMQ========================== public static final String HELLO_TOPIC = "helloTopic"; public static final String HELLO_GROUP_PROVIDER = "helloGroupProvider"; public static final String HELLO_TAG = "helloTag"; public static final String HELLO_NAMESERVER="192.168.1.38:192.168.1.39:9876"; public static final String HELLO_GROUP_CUSTOMER="helloGroupCustomer"; }
4.3、Producer
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class RocketMQProducer { public static void main(String[] args) throws Exception { //创建producer,设置NameServer地址, 关闭rocketmq 虚拟IP DefaultMQProducer producer = new DefaultMQProducer(RocketMQConsts.HELLO_GROUP_PROVIDER); producer.setNamesrvAddr(RocketMQConsts.HELLO_NAMESERVER); producer.setVipChannelEnabled(false); producer.start(); //发送消息 for (int i = 1; i <= 10; i++) { //主题、TAG、消息体 Message msg = new Message(RocketMQConsts.HELLO_TOPIC, RocketMQConsts.HELLO_TAG, ("Hello RocketMQ " + i).getBytes()); producer.send(msg); System.out.println("发送消息" + msg); } //关闭 producer.shutdown(); } }
4.4、Customer
public class RocketMQCustomer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); //配置组名 IP consumer.setNamesrvAddr(RocketMQConsts.HELLO_NAMESERVER); consumer.setConsumerGroup(RocketMQConsts.HELLO_GROUP_CUSTOMER); //最后偏移量读取消息 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); try { //订阅主题,监听主题下哪些标签 consumer.subscribe(RocketMQConsts.HELLO_TOPIC, "*"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { //默认一条一条消息消费 Message msg = msgs.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); //返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); //返回稍后进行投递 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); //开始 consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
4.5、启动消费者、启动生产者
4.6、关闭RocketMQ Master、启动生产者
#关闭Master Broker cd /usr/local/rocketmq/distribution/target/apache-rocketmq/bin sh ./mqshutdown broker
启动生产者发送消息
4.7、启动消费者
4.8、在启动主节点,发现消费者没有数据可以消费
5.1、nameserver nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。 5.2、broker与nameserver关系 broker:单个broker和所有nameserver保持长连接,每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。 nameserver:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。 5.3、provider与nameserver关系 单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。 5.4、provider与broker关系 单个生产者和该生产者关联的所有broker保持长连接。默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。 5.5、cunstomer与nameserver关系 单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。 5.6、cunstomer与broker关系 单个消费者和该消费者关联的所有broker保持长连接。默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。 5.7、主从 只有master才能进行写入 *** 作,slave不允许写入只能同步,同步策略取决于master的配置。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)