Producer API:
4.1.1 消息发送流程:
kafka的producer发送消息采用的是异步放松的方式。在消息发送过程中,涉及到两个线程——main线程和sender线程,以及一个线程共享变量RecordAccumulator。main线程把消息发送给RecordAccumulator,sender线程再不断地从RecordAccumulator里面拉取消息发送到kafka broker。
main线程里面经过send(ProducerRecord)方法——拦截器——序列化器——分区器,将消息放到RecordAccumulator里面,sender线程再将消息不断地发送到kafka broker里面。
简单生产者
package com.jin.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class MyProducer { public static void main(String[] args) { //1、创建kafka生产者的配置信息 Properties properties = new Properties(); //2、kafka 集群, broker-list properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); //3、acks properties.put(ProducerConfig.ACKS_CONFIG, "all"); //4、重试次数 properties.put(ProducerConfig.RETRIES_CONFIG, 1); //5、批次大小 properties.put("batch.size", 16384); //6、等待时间 properties.put("linger.ms", 1); //7、RecordAccumulator 缓冲区大小 properties.put("buffer.memory", 33554432); //8、指定序列化 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //9、创建生产者对象 KafkaProducerproducer = new KafkaProducer (properties); //10、发送数据 for(int i = 0; i < 10; i++){ producer.send(new ProducerRecord ("first","jin--" + i)); } //11、关闭资源 producer.close(); } }
普通消费者
package com.jin.producer.Consumer; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import java.util.Arrays; import java.util.Collections; import java.util.Properties; public class MyConsumer { public static void main(String[] args) { Properties properties = new Properties(); //连接的集群 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //开启自动提交offset,即消费到什么地方 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); //自动提交的延时,默认一秒提交一次offset properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata"); //创建消费者 KafkaConsumerconsumer = new KafkaConsumer (properties); //订阅主题 consumer.subscribe(Collections.singletonList("first")); //获取数据 poll(time):如果某次拉取数据时为空就会延时time秒再去拉取 ConsumerRecords consumerRecords = consumer.poll(100); for(ConsumerRecord consumerRecord : consumerRecords){ System.out.println(consumerRecord.key()+"---"+consumerRecord.value()); } consumer.close(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)