Kafka知识总结之集群环境搭建

Kafka知识总结之集群环境搭建,第1张

Kafka知识总结之集群环境搭建 简述

Kafka是一个分布式流平台,本质是一个消息队列。消息队列的三个作用:异步、消峰和解耦。

一. 安装zookeeper 1.1. 下载并解压
# 下载
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

# 解压
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
1.2. 修改配置

这里我们需要将解压目录中的config目录里面的zoo_sample.cfg复制一份为zoo.cfg。

这个配置文件就是zookeeper的配置文件,整体来说我们不需要修改,但这里我们别为了不影响其他的文件目录,只为了我们测试使用,需要修改下zookeeper的数据目录。

# CS通信心跳时间
tickTime=2000

# 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)
initLimit=10

# 集群中flower服务器(F)跟leader(L)服务器之间的请求和答应最多能容忍的心跳数
syncLimit=5

# 该属性对应的目录是用来存放myid信息跟一些版本,日志,跟服务器唯一的ID信息等
dataDir=/tmp/zookeeper

# 客户端连接的接口,客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接收客户端的请求访问!这个端口默认是2181
clientPort=2181

#maxClientCnxns=60

#autopurge.snapRetainCount=3

#autopurge.purgeInterval=1

## Metrics Providers

#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

我们这里修改dataDir=/tmp/zookeeper为刚才我们解压的目录!

1.3. 启动zookeeper

进入bin目录,执行./zkServer.sh start

二. 安装kafka 2.1. 下载并解压
# 下载
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
# 解压
tar -zxf kafka_2.12-2.6.0.tgz
2.2. 修改配置文件

我们修改三个位置:

配置项值描述listenersPLAINTEXT://192.168.31.26:9092broker 服务器要监听的地址及端口 . 默认是 localhost:9092 ,0.0.0.0的话 ,表示监听本机的所有ip地址.advertiesd.listenersPLAINTEXT://192.168.31.26:9092这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址.log.dirs/home/long/kakfa_test/kafka_datakafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-eogs-1,/tmp/kafka-logs-2
# 每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0

############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.31.26:9092

advertiesd.listeners=PLAINTEXT://192.168.31.26:9092

# broker处理消息的最大线程数,一般情况下数量为cpu核数
num.network.threads=3

# broker处理磁盘IO的线程数,数值为cpu核数2倍
num.io.threads=8

# socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400

# socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400

# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600


############################# Log Basics #############################
# kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs=/home/long/kakfa_test/kafka_data

num.partitions=1

num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# 检查是否需要固化到硬盘的时间间隔
#log.flush.interval.messages=10000

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# 数据文件保留多长时间
log.retention.hours=168

#log.retention.bytes=1073741824

# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824

# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
# zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
# ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
2.3. 启动kafka

执行 bin/kafka-server-start.sh config/server.properties ,就可以启动kafka了。

上面的方法是阻塞执行的,我们可以通过-daemon 进行守护执行。

bin/kafka-server-start.sh -daemon config/server.properties

停止 kafka ,需要执行 bin/kafka-server-stop.sh

三、kafka的基本使用 3.1. kafka中的一些概念 值描述topic一个虚拟的概念,由1到多个partition组成partition实际消息存储单位producer消息生产者consumer消息消费者 3.2. 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic long-topic

参数解释:

参数解释–create创建topic动作指令–zookeeper制定kafka所连接的zookeeper服务地址–topic指定了所创建的主题名称–replication-factor指定副本因子–partitions指定分区个数

3.3. 查看所有topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

3.4. 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic long-topic
3.5. 详情topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic long-topic
3.6. 发送和接受消息

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.31.26:9092 --topic long-topic

接受消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.26:9092 --topic long-topic --from-beginning

四. kafka涉及名称解释

根据下面的图了解下Kafka涉及到相关名词:

重要名词

名词解释Producer消息生产者,该角色将消息发布到kafka的topic中,broker接受到生产者发送的消息后,broker将该消息追加到当前追加数据的segment文件中。生产者发送的消息,存储到一个parition中,生产者也可以指定数据存储到paritionConsumer消息消费者,向Kafka Broker取消息的客户端。消费者可以消费多个topic中的数据。Consumer Group消费组,由多个Consumer组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。Broker一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。Topic话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic。可以将kafka看作是一个数据库,topic相当于数据库中的一张表,topic相当于表名。Partition为了实现扩展性,一个非常大的 topic 可以分割成一个或者多个parition。每一个topic至少拥有一个parition。每一个parition中的数据使用多个segment文件存储。parition是有序的,但是多个parition之间是没有顺序的,在要保证消息的消费顺序的时候,需要将parition数目设置为1。partition offset每条消息都有一个当前parition下唯一的64字节的offset,它指明了这条消息的起始位置。Replica副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。replicas是不会被消费者消费的。Leader每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。Follower每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。zookeeper负责维护和协调broker。当kafka系统增加broker或者broker发生故障失效时,由zookeeper通知生产者和消费者,生产者和消费者依据zookeeper的broker状态信息于broker协调数据的发布和订阅任务AR(Assigned Replicas)分区中所有的副本统称为ARISR(In Sync Replicas)所有与Leader部分保持一定程度的副本(包括leader的副本在内)组成ISROSR与Leader副本同步滞后过多的副本HW(High Watermark)高水位,标识一个特定的offset,消费者只能拉取到这个offset之前的消息LEO(Log End Offset)即日志末端位移,记录了该副本底层日志中下一条消息的位移值。注意是下一条消息! 五. Kafka特点

特性:

高吞吐、低延迟:kafka每秒可以处理几十万消息,延迟最低只有几毫秒,每个主体可以分为多个分区,消费组对分区进行消费 *** 作可扩展性,kafka集群支持热扩展持久性、可靠性,消息可以被持久到本地磁盘,并且支持数据备份防止数据丢失容错性,允许集群结点失败(若副本数量为n,则允许失败n-1个结点)高并发:支持数千个客户端同时读写 六. 使用场景

场景:

日志收集:kafka可以收集各种服务的log,通过kafka以统一接口服务开放给各种消费者消息系统:解藕生产者和消费者、缓冲消息等用户活动跟踪:kafka可以用来记录web用户或者app的各种活动 *** 作,做实时监控分析,或者装载到hadoop、数据仓库中做离线分析和数据挖掘运营指标:kafka也可以用来记录运营监控数据。流式处理:storm 七. 技术优势

可伸缩性:

    kafka在运行期间可以轻松的扩展和收缩(可以添加和删除代理),而不会宕机可以扩展一个kafka主题包来包含更多的分区;由于一个分区无法扩展到多个代理,所以它的容量收到代理磁盘空间限制,能够增加分区和代理的数量意味着当个主题可以存储的数据量没有限制。

容错性和可靠性:

kafka的设计方式使某一个代理的故障能够被集群中其他的代理监测到,由于每一个主体都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此故障中继续运行。

吞吐量:

代理能够以快速有效的存储和检索数据

八. Topic、Parition、Broker、Replica、leader和follower之间的关系

在Kafka中是以Broker区分集群内服务器的;同一个Topic下,多个Parition经Hash分布到不同的Broker;一个Topic(主题)对应多个Parition(分区),这里Parition分布在不同的Broker上,多个Broker一起提供Kafka服务;Parition默认是1,不可以减少Parition的数量,但是可以增加;如果想要减少就需要删除原先的Topic,然后创建新Topic,重新设置分区数同一个Topic中的不同Parition中数据有顺序性,但是Parition之间不存在数据顺序性;每个Parition都会有多个数据Replica(副本),这些Replica分布于不同的Broker中,这些副本中会一个副本是Leader,其他的副本是Follower;当producer或者consumer发往Parition的请求,都是通过leader数据副本所在broker进行处理,当leader所在的Broker发生故障,这个Broker会成暂时不可用,kakfa会自动从其他副本选择一个leader用于接收客户端请求;保证在broker之间平均分布partition副本,每个副本分布在不同的broker上,broker分布可用轮询或哈希。 九. Java *** 作Kafka

引入依赖:


    
        org.apache.kafka
        kafka-clients
        2.8.0
    

生产者:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;


public class Producer {

    
    public final static String Topic = "test-topic";

    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();
        // 集群通过逗号分割
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092");
        // 设置key和value的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
        try {
            for (int i = 0; i < 100; i++) {
                String msg = "Hello, " + new Random().nextInt(100);
                // 构建消息
                ProducerRecord record = new ProducerRecord<>(Topic, msg);
                // 发送
                kafkaProducer.send(record);
                System.out.println("消息发送:" + msg);
                Thread.sleep(500);
            }
        } finally {
            // 执行完之后关闭
            kafkaProducer.close();
        }
    }

}
十. Kafka集群搭建

这里使用一个zookeeper+三个kafka组成kafka集群,如果zookeeper也需要集群的话可以从前面文章的zookeeper集群搭建获取shell脚本。

这里需要注意下面几点:

节点kafka-1kafka-2kafka-3broker.id123listenersPLAINTEXT://192.168.0.117:9091PLAINTEXT://192.168.0.117:9092PLAINTEXT://192.168.0.117:9093advertised.listenersPLAINTEXT://192.168.0.117:9091PLAINTEXT://192.168.0.117:9092PLAINTEXT://192.168.0.117:9093log.dirskafka-1/datakafka-2/datakafka-3/datazookeeper.connect192.168.0.xxx:2181192.168.0.xxx:2181192.168.0.xxx:2181

搭建脚本附上:

install_kafka() {
  echo "安装kafka集群"
  wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
  tar -zxvf kafka_2.12-2.8.1.tgz
  mv kafka_2.12-2.8.1 kafka-1
  cp -r kafka-1 kafka-2 
  cp -r kafka-1 kafka-3
  mkdir kafka-{1,2,3}/data
  basePath=$(pwd)
  for (( i = 1; i <= 3; i++ )); do
    #statements
    sed -i "/^broker.id/cbroker.id=$i" kafka-$i/config/server.properties
    # #listeners=PLAINTEXT://:9092
    sed -i "/^#listeners/clisteners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties
    # #advertised.listeners=PLAINTEXT://your.host.name:9092
    sed -i "/^#advertised.listeners/cadvertised.listeners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties
    # log.dirs=/tmp/kafka-logs
    sed -i "/^log.dirs/clog.dirs=$basePath/kafka-$i/data" kafka-$i/config/server.properties
    # zookeeper.connect=localhost:2181
    sed -i '/^zookeeper.connect=/czookeeper.connect=192.168.0.117:2181' kafka-$i/config/server.properties
  done
}

start() {
  echo "启动kafka集群"
  for (( i = 1; i <= 3; i++ )); do
    ./kafka-$i/bin/kafka-server-start.sh -daemon ./kafka-$i/config/server.properties
  done
}

stop() {
  echo "停止kafka集群"
  for (( i = 1; i <= 3; i++ )); do
    ./kafka-$i/bin/kafka-server-stop.sh
  done
}

echo "Kafka集群脚本"
case  in
  install)
    install_kafka
  ;;
  start)
    start
  ;;
  stop)
    stop
  ;;
esac

上面演示的是伪集群搭建,学习Kafka足够了,在实际使用的推荐先搭建zookeeper集群在搭建kafka集群。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5706182.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存