kafka-docker上使用+常用指令

kafka-docker上使用+常用指令,第1张

生产者向broker发送消息消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion

启动镜像

启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/serverproperties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考

其中1721703需要改成自己docker的网桥连接地址

查看已启动容器

查看所有容器

启动未启动的容器

进入kafka容器

创建主题

主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。 综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分)

查看topic信息

集群部署
可以通过compose集群化部署过es,这里通过创建另一个composeyml文件来部署kafka,配置文件参考 docker-compose集群部署

生产者:

消费者:
方式一:从当前主题的迁移量位置+1开始取数据

方式二:从当前主题第一条消息开始消费

生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的
单播消息:
当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息

多播消息:
当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息

查看消费组详细信息:

CURRENT-OFFSET:最后被消费的偏移量
LOG-END-OFFSET:消息总量(最后一条消息的偏移量)
LAG :积压了多少条消息

常见问题:
1、如何防止消息丢失
生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2
消费者:把自动提交改成手动提交
2、如何防止消息的重复消费
针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;
3、消息积压
消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。

ps:
缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker

参考链接:
1、kafka安装教程--推荐
2、kafka配置文件serverproperties参数说明
3、创建主题分区数
4、解决docker容器启动不了的问题
5、通过docker-compose集群部署
6、学习视频

serverproperties中有两个listeners。 listeners:启动kafka服务监听的ip和端口,可以监听内网ip和0000(不能为外网ip),默认为javanetInetAddressgetCanonicalHostName()获取的ip。advertisedlisteners:生产者和消费者连接的地址,kafka会把该地址注册到zookeeper中,所以只能为除0000之外的合法ip或域名 ,默认和listeners的配置一致。

只有在jdk19并且kafka版本在10x之前的版本才会出现。

如上图所示,kafaka集群的 broker,和 Consumer 都需要连接 Zookeeper。
Producer 直接连接 Broker。

Producer 把数据上传到 Broker,Producer可以指定数据有几个分区、几个备份。上面的图中,数据有两个分区 0、1,每个分区都有自己的副本:0'、 1'。

的分区为 leader,白色的为 follower。

leader 处理 partition 的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。 如下图所示,红色的为 leader,绿色的为 follower,leader复制自己到其他 Broker 中:

Topic 分区被放在不同的 Broker 中,保证 Producer 和 Consumer 错开访问 Broker,避免访问单个 Broker造成过度的IO压力,使得负载均衡。

Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来 ,此时就使用到了Zookeeper。在Zookeeper上会有一个专门 用来进行Broker服务器列表记录 的节点:

/brokers/ids

每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0N]。

Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后, 每个Broker就会将自己的IP地址和端口信息记录 到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。

在Kafka中,同一个 Topic的消息会被分成多个分区 并将其分布在多个Broker上, 这些分区信息及与Broker的对应关系 也都是由Zookeeper在维护,由专门的节点来记录,如:

/borkers/topics

Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。

由于同一个Topic消息会被分区并将其分布在多个Broker上,因此, 生产者需要将消息合理地发送到这些分布式的Broker上 ,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

(1) 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。

(2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者, 每条消息都只会发送给分组中的一个消费者 ,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。

消费组 (Consumer Group):
consumer group 下有多个 Consumer(消费者)。
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。

在Kafka中,规定了 每个消息分区 只能被同组的一个消费者进行消费 ,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。

在消费者对指定消息分区进行消息消费的过程中, 需要定时地将分区消息的消费进度Offset记录到Zookeeper上 ,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

节点内容就是Offset的值。

消费者服务器在初始化启动时加入消费者分组的步骤如下

注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

对 消费者分组 中的 消费者 的变化注册监听 。每个 消费者 都需要关注所属 消费者分组 中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。

对Broker服务器变化注册监听 。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。

进行消费者负载均衡 。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。

以下是kafka在zookeep中的详细存储结构图:

早期版本的 kafka 用 zk 做 meta 信息存储,consumer 的消费状态,group 的管理以及 offse t的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖

下面已常用的选项作说明

下面对画方框的三列做着重解释。

注意如下这种情况也是不计算作倾斜的。

此时,broker2 拥有 3 个 leader 分区,超过平均范围的 2 个,所以 broker2 就 Leader 分区倾斜了,倾斜率 1/5=20%。

用下图举例说明:

上面三个参数对于衡量 topic 的稳定性有重要的影响:
Broker Skew : 反映 broker 的 I/O 压力,broker 上有过多的副本时,相对于其他 broker ,该 broker 频繁的从 Leader 分区 fetch 抓取数据,磁盘 *** 作相对于其他 broker 要多,如果该指标过高,说明 topic 的分区均不不好,topic 的稳定性弱;
Broker Leader Skew :数据的生产和消费进程都至于 Leader 分区打交道,如果 broker 的 Leader 分区过多,该 broker 的数据流入和流出相对于其他 broker 均要大,该指标过高,说明 topic 的分流做的不够好;
Under Replicated : 该指标过高时,表明 topic 的数据容易丢失,数据没有复制到足够的 broker 上。

下面着重讲述红框部分:

上述是关于“优先副本”的相关描述,即在理想的状态下,分区的 leader 最好是 “优先副本”,这样有利于保证集群中 broker 的领导权比较均衡。重新均衡集群的 leadership 可采用 kafka manager 提供的工具:

一般而言,手动调整、系统自动分配分区和添加分区之后,都需要调用 Reassign Partition

kafka manager 能够获取到当前消费 kafka 集群消费者的相关信息。

参考
[1]: Kafka副本同步机制理解
[2]: kafka维护工具使用指南
[3]: kafka 中文文档
[4]: Replication tools

首先要启动好kafka集群
1、集群时间同步
2、启动zookeeper集群
3、启动kafka集群
启动kafka集群的方式就是在集群中每台机器 kafka目录 下运行
nohup bin/kafka-server-startsh config/serverproperties >/dev/null 2>&1 &

kafka发出消息和接收消息都是基于topic,所以要先创建一个topic,才能向里面发消息。创建topic的脚本:

topic创建好了,就可以向里边发送消息了。

通过命令行实现数据的发送 producer 生产者
kafka-console-producersh 就是用来测试用的脚本,可以模拟kafka消息的发送端。
直接运行 kafka-console-producersh 查看帮助

--broker-list 指定我们kafka集群的地址
--topic 指定我们的消息发送到哪个topic里面去

通过命令行实现数据的接收 consumer 消费者

--bootstrap-server 表示我们的kafak集群的地址,在旧版本中使用的是--zookeeper参数,两者至少使用一个
--from-beginning 表示我们从最开始的数据进行消费
--topic指定我们topic的名字

在producer端发送数据,在consumer端可以收到数据

进入安装目录,修改serverproperties文件

修改如下属性,除id外,其他每台主机一致:

语义配置:(可选)

先启动zookeeper集群,已经在三台主机上配置好了zookeeper集群,启动:
在各台主机上进入zookeeper目录,分别启动zk:

在各台主机上进入kafka目录,分别启动kafka:

启动结果为:

kafka占据了前台,要使用主机,需要打开新终端

在新打开的终端上,进入zk目录,

进入kafka目录,创建主体


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/dianzi/13049734.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-29
下一篇 2023-05-29

发表评论

登录后才能评论

评论列表(0条)

保存