Kafka 使用
简介-
Kafka 是由 linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统。
-
最大特性:实时处理大量数据
-
应用场景:
- 基于 hadoop 的批处理系统;
- 低延迟的实时系统;
- storm/Spark 流式处理引擎;
- web/nginx 日志;
- 访问日志;
- 消息服务系统
-
Kafka 特性
- 高吞吐量,低延迟
- 可扩展性:kafka 集群支持热扩展
- 持久性、可靠性:消息被持久化到本地硬盘,并支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许 n - 1 个节点失败)
- 高并发:支持数千个客户端同时读写
-
Kafka 使用场景
- 日志收集
- 消息系统
- 用户活动跟踪
- 运营指标
- 流式处理
- 事件源
-
主要设计目标:
- 以时间复杂度为 O(1) 方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间的访问性能;
- 高吞吐率,即使在非常廉价的商用机器上也能做到单机支持每秒 100k 消息的传输;
- 支持 Kafka Server 的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输;
- 同时支持离线数据处理和实时数据处理;
- Scale out 支持在线水平扩展。
-
消息传递可靠性判断
- 第一种,消息发送出去就当作成功;
- 第二种,Master-Slave 模型。只有当 Master 和 Slave 都接收到消息,才算发送成功,这种模型提供了最高的投递可靠性,但是损伤了性能
- 第三种,即只要 Master 确认接收到消息,就算投递成功。(实际应用中常用)
- 分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息;
- 消息传递模式:
- 点对点传递模式
- 发布-订阅模式
- 其中,Kafka是一种高性能跨语言发布-订阅消息队列系统。
- 下载 zookeeper 安装包
# 下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/ wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz # 注意:需要下载文件名中包含 bin 的压缩包,因为不含 bin 的压缩包是源码,无法使用。
- 安装包解压
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
- 创建目录,并移动目录
mkdir -p /usr/local/services/zookeeper mv apache-zookeeper-3.6.3-bin /usr/local/services/zookeeper/
- 复制,并重命名配置文件
cd /usr/local/services/zookeeper/apache-zookeeper-3.6.3-bin/conf cp zoo_sample.cfg zoo.cfg
- 修改配置文件 zoo.cfg
vi zoo.cfg # 数据文件夹 dataDir=/usr/local/services/zookeeper/apache-zookeeper-3.6.3-bin/data # 日志文件夹 dataLogDir=/usr/local/services/zookeeper/apache-zookeeper-3.6.3-bin/logs
- zookeeper 添加环境变量
vi /etc/profile # 在文件末尾添加下面内容: export ZOOKEEPER_HOME=/usr/local/services/zookeeper/apache-zookeeper-3.6.3-bin export PATH=$ZOOKEEPER_HOME/bin:$PATH export PATH # 运行命令,使 profile 文件生效 source /etc/profile
- zookeeper 服务启动,停止,重启,查看状态
# 进入启动脚本目录 cd /usr/local/services/zookeeper/apache-zookeeper-3.6.3-bin/bin # 服务启动 ./zkServer.sh start # 查看 zookeeper 状态 ./zkServer.sh status # 服务关闭 ./zkServer.sh stop # 服务重启 ./zkServer.sh restartkafka 介绍 kafka 三大名词
- 话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。
- 生产者(Producer):是能够发布消息到话题的任何对象。已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。
- 消费者:可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息
- 下载安装包
# 下载目录:https://downloads.apache.org/kafka/ wget https://downloads.apache.org/kafka/2.8.1/kafka_2.13-2.8.1.tgz
- 解压安装包
tar zxf kafka_2.13-2.8.1.tgz
- 移动目录,并设置软连接
mv kafka_2.13-2.8.1 /usr/local/ ln -s /usr/local/kafka_2.13-2.8.1/ /usr/local/kafka
- 设置 Zookeeper 地址
vi /usr/local/kafka/config/server.properties zookeeper.connect=localhost:2181 # 注意:可以使用默认的,也可以自己配置地址
- kafka 启动
# 启动命令脚本在 bin 目录 cd /usr/local/kafka/bin # 服务器的 ./kafka-server-start.sh ../config/server.properties测试 kafka 是否安装成功
- 注意:以下命令是在 bin 目录下运行
- 2181 是 zookeeper 服务端口
- 9092 是 kafka 服务端口
- 创建 topic --> test
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
- 查看已经存在的 topic
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
- 发送消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
- 接收消费消息
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)