Kafka 安装教程 + nodejs 连接

Kafka 安装教程 + nodejs 连接,第1张

1、kafka安装包

http://kafka.apache.org/downloads

2、zookeeper安装包

https://zookeeper.apache.org/releases.html#download

1、先安装运行zookeeper

2、安装运行kafka

修改下图ip

advertised.listeners=PLAINTEXT://71.24.89.191:9092

这里主要是检测对应的端口是否是打开状态

分别是 zookeeper的默认端口 2181 和 kafka的 9092

检测网址

1、 添加

2、查看所有主题

3、查看主题下所有分区

4、动态 修改主题 分区为12

小伙伴们可以看我另一片文章

Nodejs kafka连接

1、内存不足

这里因为我的机器的内存比较小

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000e0000000, 536870912, 0) failederror='Cannot allocate memory' (errno=12)

There is insufficient memory for the Java Runtime Environment to continue.

Native memory allocation (mmap) failed to map 536870912 bytes for committing reserved memory.

我们只需要把kafka启动脚本修改一下就可以了

修改内存为256:

1、安装

brew install kafka

会一起安装zookeeper和Kafka两个插件

2、文件路径

(1)Kafka的bin目录路径

/usr/local/Cellar/kafka/2.5.0/bin,里面包含Kafka的各种 *** 作命令

kafka-server-start,服务启动命令

kafka-server-stop,服务停止命令

kafka-topics,主题 *** 作命令

kafka-console-producer,生产消息命令

kafka-console-consumer,消费消息

(2)Kafka的配置文件路径

/usr/local/etc/kafka

zookeeper.properties,zookeeper的配置文件

 server.properties,Kafka服务的配置文件

3、必须的前提修改Kafka的配置文件,否则后面的启动会失败

vim /usr/local/etc/kafka/server.properties

增加:listeners=PLAINTEXT://localhost:9092

其中有改行,默认被注释掉了,打开修改即可

4、打开新终端窗口1,进入Kafka的bin目录启动zookeeper服务

kafka-server-start /usr/local/etc/kafka/server.properties &

都是info,没有报fatal或者error错误,即为启动成功。

如果不先修改第3步的配置文件,那么会报错如下

ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)

5、打开新终端窗口2,进入Kafka的bin目录启动Kafka资深的服务

kafka-server-start /usr/local/etc/kafka/server.properties &

都是info,没有报fatal或者error错误,即为启动成功。

如果不先修改第3步的配置文件,那么会报错如下

ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)

6、打开新终端窗口3,进入Kafka的bin目录,新建topic主题test

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

出现:Created topic test. 表明建立topic成功

查看所有目前的topic,校验是否新建完毕

kafka-topics --list --zookeeper localhost:2181

会列出所有建立的topic

7、打开新终端窗口4,进入Kafka的bin目录,打开生产者客户端,topic为test的主题

kafka-console-producer --topic test --broker-list localhost:9092

出现>表示启动生产者客户端成功,等待输入生产者消息,输入消息,回车,即表示插入了一条消息到主题test的队列中

8、打开新终端窗口5,进入Kafka的bin目录,打开消费者客户端,topic为test的主题

kafka-console-consumer --bootstrap-server localhost:9092 -topic test   

等待显示生产者输入的消息,并同步显示出来

Kafka到底是个啥?用来干嘛的?

官方定义如下:

翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!

实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。

这些中间件,最大的特点主要有两个:

在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。

但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。

随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。

采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。

消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。

应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!

引入消息中间件之后,整个服务开发会变得更加简单,各负其责。

Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。

LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生

在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。

先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型

如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!

简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:

与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个**分区 Partition **的概念。

这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。

这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!

这是 kafka 与其他的消息系统最大的不同!

和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。

那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:

与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。

这里我们需要重点了解一个名词: 消费组

考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!

但是不同的组,可以消费同一个分区的数据!

你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。

但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。

如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。

因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!

光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。

kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。

zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk

下载zookeeper,并解压文件包

创建数据、日志目录

配置zookeeper

重新配置 dataDir 和 dataLogDir 的存储路径

最后,启动 Zookeeper 服务

到官网 http://kafka.apache.org/downloads.html 下载想要的版本,我这里下载是最新稳定版 2.8.0 。

按需修改配置文件 server.properties (可选)

server.properties 文件内容如下:

其中有四个重要的参数:

可根据自己需求修改对应的配置!

启动 kafka 服务

创建一个名为 testTopic 的主题,它只包含一个分区,只有一个副本:

运行 list topic 命令,可以看到该主题。

输出内容:

Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

输入两条内容并回车:

Kafka 还有一个命令行使用者,它会将消息转储到标准输出。

输出结果如下:

本文主要围绕 kafka 的架构模型和安装环境做了一些初步的介绍,难免会有理解不对的地方,欢迎网友批评、吐槽。

由于篇幅原因,会在下期文章中详细介绍 java 环境下 kafka 应用场景!


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/6078765.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-14
下一篇 2023-03-14

发表评论

登录后才能评论

评论列表(0条)

保存