MQ消息队列

MQ消息队列,第1张

MQ消息队列
kafka:
  #0.8以前的kafka,消费的进度(offset)是写在zk中的,所以consumer需要知道zk的地址。这个方案有性能问题,0.9 的时候整体大改了一次,brokers 接管了消费进度,consumer 不再需要和 zookeeper 通信了,所以就用bootstrap-server了
  #以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接。
  bootstrap-servers: saas.kafka1.infra.kafka.cn:9092,saas.kafka2.infra.kafka.cn:9092,saas.kafka3.infra.kafka.cn:9092
  producer:
    #如果该值大于零时,表示启用重试失败的发送次数
    retries: 0
    #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
    batch-size: 16384
    #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432
    buffer-memory: 33554432
    #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    #值的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
    #可以设置的值为:all, -1, 0, 1
    acks: 1
  consumer:
    #如果为true,则消费者的偏移量将在后台定期提交,默认值为true
    enable-auto-commit: true
    #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
    auto-commit-interval: 5000
    #用于标识此使用者所属的使用者组的唯一字符串。
    group-id: admin_service
    #ID在发出请求时传递给服务器;用于服务器端日志记录。
    client-id: admin_service
    #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量,可选的值为latest, earliest, none
    #earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    #latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    #none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    #组与组间的消费者是没有关系的。topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。
    auto-offset-reset: earliest
    #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    #虚拟host 可以不设置,使用server默认host
    virtual-host: vhost
    #确认消息已发送到交换机(Exchange)
    publisher-/confirm/is: true
    #确认消息已发送到队列(Queue)
    publisher-returns: true

Direct Exchange

直连型交换机,根据消息携带的路由键将消息投递给对应队列

Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。

Dead Letter Exchange

死信交换机,进入的条件如下:

1.消息被拒绝(basic.reject / basic.nack),并且requeue = false

2.消息TTL过期

3.队列达到最大长度

Header Exchange

1.头交换机和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据;

2.主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值;

RabbitMQ 如何确保每个消息能被消费

        RabbitMQ 使用 ack 消息确认的方式保证每个消息都能被消费,开发者可根据自己的实际业务,选择 channel.basicAck() 方法手动确认消息被消费。

RabbitMQ 接收到消息之后必须消费吗?

        RabbitMQ 接收到消息之后可以不消费,在消息确认消费之前,可以做以下两件事:

        1.拒绝消息消费,使用 channel.basicReject(消息编号, true) 方法,消息会被分配给其他订阅者;

        2.设置为死信队列,死信队列是用于专门存放被拒绝的消息队列;

消息持久化有哪些缺点?如何缓解?

        消息持久化的缺点是很消耗性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量。可使用固态硬盘来提高读写速度,以达到缓解消息持久化的缺点;

RabbitMQ 有哪些重要的组件?它们有什么作用?

        RabbitMQ 包含的重要组件有:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键) 等重要的组件,它们的作用如下:

        1.ConnectionFactory(连接管理器):应用程序与 RabbitMQ 之间建立连接的管理器,程序代码中使用;

        2.Channel(信道):消息推送使用的通道;

        3.Exchange(交换器):用于接受、分配消息;

        4.Queue(队列):用于存储生产者的消息;

        5.RoutingKey(路由键):用于把生成者的数据分配到交换器上;

        6.BindingKey(绑定键):用于把交换器的消息绑定到队列上;

消息队列的应用场景有哪些?

        1.应用解耦,比如,用户下单后,订单系统需要通知库存系统,假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。订单系统与库存系统耦合,这个时候如果使用消息队列,可以返回给用户成功,先把消息持久化,等库存系统恢复后,就可以正常消费减去库存了;

        2.削峰填谷,比如,秒杀活动,一般会因为流量过大,从而导致流量暴增,应用挂掉,这个时候加上消息队列,服务器接收到用户的请求后,首先写入消息队列,假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;

        3.日志系统,比如,客户端负责将日志采集,然后定时写入消息队列,消息队列再统一将日志数据存储和转发;

RabbitMQ 哪些交换机可以获取历史消息的是?

        fanout 和 topic 都是广播形式的,因此无法获取历史消息,而 direct 可以;

RabbitMQ 包含事务功能吗?如何使用?

        RabbitMQ 包含事务功能,主要是对信道(Channel)的设置,主要方法有以下三个:

        1.channel.txSelect() 声明启动事务模式;

        2.channel.txComment() 提交事务;

        3.channel.txRollback() 回滚事务;

RabbitMQ 的事务在什么情况下是无效的?

        RabbitMQ 的事务在 autoAck=true 也就是自动消费确认的时候,事务是无效的。因为如果是自动消费确认,RabbitMQ 会直接把消息从队列中移除,即使后面事务回滚也不能起到任何作用;

Kafka 可以脱离 ZooKeeper 单独使用吗?

        Kafka 不能脱离 ZooKeeper 单独使用,因为 Kafka 使用 ZooKeeper 管理和协调 Kafka 的节点服务器;

Kafka 有几种数据保留的策略?

        Kafka 有两种数据保存策略:按照过期时间保留和按照存储的消息大小保留;

Kafka 同时设置了 7 天和 10G 清除数据,到第五天的时候消息达到了 10G,这个时候 Kafka 将如何处理?

        这个时候 Kafka 会执行数据清除工作,时间和大小不论哪个满足条件,都会清空数据;

什么情况会导致 Kafka 运行变慢?

        1.CPU 性能瓶颈;

        2.磁盘读写瓶颈;

        3.网络瓶颈;

使用 Kafka 集群需要注意什么?

        集群的数量不是越多越好,最好不要超过 7 个,因为节点越多,消息复制需要的时间就越长,整个群组的吞吐量就越低;

        集群数量最好是单数,因为超过一半故障集群就不能用了,设置为单数容错率更高;

如何保证消费的可靠性传输?

        每种MQ都要从三个角度来分析:

        1.生产者弄丢数据;

        2.消息队列弄丢数据;

        3.消费者弄丢数据;

RabbitMQ如何保证消费的可靠性传输?

        生产者丢数据:

        1.从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和/confirm/i模式来确保生产者不丢消息;

        2.transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

        3.confirm 机制。一旦channel进入/confirm/i模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试 *** 作;

        消息队列丢数据:

        1.开启持久化磁盘的配置。这个持久化配置可以和/confirm/i机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发;

        消费者丢数据:

        1.消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rabbitMQ会立即将消息删除,这种情况下,如果消费者出现异常而未能处理消息,就会丢失该消息。至于解决方案,采用手动确认消息即可。

Kafaka如何保证消费的可靠性传输?

        生产者丢数据:

        1.设置acks=all,生产者一定不会丢消息;

        消息队列丢数据:

        1.开启副本机制,保证每个分区副本至少有2个;

        2.设置min.insync.replicas大于1(ISR机制),要求leader至少感知一个follower跟自己保持联系,这样才能确保leader挂了还有一个follower;

        3.在producer设置acks=all,要求每条数据必须写入所有replica之后,才能认为是携程共了;

        4.在producer设置retries=MAX(较大的值),保证写入失败之后,无限重试;

        消费者丢数据:

        1.kafka会自动提交offset,关闭自动提交offset,处理完毕之后自己手动提交offset,可以保证数据不丢失;

什么是消息持久化?RabbitMQ 要实现消息持久化,需要满足哪些条件?

        1.消息持久化是把消息保存到物理介质上,以防止消息的丢失

        2.投递消息的时候 durable 设置为 true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数 2 设置为 true 持久化;

        3.设置投递模式 deliveryMode 设置为 2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENTTEXTPLAIN,x),参数 3 设置为存储纯文本到磁盘;

        4.消息已经到达持久化交换器上;

        5.消息已经到达持久化的队列;

如何保证消息的顺序性?

        1.需要保持先后顺序的消息放入到同一个消息队列中(kafka是partition,rabbitmq是queue),然后只用一个消费者去消费该队列;

        2.如果为了吞吐量,有多个消费者怎么办?

        3.rabbitmq,使用单线程保证消息的顺序性,对消息进行编号,消费者处理消息根据编号进行处理;

处理消息积压问题?

        1.全部丢弃:如果这些消息允许丢失,那么此时可以紧急修改消费者系统的代码,在代码里对所有的消息都获取到就直接丢弃,不做任何的处理,这样可以迅速的让积压在MQ里的百万消息被处理掉,只不过处理方式就是全部丢弃而已;

        2.临时扩容消费者系统,增加机器来加快消费速度;

如何保证消息不被重复消费?

        这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。

        先来说一下为什么会造成重复消费:

        1.正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发出的确认消息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offet的概念,简单说一下,就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了;

        2.造成重复消费的原因,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者;

        如何解决?这个问题针对业务场景来答,分以下三种情况:

        1.比如,你拿到这个消息做数据库的insert *** 作,那就容易了,给这个消息做一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据;

        2.再比如,你拿到这个消息做redis的set的 *** 作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set *** 作本来就算幂等 *** 作;

        3.准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将以K-V形式写入redis.那消费者开始消费前,先去redis中查询有没有消费记录即可;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5576067.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存