简介:本篇博客是对kafka produce 生产者分区器的API(Java)
包含以下内容:分区使用原则,分区器使用原则,分区器相关代码编写及pom.xml配置文件编写,到最后的运行结果。
使用kafka producer分区器的好处:分区原则分区器API
相关配置代码编写
消费者代码编写自定义partition代码创建CustomProducerCallBackDemo类使用自定义分区 运行测试结果
使用kafka producer分区器的好处:1、方便在集群中扩展
2、可以提高并发性
分区器API 相关配置1、 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
2、没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
3、 既没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.(以前是一条条的轮询,现在是一批次的轮询)
1、创建kafka api的maven项目
2、给pom.xml配置成如下样子:
4.0.0 org.example kafkaDemmo1.0-SNAPSHOT 8 8 org.apache.kafka kafka-clients2.4.1 org.slf4j slf4j-nop1.7.2
3、在main下的resources中添加log4j.properties文件,内容如下:
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n代码编写 消费者代码编写
代码内容如下:
package com.lqs.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumerDemo { public static void main(String[] args) { Properties properties = new Properties(); //给消费者配置对象添加参数 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bdc112:9092"); //配置序列化,必须要配置 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //配置消费者组对象,这也是必须要配置的 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo"); //创建消费者对象 KafkaConsumer自定义partition代码stringStringKafkaConsumer = new KafkaConsumer<>(properties); //注册主题 ArrayList strings = new ArrayList<>(); strings.add("first01"); stringStringKafkaConsumer.subscribe(strings); //拉取数据打印 while (true) { ConsumerRecords consumerRecords = stringStringKafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
package com.lqs.kafka.partitioner; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class PartitionerDemo implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取消息 String s1 = value.toString(); //创建partition int partition; //判断消息是否包含lqs if (s1.contains("lqs")) { partition = 0; //判断消息是否包含test } else if (s1.contains("test")) { partition = 1; } else { partition = 2; } return partition; } @Override public void close() { } @Override public void configure(Map创建CustomProducerCallBackDemo类使用自定义分区configs) { } }
package com.lqs.kafka.partitioner; import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CustomProducerCallBackDemo { public static void main(String[] args) { //创建配置对象 Properties properties = new Properties(); //给配置对象添加链接 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bdc112:9092"); //设置批次大小,默认为16k properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //设置等待时间为1毫秒 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); //设置RecordAccumulator(记录累加器)缓冲区大小为默认值32m properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //设置key和value的序列化,注意,这个是必须要设置的 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //自定义分区 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.lqs.kafka.partitioner.PartitionerDemo"); //创建kafka生产者对象 KafkaProducer运行测试结果stringStringKafkaProducer = new KafkaProducer<>(properties); for (int i = 0; i < 12; i++) { if (i % 2 == 0) { //调用send方法发送消息 stringStringKafkaProducer.send(new ProducerRecord<>("first01", "lqs" + i), new Callback() { @Override public void onCompletion(Recordmetadata metadata, Exception exception) { //判断是否发送成功 if (exception != null) { exception.printStackTrace(); } else { System.out.println(metadata.toString()); } } }); } else { stringStringKafkaProducer.send(new ProducerRecord<>("first01", "sfa" + i), new Callback() { @Override public void onCompletion(Recordmetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println(metadata.toString()); } } }); } } //关闭资源链接 stringStringKafkaProducer.close(); } }
注意:在运行前记得先启动Zookeeper和kafka!!!
1、先运行消费者代码
2、运行生产者代码
3、查看刚运行的消费者代码结果里面出现了新的内容:
出现一下内容,说明手写成功。
F:installSoftwareIDEJavajdk1.8.0_202binjava.exe "-javaagent:F:installSoftwareIDEIntelliJ IDEA 2021.2libidea_rt.jar=4587:F:installSoftwareIDEIntelliJ IDEA 2021.2bin" -Dfile.encoding=UTF-8 -classpath F:installSoftwareIDEJavajdk1.8.0_202jrelibcharsets.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibdeploy.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextaccess-bridge-64.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextcldrdata.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextdnsns.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextjaccess.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextjfxrt.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextlocaledata.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextnashorn.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextsunec.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextsunjce_provider.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextsunmscapi.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextsunpkcs11.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibextzipfs.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibjavaws.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibjce.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibjfr.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibjfxswt.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibjsse.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibmanagement-agent.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibplugin.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibresources.jar;F:installSoftwareIDEJavajdk1.8.0_202jrelibrt.jar;F:DatacodeWorkSpacemyJDBCCodejdbclibdruid-1.1.10.jar;F:DatacodeWorkSpaceKafkaCodekafkaDemmotargetclasses;F:installSoftwareBigDatasapache-maven-3.5.4data-repositoryorgapachekafkakafka-clients2.4.1kafka-clients-2.4.1.jar;F:installSoftwareBigDatasapache-maven-3.5.4data-repositorycomgithublubenzstd-jni1.4.3-1zstd-jni-1.4.3-1.jar;F:installSoftwareBigDatasapache-maven-3.5.4data-repositoryorglz4lz4-java1.6.0lz4-java-1.6.0.jar;F:installSoftwareBigDatasapache-maven-3.5.4data-repositoryorgxerialsnappysnappy-java1.1.7.3snappy-java-1.1.7.3.jar;F:installSoftwareBigDatasapache-maven-3.5.4data-repositoryorgslf4jslf4j-api1.7.28slf4j-api-1.7.28.jar;F:installSoftwareBigDatasapache-maven-3.5.4data-repositoryorgslf4jslf4j-nop1.7.2slf4j-nop-1.7.2.jar com.lqs.kafka.consumer.CustomConsumerDemo ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 24, CreateTime = 1640754679164, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = sfa1) ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 25, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = sfa3) ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 26, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = sfa5) ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 27, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = sfa7) ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 28, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = sfa9) ConsumerRecord(topic = first01, partition = 2, leaderEpoch = 2, offset = 29, CreateTime = 1640754679166, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = sfa11) ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 213, CreateTime = 1640754679156, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = lqs0) ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 214, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = lqs2) ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 215, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = lqs4) ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 216, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = lqs6) ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 217, CreateTime = 1640754679165, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = lqs8) ConsumerRecord(topic = first01, partition = 0, leaderEpoch = 2, offset = 218, CreateTime = 1640754679166, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadonly = false), key = null, value = lqs10)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)