Kafka是一个分布式流平台,本质是一个消息队列。消息队列的三个作用:异步、消峰和解耦。
一. 安装zookeeper 1.1. 下载并解压# 下载 wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz # 解压 tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz1.2. 修改配置
这里我们需要将解压目录中的config目录里面的zoo_sample.cfg复制一份为zoo.cfg。
这个配置文件就是zookeeper的配置文件,整体来说我们不需要修改,但这里我们别为了不影响其他的文件目录,只为了我们测试使用,需要修改下zookeeper的数据目录。
# CS通信心跳时间 tickTime=2000 # 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量) initLimit=10 # 集群中flower服务器(F)跟leader(L)服务器之间的请求和答应最多能容忍的心跳数 syncLimit=5 # 该属性对应的目录是用来存放myid信息跟一些版本,日志,跟服务器唯一的ID信息等 dataDir=/tmp/zookeeper # 客户端连接的接口,客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接收客户端的请求访问!这个端口默认是2181 clientPort=2181 #maxClientCnxns=60 #autopurge.snapRetainCount=3 #autopurge.purgeInterval=1 ## Metrics Providers #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider #metricsProvider.httpPort=7000 #metricsProvider.exportJvmInfo=true
我们这里修改dataDir=/tmp/zookeeper为刚才我们解压的目录!
1.3. 启动zookeeper进入bin目录,执行./zkServer.sh start
二. 安装kafka 2.1. 下载并解压# 下载 https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz # 解压 tar -zxf kafka_2.12-2.6.0.tgz2.2. 修改配置文件
我们修改三个位置:
# 每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers broker.id=0 ############################# Socket Server Settings ############################# listeners=PLAINTEXT://192.168.31.26:9092 advertiesd.listeners=PLAINTEXT://192.168.31.26:9092 # broker处理消息的最大线程数,一般情况下数量为cpu核数 num.network.threads=3 # broker处理磁盘IO的线程数,数值为cpu核数2倍 num.io.threads=8 # socket的发送缓冲区,socket的调优参数SO_SNDBUFF socket.send.buffer.bytes=102400 # socket的接受缓冲区,socket的调优参数SO_RCVBUFF socket.receive.buffer.bytes=102400 # socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 socket.request.max.bytes=104857600 ############################# Log Basics ############################# # kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2 log.dirs=/home/long/kakfa_test/kafka_data num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # 检查是否需要固化到硬盘的时间间隔 #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # 数据文件保留多长时间 log.retention.hours=168 #log.retention.bytes=1073741824 # topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 log.segment.bytes=1073741824 # 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3 zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper # ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大 zookeeper.connection.timeout.ms=18000 ############################# Group Coordinator Settings ############################# group.initial.rebalance.delay.ms=02.3. 启动kafka
执行 bin/kafka-server-start.sh config/server.properties ,就可以启动kafka了。
上面的方法是阻塞执行的,我们可以通过-daemon 进行守护执行。
bin/kafka-server-start.sh -daemon config/server.properties
停止 kafka ,需要执行 bin/kafka-server-stop.sh
三、kafka的基本使用 3.1. kafka中的一些概念bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic long-topic
参数解释:
bin/kafka-topics.sh --list --zookeeper localhost:21813.4. 删除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic long-topic3.5. 详情topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic long-topic3.6. 发送和接受消息
发送消息
bin/kafka-console-producer.sh --broker-list 192.168.31.26:9092 --topic long-topic
接受消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.26:9092 --topic long-topic --from-beginning四. kafka涉及名称解释
根据下面的图了解下Kafka涉及到相关名词:
重要名词
特性:
高吞吐、低延迟:kafka每秒可以处理几十万消息,延迟最低只有几毫秒,每个主体可以分为多个分区,消费组对分区进行消费 *** 作可扩展性,kafka集群支持热扩展持久性、可靠性,消息可以被持久到本地磁盘,并且支持数据备份防止数据丢失容错性,允许集群结点失败(若副本数量为n,则允许失败n-1个结点)高并发:支持数千个客户端同时读写 六. 使用场景
场景:
日志收集:kafka可以收集各种服务的log,通过kafka以统一接口服务开放给各种消费者消息系统:解藕生产者和消费者、缓冲消息等用户活动跟踪:kafka可以用来记录web用户或者app的各种活动 *** 作,做实时监控分析,或者装载到hadoop、数据仓库中做离线分析和数据挖掘运营指标:kafka也可以用来记录运营监控数据。流式处理:storm 七. 技术优势
可伸缩性:
- kafka在运行期间可以轻松的扩展和收缩(可以添加和删除代理),而不会宕机可以扩展一个kafka主题包来包含更多的分区;由于一个分区无法扩展到多个代理,所以它的容量收到代理磁盘空间限制,能够增加分区和代理的数量意味着当个主题可以存储的数据量没有限制。
容错性和可靠性:
kafka的设计方式使某一个代理的故障能够被集群中其他的代理监测到,由于每一个主体都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此故障中继续运行。
吞吐量:
代理能够以快速有效的存储和检索数据
八. Topic、Parition、Broker、Replica、leader和follower之间的关系在Kafka中是以Broker区分集群内服务器的;同一个Topic下,多个Parition经Hash分布到不同的Broker;一个Topic(主题)对应多个Parition(分区),这里Parition分布在不同的Broker上,多个Broker一起提供Kafka服务;Parition默认是1,不可以减少Parition的数量,但是可以增加;如果想要减少就需要删除原先的Topic,然后创建新Topic,重新设置分区数同一个Topic中的不同Parition中数据有顺序性,但是Parition之间不存在数据顺序性;每个Parition都会有多个数据Replica(副本),这些Replica分布于不同的Broker中,这些副本中会一个副本是Leader,其他的副本是Follower;当producer或者consumer发往Parition的请求,都是通过leader数据副本所在broker进行处理,当leader所在的Broker发生故障,这个Broker会成暂时不可用,kakfa会自动从其他副本选择一个leader用于接收客户端请求;保证在broker之间平均分布partition副本,每个副本分布在不同的broker上,broker分布可用轮询或哈希。 九. Java *** 作Kafka
引入依赖:
org.apache.kafka kafka-clients2.8.0
生产者:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.Random; public class Producer { public final static String Topic = "test-topic"; public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); // 集群通过逗号分割 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.138:9092"); // 设置key和value的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer十. Kafka集群搭建kafkaProducer = new KafkaProducer<>(properties); try { for (int i = 0; i < 100; i++) { String msg = "Hello, " + new Random().nextInt(100); // 构建消息 ProducerRecord record = new ProducerRecord<>(Topic, msg); // 发送 kafkaProducer.send(record); System.out.println("消息发送:" + msg); Thread.sleep(500); } } finally { // 执行完之后关闭 kafkaProducer.close(); } } }
这里使用一个zookeeper+三个kafka组成kafka集群,如果zookeeper也需要集群的话可以从前面文章的zookeeper集群搭建获取shell脚本。
这里需要注意下面几点:
搭建脚本附上:
install_kafka() { echo "安装kafka集群" wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz tar -zxvf kafka_2.12-2.8.1.tgz mv kafka_2.12-2.8.1 kafka-1 cp -r kafka-1 kafka-2 cp -r kafka-1 kafka-3 mkdir kafka-{1,2,3}/data basePath=$(pwd) for (( i = 1; i <= 3; i++ )); do #statements sed -i "/^broker.id/cbroker.id=$i" kafka-$i/config/server.properties # #listeners=PLAINTEXT://:9092 sed -i "/^#listeners/clisteners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties # #advertised.listeners=PLAINTEXT://your.host.name:9092 sed -i "/^#advertised.listeners/cadvertised.listeners=PLAINTEXT://192.168.0.117:909$i" kafka-$i/config/server.properties # log.dirs=/tmp/kafka-logs sed -i "/^log.dirs/clog.dirs=$basePath/kafka-$i/data" kafka-$i/config/server.properties # zookeeper.connect=localhost:2181 sed -i '/^zookeeper.connect=/czookeeper.connect=192.168.0.117:2181' kafka-$i/config/server.properties done } start() { echo "启动kafka集群" for (( i = 1; i <= 3; i++ )); do ./kafka-$i/bin/kafka-server-start.sh -daemon ./kafka-$i/config/server.properties done } stop() { echo "停止kafka集群" for (( i = 1; i <= 3; i++ )); do ./kafka-$i/bin/kafka-server-stop.sh done } echo "Kafka集群脚本" case in install) install_kafka ;; start) start ;; stop) stop ;; esac
上面演示的是伪集群搭建,学习Kafka足够了,在实际使用的推荐先搭建zookeeper集群在搭建kafka集群。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)