SpringBoot集成Kafka,实现简单的收发消息

SpringBoot集成Kafka,实现简单的收发消息,第1张

在kafka的 config 目录下找到 serverproperties 配置文件

把 listeners 和 advertisedlisteners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092

KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码

通过 KafkaTemplate 模板类发送数据
kafkaTemplatesend(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics

bootstrap-servers :kafka服务器地址(可以多个)
consumergroup-id :指定一个默认的组名
不指定的话会报

1 earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2 latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3 none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常

这个属性也是必须配置的,不然也是会报错的

在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumerkey-deserializer 和 consumervalue-deserializer 是消费者 key/value 反序列化
producerkey-deserializer 和 producervalue-deserializer 是生产者 key/value 序列化

StringDeserializer 是内置的字符串反序列化方式

StringSerializer 是内置的字符串序列化方式

在 orgapachekafkacommonserialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer

详细可以参考
>

kafka是一个 分布式 的、支持 分区的(partition )、多副本的 (replica ),基于 zookeeper 协调的 分布式消息系统。

从上面的描述中我们可以知道kafka的核心知识点:partition、replica

一个topic可以认为一个一类消息,每个topic将被分成多个partition。

在上图中我们的生产者会决定发送到哪个 Partition:

如果没有 Key 值则进行轮询发送。

如果有 Key 值,对 Key 值进行 Hash,然后对分区数量取余,保证了同一个 Key 值的会被路由到同一个分区。(所有系统的partition都是同一个路数)

在上图我们也可以看到,offset是跟partition走的,每个partition都有自己的offset。

总所周知,topic在物理层面以partition为分组,一个topic可以分成若干个partition,那么topic以及partition又是怎么存储的呢

其实partition还可以细分为logSegment,一个partition物理上由多个logSegment组成,那么这些segment又是什么呢

LogSegment 文件由两部分组成,分别为“index”文件和“log”文件,分别表示为 Segment 索引文件和数据文件。

这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:

如上图,“index”索引文件存储大量的元数据,“log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“index”索引文件中的元数据[3, 348]为例,在“log”数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。

那么如何从partition中通过offset查找message呢

以上图为例,读取offset=170418的消息,首先查找segment文件,其中00000000000000000000index为最开始的文件,第二个文件为00000000000000170410index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410index文件中的[8,1325]定位到00000000000000170410log文件中的1325的位置进行读取。

要是读取offset=170418的消息,从00000000000000170410log文件中的1325的位置进行读取,那么怎么知道何时读完本条消息,否则就读到下一条消息的内容了

这个就需要联系到消息的物理结构了,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。

Kafka 的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫 Reblance)。

Kafka 每个主题的每个分区都有一个主副本以及 0 个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。

当producer向leader发送数据时,可以通过requestrequiredacks参数来设置数据可靠性的级别:

在kafka系统中,会涉及到多处选举机制,主要有这三方面:

1、定位:分布式的消息队列系统,同时提供数据分布式缓存功能(默认7天)
2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
3、Storm(分布式的实时计算框架)
Kafka目标成为队列平台
4、基本组件:
Broker:每一台机器是一个Broker
Producer:日志消息生产者,主要写数据
Consumer:日志消息消费者,主要读数据
Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写
Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
5、Partition功能:负载均衡,需要保证消息的顺序性
顺序性的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
如果有多个partiton存在,可能会出现顺序不一致的情况,原因:每个Partition相互独立
6、Topic:逻辑概念
一个或多个Partition组成一个Topic
7、Partition以文件夹的形式存在
8、Partition有两部分组成:
(1)index log:(存储索引信息,快速定位segment文件)
(2)message log:(真实数据的所在)
9、HDFS多副本的方式来完成数据高可用
如果设置一个Topic,假设这个Topic有5个Partition,3个replication
Kafka分配replication的算法:
假设:
将第i个Partition分配到(i % N)个Broker上
将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
虽然Partition里面有多个replication
如果里面有M个replication,其中有一个是Leader,其他M-1个follower
10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分开存储的
12、偏移量——offset:用来定位数据读取的位置
13、kafka内部最基本的消息单位——message
14、传输最大消息message的size不能超过1M,可以通过配置来修改
15、Consumer Group
16、传输效率:zero-copy
0拷贝:减少Kernel和User模式上下文的切换
直接把disk上的data传输给socket,而不是通过应用程序来传输
17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
减轻Kafka的实现难度
18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
19、交付保证:
at least once:至少一次(会有重复、但不丢失)
at most once:最多发送一次(不重复、但可能丢失)
exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
日志副本:保证可靠性
角色:主、从
ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
如何让leader知道follower是否成功接收数据(心跳,ack)
如果心跳正常,代表节点活着
21、怎么算“活着”
(1)心跳
(2)如果follower能够紧随leader的更新,不至于被落的太远
如果一旦挂掉,从ISR集合把该节点删除掉

前提:需要把zookeeper提前启动好
一、单机版
1、启动进程:
]# /bin/kafka-server-startsh config/serverproperties
2、查看topic列表:
]# /bin/kafka-topicssh --list --zookeeper localhost:2181
3、创建topic:
]# /bin/kafka-topicssh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# /bin/kafka-topicssh --describe --zookeeper localhost:2181 --topic newyear_test
5、producer发送数据:
]# /bin/kafka-console-producersh --broker-list localhost:9092 --topic newyear_test
6、consumer接收数据:
]# /bin/kafka-console-consumersh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、删除topic:
]# /bin/kafka-topicssh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的brokerid一定设置不同
分别在slave1和slave2上开启进程:
/bin/kafka-server-startsh config/serverproperties

创建topic:
]# /bin/kafka-topicssh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test

1、实现一个consumer group
首先在不同的终端分别开启consumer,保证groupid一致
]# python consumer_kafkapy

执行一次producer:
]# python producer_kafkapy

2、指定partition发送数据
]# python producer_kafka_2py

3、指定partition读出数据
]# python consumer_kafka_2py
consumer_kafkapy:

producer_kafkapy:

consumer_kafka_2py:

producer_kafka_2py:

1新建/conf/kafka_test/flume_kafkaconf

2启动flume:
]# flume-ng agent -c conf -f /conf/kafka_test/flume_kafkaconf -n a1 -Dflumerootlogger=INFO,console
启动成功,如下图:

3测试:
11flume监控产生的数据:
]# for i in seq 1 100 ; do echo '====> '$i >> 1log ; done
12kafka消费数据:
]# /bin/kafka-console-consumersh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消费结果如下图:

中一个条件的时候就触发 发送 batchnummessages 异步发送 每次批量发送的条目 queuebufferingmaxms 异步发送的时候 发送时间间隔 单位是毫秒 一些相关参数


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/dianzi/13486538.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-16
下一篇 2023-08-16

发表评论

登录后才能评论

评论列表(0条)