SpringBoot实战(十四)之整合KafKa

SpringBoot实战(十四)之整合KafKa,第1张

概述本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题。 于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章。希望能够给大家帮助,少走一些弯路。

 本人今天上午参考了不少博文,发现不少博文不是特别好,不是因为依赖冲突问题就是因为版本问题。

于是我结合相关的博文和案例,自己改写了下并参考了下,于是就有了这篇文章。希望能够给大家帮助,少走一些弯路。

 

一、KafKa的介绍1.主要功能

根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

  a.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因。

  b.以容错的方式记录消息流,kafka以文件的方式来存储消息流。

  c.可以再消息发布的时候进行处理。

 

2.使用场景

a.在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能。

b.构建实时的流数据处理程序来变换或处理数据流,数据处理功能。

 

3.详细介绍

 Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

消息传输过程:

 

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

 

topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的topic中的消息

 

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

 

二、安装

安装包下载地址:http://kafka.apache.org/downloads

找到0.11.0.1版本,如图:

1.下载

wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz

 

2.解压

tar -xzvf kafka_2.11-0.11.0.1.tgz

配置说明:

    consumer.properites 消费者配置,这个配置文件用于配置开启的消费者,此处我们使用默认的即可。

    producer.propertIEs 生产者配置,这个配置文件用于配置开启的生产者,此处我们使用默认的即可。

  server.propertIEs kafka服务器的配置,此配置文件用来配置kafka服务器,目前仅介绍几个最基础的配置。

       a.broker.ID 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的ID都应是唯一的,我们这里采用默认配置即可。

       b.Listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,

例如:Listeners=PLAINTEXT:// 192.168.126.143:9092。并确保服务器的9092端口能够访问。

  c.zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,

使用默认配置即可,zookeeper.connect=localhost:2181。

 

3.运行

首先运行zookeeper

bin/zookeeper-server-start.sh config/zookeeper.propertIEs

运行成功,显示如图:

 

然后运行kafka

bin/kafka-server-start.sh config/server.propertIEs

 运行成功,显示如图:

 

三、整合KafKa1.新建Maven项目导入Maven依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  modelVersion>4.0.0</groupID>cn.testartifactID>kafka_demoversion>0.0.1-SNAPSHOT>    parent>        >org.springframework.boot>spring-boot-starter-parent>1.5.9.RELEASErelativePath/> <!-- lookup parent from repository -->    propertIEsproject.build.sourceEnCoding>UTF-8project.build.sourceEnCoding.reporting.outputEnCodingproject.reporting.outputEnCodingjava.version>1.8java.version>    dependencIEsdependency>            >spring-boot-starter-web>        >org.projectlombok>lombokoptional>true>spring-boot-starter-testscope>test>org.springframework.kafka>spring-kafka>1.1.1.RELEASE>com.Google.code.gson>gson>2.8.2>         buildpluginsplugin>                >spring-boot-maven-plugin>                指定编译版本 -->            >org.apache.maven.plugins>maven-compiler-pluginconfiguration>                    sourcetarget>                                               finalname>${project.artifactID}>                >   project>

 

2.编写消息实体
package com.springboot.kafka.bean;import java.util.Date;import lombok.Data; @Datapublic class Message {    private Long ID;    //ID    private String msg; //消息    private Date sendTime;  //时间戳}

 有了lombok,每次编写实体不必要使用快捷键生成seter或geter方法了,代码看起来更加简洁了。

 

3.编写消息发送者(可以理解为生产者,最好联系详细介绍中的图)
package com.springboot.kafka.producer;import java.util.Date; java.util.UUID; org.springframework.beans.factory.annotation.autowired; org.springframework.kafka.core.KafkaTemplate; org.springframework.stereotype.Component; com.Google.gson.Gson; com.Google.gson.GsonBuilder; com.springboot.kafka.bean.Message; lombok.extern.slf4j.Slf4j;@Component@Slf4jpublic class KafkaSender {    @autowired    private KafkaTemplate<String,String> kafkaTemplate;    private Gson gson = new GsonBuilder().create();    //发送消息方法    voID send() {        Message message =  Message();        message.setID(System.currentTimeMillis());        message.setMsg(UUID.randomUUID().toString());        message.setSendTime( Date());        log.info("+++++++++++++++++++++  message = {}",gson.toJson(message));        kafkaTemplate.send("zhisheng"

 

5.编写启动类
 com.springboot.kafka;  org.springframework.boot.SpringApplication; org.springframework.boot.autoconfigure.SpringBootApplication; org.springframework.context.ConfigurableApplicationContext; com.springboot.kafka.producer.KafkaSender;@SpringBootApplication KafkaApplication {    static  main(String[] args) {        ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.);        for (int i = 0; i < 3; i++) {            调用消息发送类中的消息发送方法            sender.send();            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printstacktrace();            }        }    }}

 

 

6.编写application.propertIEs配置文件
#============== kafka ===================# \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2Aspring.kafka.bootstrap-servers=192.168.126.143:9092#=============== provIDer  =======================spring.kafka.producer.retrIEs=0# \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CFspring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0Fspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer  =======================# \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group IDspring.kafka.consumer.group-ID=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earlIEstspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0Fspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

7.运行结果

 

示例代码地址:https://github.com/youcong1996/study_simple_demo/tree/kafka_demo

如果按照上述流程没有达到预计的效果可以git clone到本地。

 

总结

以上是内存溢出为你收集整理的SpringBoot实战(十四)之整合KafKa全部内容,希望文章能够帮你解决SpringBoot实战(十四)之整合KafKa所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1217130.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-05
下一篇 2022-06-05

发表评论

登录后才能评论

评论列表(0条)

保存