kafka: #0.8以前的kafka,消费的进度(offset)是写在zk中的,所以consumer需要知道zk的地址。这个方案有性能问题,0.9 的时候整体大改了一次,brokers 接管了消费进度,consumer 不再需要和 zookeeper 通信了,所以就用bootstrap-server了 #以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接。 bootstrap-servers: saas.kafka1.infra.kafka.cn:9092,saas.kafka2.infra.kafka.cn:9092,saas.kafka3.infra.kafka.cn:9092 producer: #如果该值大于零时,表示启用重试失败的发送次数 retries: 0 #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384 batch-size: 16384 #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432 buffer-memory: 33554432 #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer key-serializer: org.apache.kafka.common.serialization.StringSerializer #值的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。 #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。 #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。 #可以设置的值为:all, -1, 0, 1 acks: 1 consumer: #如果为true,则消费者的偏移量将在后台定期提交,默认值为true enable-auto-commit: true #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。 auto-commit-interval: 5000 #用于标识此使用者所属的使用者组的唯一字符串。 group-id: admin_service #ID在发出请求时传递给服务器;用于服务器端日志记录。 client-id: admin_service #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量,可选的值为latest, earliest, none #earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 #latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 #none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 #组与组间的消费者是没有关系的。topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。 auto-offset-reset: earliest #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
rabbitmq: host: 127.0.0.1 port: 5672 username: root password: root #虚拟host 可以不设置,使用server默认host virtual-host: vhost #确认消息已发送到交换机(Exchange) publisher-/confirm/is: true #确认消息已发送到队列(Queue) publisher-returns: true
Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
Dead Letter Exchange死信交换机,进入的条件如下:
1.消息被拒绝(basic.reject / basic.nack),并且requeue = false
2.消息TTL过期
3.队列达到最大长度
Header Exchange
1.头交换机和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据;
2.主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值;
RabbitMQ 如何确保每个消息能被消费?RabbitMQ 使用 ack 消息确认的方式保证每个消息都能被消费,开发者可根据自己的实际业务,选择 channel.basicAck() 方法手动确认消息被消费。
RabbitMQ 接收到消息之后必须消费吗?RabbitMQ 接收到消息之后可以不消费,在消息确认消费之前,可以做以下两件事:
1.拒绝消息消费,使用 channel.basicReject(消息编号, true) 方法,消息会被分配给其他订阅者;
2.设置为死信队列,死信队列是用于专门存放被拒绝的消息队列;
消息持久化有哪些缺点?如何缓解?消息持久化的缺点是很消耗性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量。可使用固态硬盘来提高读写速度,以达到缓解消息持久化的缺点;
RabbitMQ 有哪些重要的组件?它们有什么作用?RabbitMQ 包含的重要组件有:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键) 等重要的组件,它们的作用如下:
1.ConnectionFactory(连接管理器):应用程序与 RabbitMQ 之间建立连接的管理器,程序代码中使用;
2.Channel(信道):消息推送使用的通道;
3.Exchange(交换器):用于接受、分配消息;
4.Queue(队列):用于存储生产者的消息;
5.RoutingKey(路由键):用于把生成者的数据分配到交换器上;
6.BindingKey(绑定键):用于把交换器的消息绑定到队列上;
消息队列的应用场景有哪些?1.应用解耦,比如,用户下单后,订单系统需要通知库存系统,假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。订单系统与库存系统耦合,这个时候如果使用消息队列,可以返回给用户成功,先把消息持久化,等库存系统恢复后,就可以正常消费减去库存了;
2.削峰填谷,比如,秒杀活动,一般会因为流量过大,从而导致流量暴增,应用挂掉,这个时候加上消息队列,服务器接收到用户的请求后,首先写入消息队列,假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;
3.日志系统,比如,客户端负责将日志采集,然后定时写入消息队列,消息队列再统一将日志数据存储和转发;
RabbitMQ 哪些交换机可以获取历史消息的是?fanout 和 topic 都是广播形式的,因此无法获取历史消息,而 direct 可以;
RabbitMQ 包含事务功能吗?如何使用?RabbitMQ 包含事务功能,主要是对信道(Channel)的设置,主要方法有以下三个:
1.channel.txSelect() 声明启动事务模式;
2.channel.txComment() 提交事务;
3.channel.txRollback() 回滚事务;
RabbitMQ 的事务在什么情况下是无效的?RabbitMQ 的事务在 autoAck=true 也就是自动消费确认的时候,事务是无效的。因为如果是自动消费确认,RabbitMQ 会直接把消息从队列中移除,即使后面事务回滚也不能起到任何作用;
Kafka 可以脱离 ZooKeeper 单独使用吗?Kafka 不能脱离 ZooKeeper 单独使用,因为 Kafka 使用 ZooKeeper 管理和协调 Kafka 的节点服务器;
Kafka 有几种数据保留的策略?Kafka 有两种数据保存策略:按照过期时间保留和按照存储的消息大小保留;
Kafka 同时设置了 7 天和 10G 清除数据,到第五天的时候消息达到了 10G,这个时候 Kafka 将如何处理?这个时候 Kafka 会执行数据清除工作,时间和大小不论哪个满足条件,都会清空数据;
什么情况会导致 Kafka 运行变慢?1.CPU 性能瓶颈;
2.磁盘读写瓶颈;
3.网络瓶颈;
使用 Kafka 集群需要注意什么?集群的数量不是越多越好,最好不要超过 7 个,因为节点越多,消息复制需要的时间就越长,整个群组的吞吐量就越低;
集群数量最好是单数,因为超过一半故障集群就不能用了,设置为单数容错率更高;
如何保证消费的可靠性传输?每种MQ都要从三个角度来分析:
1.生产者弄丢数据;
2.消息队列弄丢数据;
3.消费者弄丢数据;
RabbitMQ如何保证消费的可靠性传输?生产者丢数据:
1.从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和/confirm/i模式来确保生产者不丢消息;
2.transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
3.confirm 机制。一旦channel进入/confirm/i模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试 *** 作;
消息队列丢数据:
1.开启持久化磁盘的配置。这个持久化配置可以和/confirm/i机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发;
消费者丢数据:
1.消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rabbitMQ会立即将消息删除,这种情况下,如果消费者出现异常而未能处理消息,就会丢失该消息。至于解决方案,采用手动确认消息即可。
Kafaka如何保证消费的可靠性传输?生产者丢数据:
1.设置acks=all,生产者一定不会丢消息;
消息队列丢数据:
1.开启副本机制,保证每个分区副本至少有2个;
2.设置min.insync.replicas大于1(ISR机制),要求leader至少感知一个follower跟自己保持联系,这样才能确保leader挂了还有一个follower;
3.在producer设置acks=all,要求每条数据必须写入所有replica之后,才能认为是携程共了;
4.在producer设置retries=MAX(较大的值),保证写入失败之后,无限重试;
消费者丢数据:
1.kafka会自动提交offset,关闭自动提交offset,处理完毕之后自己手动提交offset,可以保证数据不丢失;
什么是消息持久化?RabbitMQ 要实现消息持久化,需要满足哪些条件?1.消息持久化是把消息保存到物理介质上,以防止消息的丢失
2.投递消息的时候 durable 设置为 true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数 2 设置为 true 持久化;
3.设置投递模式 deliveryMode 设置为 2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENTTEXTPLAIN,x),参数 3 设置为存储纯文本到磁盘;
4.消息已经到达持久化交换器上;
5.消息已经到达持久化的队列;
如何保证消息的顺序性?1.需要保持先后顺序的消息放入到同一个消息队列中(kafka是partition,rabbitmq是queue),然后只用一个消费者去消费该队列;
2.如果为了吞吐量,有多个消费者怎么办?
3.rabbitmq,使用单线程保证消息的顺序性,对消息进行编号,消费者处理消息根据编号进行处理;
处理消息积压问题?1.全部丢弃:如果这些消息允许丢失,那么此时可以紧急修改消费者系统的代码,在代码里对所有的消息都获取到就直接丢弃,不做任何的处理,这样可以迅速的让积压在MQ里的百万消息被处理掉,只不过处理方式就是全部丢弃而已;
2.临时扩容消费者系统,增加机器来加快消费速度;
如何保证消息不被重复消费?这个问题其实换一种问法就是,如何保证消息队列的幂等性?这个问题可以认为是消息队列领域的基本问题。
先来说一下为什么会造成重复消费:
1.正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发出的确认消息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offet的概念,简单说一下,就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了;
2.造成重复消费的原因,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者;
如何解决?这个问题针对业务场景来答,分以下三种情况:
1.比如,你拿到这个消息做数据库的insert *** 作,那就容易了,给这个消息做一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据;
2.再比如,你拿到这个消息做redis的set的 *** 作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set *** 作本来就算幂等 *** 作;
3.准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)