- 一、引入pom.xml依赖
- 二、java实现kafka生产者
- 三、发送消息到指定分区上
二、java实现kafka生产者org.apache.kafka kafka-clients2.7.2
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MyProducer { private final static String TOPIC_NAME = "optics-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " + "required username="debezium" password="NGFlM2I1NTJlNmFk";"); props.put("security.protocol","SASL_PLAINTEXT"); props.put("sasl.mechanism","PLAIN"); //ack模式,all是最慢但最安全的 props.put("acks", "-1"); //失败重试次数 props.put("retries", 0); //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端 props.put("batch.size", 10); //props.put("max.request.size",10); //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 props.put("linger.ms", 10000); //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端 //buffer.memory要大于batch.size,否则会报申请内存不足的错误 props.put("buffer.memory", 10240); //序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducerproducer = new KafkaProducer (props); //key:作用是决定了往哪个分区上发,value:具体要发送的消息内容 for (int i = 0; i < 10; i++) { Recordmetadata metadata = producer.send(new ProducerRecord (TOPIC_NAME, Integer.toString(i), "dd:" + i)).get(); System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset()); } } }
输出如下所示:
同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:6 同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:8 同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:9 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:6 同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:7 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:7 同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:8 同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:9 同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:10三、发送消息到指定分区上
KafkaProducerproducer = new KafkaProducer (props); //key:作用是决定了往哪个分区上发,value:具体要发送的消息内容 for (int i = 0; i < 10; i++) { Recordmetadata metadata = producer.send(new ProducerRecord (TOPIC_NAME, 1,Integer.toString(i), "dd:" + i)).get(); System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset()); }
输出如下所示:
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:9 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:10 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:11 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:12 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:13 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:14 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:15 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:16 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:17
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)