Kafka API

Kafka API,第1张

Kafka API

Producer API:

 

4.1.1 消息发送流程:

kafka的producer发送消息采用的是异步放松的方式。在消息发送过程中,涉及到两个线程——main线程和sender线程,以及一个线程共享变量RecordAccumulator。main线程把消息发送给RecordAccumulator,sender线程再不断地从RecordAccumulator里面拉取消息发送到kafka broker。

 main线程里面经过send(ProducerRecord)方法——拦截器——序列化器——分区器,将消息放到RecordAccumulator里面,sender线程再将消息不断地发送到kafka broker里面。

简单生产者

package com.jin.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class MyProducer {
    public static void main(String[] args) {
        //1、创建kafka生产者的配置信息
        Properties properties = new Properties();
        //2、kafka 集群, broker-list
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //3、acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //4、重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        //5、批次大小
        properties.put("batch.size", 16384);
        //6、等待时间
        properties.put("linger.ms", 1);
        //7、RecordAccumulator 缓冲区大小
        properties.put("buffer.memory", 33554432);
        //8、指定序列化
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        //9、创建生产者对象
        KafkaProducer producer = new KafkaProducer(properties);

        //10、发送数据
        for(int i = 0; i < 10; i++){
            producer.send(new ProducerRecord("first","jin--" + i));
        }

        //11、关闭资源
        producer.close();




    }
}

普通消费者

package com.jin.producer.Consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;


public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //连接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //开启自动提交offset,即消费到什么地方
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //自动提交的延时,默认一秒提交一次offset
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata");
        //创建消费者
        KafkaConsumer consumer = new KafkaConsumer(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("first"));
        //获取数据 poll(time):如果某次拉取数据时为空就会延时time秒再去拉取
        ConsumerRecords consumerRecords = consumer.poll(100);

        for(ConsumerRecord consumerRecord : consumerRecords){
            System.out.println(consumerRecord.key()+"---"+consumerRecord.value());
        }

        consumer.close();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653684.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存