创建类实现ProducerInterceptor接口
1.时间拦截器
package com.zch.kafka.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import java.util.Map; public class TimeInterceptor implements ProducerInterceptor{ public void configure(Map configs) { } public ProducerRecord onSend(ProducerRecord record) { // 1、取出数据 String value = record.value(); // 2、创建一个新的record对象 ProducerRecord producerRecord = new ProducerRecord (record.topic(), record.partition() , record.key(), System.currentTimeMillis() + "," + record.value()); return producerRecord; } public void onAcknowledgement(Recordmetadata metadata, Exception exception) { } public void close() { } }
2.次数拦截器
package com.zch.kafka.interceptor; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import java.util.Map; public class CountInterceptor implements ProducerInterceptor{ int success = 0; int error = 0; public ProducerRecord onSend(ProducerRecord record) { return record; } public void onAcknowledgement(Recordmetadata metadata, Exception exception) { if (metadata != null){ success ++; }else { error ++; } } public void close() { System.out.println("success"+success); System.out.println("error"+error); } public void configure(Map configs) { } }
3.使用自定义拦截器
一个生产者可以使用多个拦截器,可将拦截器放在一个list数组中,将数组放置到配置文件中
package com.zch.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.ArrayList; import java.util.Properties; public class InterceptorProducer { public static void main(String[] args) { // 创建Kafka生产者的配置信息 Properties properties = new Properties(); properties.put("bootstrap.servers","zhaohui01:9092"); // 序列化 properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 添加拦截器 ArrayListstrings = new ArrayList (); strings.add("com.zch.kafka.interceptor.TimeInterceptor"); strings.add("com.zch.kafka.interceptor.CountInterceptor"); properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,strings); // 创建生产者对象 KafkaProducer producer = new KafkaProducer (properties); // 发送数据 for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord ("first02","bigdata","zhaochaohui--"+i)); } // 关闭资源 producer.close(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)