kafka是一个高吞吐量的分布式订阅消息系统,可以处理消费者在网站中的所有动作流数据,像hadoop一样的日志数据和离线分析系统,但是又要求实时处理的限制,kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
kafka的特性-
通过O(1)的磁盘数据提供消息的持久化,这种结构对于即使数据量为TB级的消息存储也能够保持长时间的稳定性能。
-
高吞吐量:即使是非常普通的硬件Kafka也能支持每秒数百万计的消息。
-
支持通过Kafka服务器和消费机集群来区分消息。
-
这次Hadoop并行数据加载。
-
发布和订阅记录流,类似于消息队列或企业消息传递系统
-
以容错的持久方式存储记录流
-
记录发生时处理流
- 构建可在系统或应用程序之间可靠获取的实时流数据管道
- 构建转换响应数据的实时流应用程序
例如:
-
日志收集:可以用kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer(消费者)
-
消息系统:解耦生产者和消费者、缓存消息等
-
用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动消息可以被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库。
-
运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种 *** 作的集中反馈,比如报警和报告
-
流式处理:比如spark streaming和storm。
kafka的核心1、Kafka作为一个集群运行在一个或者多个可跨多个数据中心的服务器上
2、Kafka集群以称为** topics主题**的类别存储记录流
3、每条记录都包含一个键,一个值和一个时间戳
-
Producer API(生产者API)允许应用程序发布记录流至一个或多个kafka的topics(主题)。
-
Consumer API(消费者API)允许应用程序订阅一个或多个topics(主题),并处理所产生的对他们记录的数据流。
-
Streams API(流API)允许应用程序充当流处理器,从一个或多个topics(主题)消耗的输入流,并产生一个输出流至一个或多个输出的topics(主题),有效的变换所述的输入流,以输出流。
-
Connector API(连接器API)允许构建和运行kafka topics(主题)连接到现有的应用程序或数据系统中重用生产者或消费者。例如,关系型数据库的连接器可能捕获对表的每个更改。
Kafka的消费模式在Kafka中,客户端和服务器之间的通信是通过简单,高性能,语言无关的TCP协议完成的,Kafka提供Java客户端,但是客户端提供多语言版本。
kafka的消费模式主要有两种,一种是一对一的消费,也就是点对点的通信,即一个发送一个接收。第二种为一对多消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。
-
一对一消费:消息生产者发布消息到队列中,通知消费者从队列中拉取消息进行消费。`消息被消费之后则删除`,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。
-
一对多消费:这种模式称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此Topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,`消费者消费数据之后,数据不会被清除`,Kafka会默认保存一段时间,然后再删除。
Kafka像其他MQ一样,也有自己的基础架构,主要存在生产者Producer、Kafka集群的Broker、消费者Consumer,注册消息Zookeeper等。
zookeeper(Linux)部署与应用
-
Producer:消息生产者,向kafka发送消息的角色。
-
Consumer:消息消费者,即从Kafka中拉取消息消费的客户端。
-
Consumer Group:消费者组,消费者组则是一组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息,消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费。
-
Broker:代理,经纪人,一台kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic。
-
Topic:主题,可以理解成一个队列,生产者和消费者都是面向一个Topic。
-
Partition:分区,为了实现拓展性,一个非常大的Topic可以分布到多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区内部有序,但不能保证全局有序)
-
Replica:副本Replication,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,Kafka可以正常工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follwer。
-
Leader:每个分区多个副本的主角色,生产者发送数据对象,以及消费者消费数据的对象都是Leader。
-
Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follwer会成为新的Leader。
消息队列的特性上述一个Topic会产生多个分区Partition,分区中区分为Leader和Follwer,消息一般发送到Lerder,Follwer通过数据的同步与Leader保持同步,消费的话也是再Leader中发生消费,如果多个消费者,则分别消费Leader和各个Follwer中的消息,当Leader发生故障的时候,某个Follwer会成为主节点,此时会对齐消息的偏移量。
耦合的状态表示当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,也不会影响到当前的功能。
传统调用方式:A -调用-> B
中间件:A-发送->kafka->订阅->B
异步处理,代替了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下的让其他业务处理接口从消息队列中拉取消费处理即可。
流量削峰,高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力。
kafka的部署应用(单机环境安装,基于Linux)kafka需要依赖java环境运行,kafka的安装包可以在这里下载:
kafka和zookeeper的安装包(Liunx)
版本是:kafka_2.13-3.0.0
将包下载到相关的目录,这里新建了一个kafka的文件夹,然后解压到指定目录中;
cd /kafka tar -zxvf kafka_2.13-3.0.0.tgz` 重命名为:`mv kafka_2.13-3.0.0 kafka_2.13
配置日志:进入到kafka/kafka_2.13中,创建日志目录logs;
cd /kafka/kafka_2.13 mkdir logs
修改kafka配置文件;
进入到目录 /kafka/kafka_2.13/config 修改server.properties文件 编辑相应的参数vim server.properties broker.id=0 port=9092 host.name=127.0.0.1 # 服务器ip地址,修改为自己的服务器ip log.dirs=/kafka/kafka_2.13/logs # 日志存放路径,上面创建的目录 zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181 然后保存退出 kafka需要基于Zookeeper服务使用,因此需要安装zookeeper环境先`;(详见zookeeper部署与应用) 注释去掉,`listeners=PLAINTEXT://:9092 注释去掉,把`advertised.listeners`值改为`PLAINTEXT://host_ip:9092(改成服务器ip)
启动kafka
Kafka支持内置的Zookeeper和引用外部的Zookeeper,这里使用远程服务器的Zookeeper。
先启动zookeeper
进入bin目录,启动:
./kafka-server-start.sh /config/server.properties &应用整合及使用(基于SpringBoot、SpringCloud、Eruka) Springboot中使用kafka pom文件依赖
配置yml信息org.springframework.kafka spring-kafka
spring: # kafka配置信息 kafka: bootstrap-servers: xxx.xxx.xxx.xx:9092 # 指定kafka服务地址 集群用逗号隔开 producer: retries: 1 # 发生错误后,消息重发次数。 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 buffer-memory: 33554432 # 设置生产者内存缓冲区的大小。 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 # =0:生产者在成功写入消息之前不会等待任何来自服务器的响应。 # =1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 # =all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 acks: 1 consumer: # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1s # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 在侦听器容器中运行的线程数。 concurrency: 5 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false封装生产者_KafkaProducer
@Component @Slf4j public class KafkaProducer { @Autowired private KafkaTemplate封装消费者_KafkaConsumerkafkaTemplate; public static final String TOPIC_TEST1 = "topic_test1"; public static final String TOPIC_TEST2 = "topic_test2"; public static final String TOPIC_GROUP1 = "topic_group1"; public static final String TOPIC_GROUP2 = "topic_group2"; public void send(Object obj) { String obj2String = JSONObject.toJSonString(obj); log.info("准备发送消息为:{}",obj2String); // 发送消息 ListenableFuture > future = kafkaTemplate.send(TOPIC_TEST1,obj); // 监听消息加入队列结果返回 future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable throwable) { log.info(TOPIC_TEST1 + " - 生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult stringObjectSendResult) { // 发送成功处理 log.info(TOPIC_TEST1 + " - 生产者 发送消息成功:" + stringObjectSendResult.toString()); } }); } }
@Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = KafkaProducer.TOPIC_TEST1,groupId = KafkaProducer.TOPIC_GROUP1) public void topic_test1(ConsumerRecord,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("topic_test1 消费了:Topic:" + topic + ",Message" + msg); ack.acknowledge(); } } @KafkaListener(topics = KafkaProducer.TOPIC_TEST2,groupId = KafkaProducer.TOPIC_GROUP2) public void topic_test2(ConsumerRecord,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("topic_test2 消费了:Topic:" + topic + ",Message" + msg); ack.acknowledge(); } } }测试使用_KafkaController
@RestController @Slf4j @RequestMapping("kafka") @Api(value = "测试kafka接口",tags = "测试kafka接口实现") public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @GetMapping("send") @Transactional(rollbackFor = Exception.class) public void send() { kafkaProducer.send("这是 kafka 的测试 topic 数据"); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)