在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
准备3台Debian服务器,并配置好静态IP、主机名
软件版本说明
ZooKeeper节点信息如下,相关部署见 这篇文章
注 :kafka_2.12-2.3.0.tgz 其中2.12是Scala编译器的版本,2.3.0才是Kafka的版本
配置日志目录、指定ZooKeeper服务器
分节点配置
防火墙设置
分别启动Kafka
在kafka01(Broker)上创建测试Tpoic: test-kafka ,这里我们指定了3个副本、1个分区
Topic在kafka01上创建后也会同步到集群中另外两个Broker:kafka02、kafka03
这里我们向Broker(id=0)的Topic=test-kafka发送消息
在Kafka02上消费Broker03的消息
在Kafka03上消费Broker02的消息
然后均能收到消息
这是因为这两个消费消息的命令是建立了两个不同的Consumer。如果我们启动Consumer指定Consumer Group Id就可以作为一个消费组协同工,同一分区同一topic的消息同时只会被消费者组中的一个Consumer消费到
源端集群kafka01:10.1.1.10
kafka02:10.1.1.11
kafka03:10.1.1.12
目标集群
kafka11:10.1.1.13
kafka12:10.1.1.14
kafka13:10.1.1.15
公司新业务需要从其他部门取到Kafka中的数据到我们的Kafka集群,这里使用Kafka自带的kafka-mirror-maker工具进行数据的同步,数据流向为源端数据到目标集群,具体配置看下面配置,这里只提供基础的配置,生产中使用请去官网根据文档配置自己需要的个性化配置。
在目标端集群配置$KAFKA_HOME/config/consumer.properties
group.id可以自己定义
在目标端集群配置$KAFKA_HOME/config/producer.properties
在目标端及源端各自新建test02的topic
在目标端启动同步进程(如要后台启动请加nohup)
===测试===
在源端启动生产者进程
在目标端启动消费者进程,在消费端能看到生产者发送的信息即可
文章主要介绍以docker容器的方式部署kafka集群。
上述配置文件中的server.x,数字x对应到data/myid文件中的值。三台机器x的值分别就是1,2,3。参数详细说明请参考 官网文档 。
1.--net=host: 容器与主机共享同一Network Namespace,即容器与网络看到的是相同的网络视图(host模式存在一定的风险,对安全要求很高的生产环境最好不要用host模式,应考虑除此之外的其他几种模式)
2.-v: 指定主机到容器的目录映射关系
这样就以容器的方式启动了zookeeper的服务,可以通过 "docker exec -it zookeeper bash" 命令进入容器中进行一些 *** 作,例如查看服务启动是否正常。也可以通过查看2181端口是否被监听判断zookeeper的服务是否运行
详细的参数配置说明请参考 官方文档 ,参数不仅可以通过上述文件的方式来配置,也可以通过容器环境变量的方式来配置,这里结合两种方式使用。
1.KAFKA_ADVERTISED_HOST_NAME、KAFKA_BROKER_ID的值要结合每台机器自身设置
2./etc/hosts文件中最好配置ip与hostname的映射关系,否则会报出如下错误" Error: Exception thrown by the agent : java.net.MalformedURLException: Local host name unknown: java.net.UnknownHostException: node0: node0: System error "
3.通过-e 指定的环境变量与在server.properties中配置的选项其效果是一样的
4.配置文件中的选项若要通过环境变量来指定,方式为:如broker.id对应KAFKA_BROKER_ID,类似的log.dirs对应KAFKA_LOG_DIRS
5.KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"指java堆内存大小的设置,6G大小是kafka官网给出的数值,此数值要结合机器的内存大小给出。超过6G的内存,可以设置为6G;若机器的内存低于6G而设置6G,则会报错。
5.启动成功后,可以通过"docker logs kafka"命令查看日志
1.ZK_HOSTS:ZooKeeper访问地址(需指定机器的ip,localhost:2181或127.0.0.1:2181均会报 "java.net.ConnectException: Connection refused" 异常)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)