import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class SimpleKafkaProducer { private static KafkaProducer2、生产者到Broker发送流程producer; private final static String TOPIC = "adienTest2"; public SimpleKafkaProducer(){ Properties props = new Properties(); //服务器IP props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); //序列化器,序列化成字节数组byte[] props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //设置分区类,根据key进行数据分区 producer = new KafkaProducer (props); } public void produce(){ for (int i = 30;i<40;i++){ String key = String.valueOf(i); String data = "hello kafka message:"+key; producer.send(new ProducerRecord (TOPIC,key,data)); System.out.println(data); } producer.close(); } public static void main(String[] args) { new SimpleKafkaProducer().produce(); } }
①kafkaProcuder发送的消息先进入客户端本地内存缓冲区(默认是16kb)
②之后把很多消息收集到Bath里
③最后一次性发送到Broker上
//设置发送消息本地缓冲大小,消息会优先发送到本地缓冲区 props.put("buffer.memory", 33554432); //设置批量发送消息的大小,如果一个batch满了,即达到16k就会发送出去 props.put("batch.size", 16384); //设置消息延迟发送时间, props.put("linger.ms", 1);3、同步发送&异步发送
- 消息发送主要涉及两个线程:Main用户主线程,Sender线程
- Main线程:发送消息到消息内存缓冲区后立即返回
- sender线程:从消息内存缓冲区拉取数据到broker
- 原理:生产者发送消息后没有收到ack,生产者会阻塞3s,之后重试发送3次
- 返回:发送消息后返回的是一个Future对象,调用get进行阻塞
(1)编写脚本 – 同步发送
package net.testclass.testclasskafka; import com.sun.org.apache.xpath.internal.functions.FuncTrue; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import org.junit.jupiter.api.Test; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class MyProducer { private static final String TOPIC_NAME = "first"; // 1、封装配置属性 public static Properties getProperties(){ Properties props = new Properties(); props.put("bootstrap.servers","192.168.6.102:9092"); props.put("acks","all"); props.put("retries",0); props.put("linger.ms","1"); props.put("batch.size",16384); props.put("buffer.memory",3554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return props; } // 2、生产者同步发送 @Test public void testSend(){ Properties properties = getProperties(); // 传递参数 Producerproducer = new KafkaProducer (properties); // 发送消息 for (int i=0;i<3 ;i++){ Future future = producer.send(new ProducerRecord<>(TOPIC_NAME,"key","value"+i)); try{ //使用get方法进行阻塞 Recordmetadata recordmetadata = future.get(); //打印发送内容:topi-partition@offset System.out.println("发送装:"+recordmetadata.toString()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 关闭生产者 producer.close(); } }
(2)运行结果
- 原理:生产者发送完消息后就可以执行之后的任务,broker在收到消息后异步调用生产者提供的callback方法
- 返回:配置回调函数,在Prducer收到ack时被调用
(1)编写脚本 – 异步发送
- 回调函数的两个参数:Recordmetadata和Exception,如果Exception是null,则消息发送成功,否则发送失败
package net.testclass.testclasskafka; import com.sun.org.apache.xpath.internal.functions.FuncTrue; import org.apache.kafka.clients.producer.*; import org.junit.jupiter.api.Test; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class MyProducer { private static final String TOPIC_NAME = "first"; // 1、封装配置属性 public static Properties getProperties(){ Properties props = new Properties(); props.put("bootstrap.servers","192.168.6.102:9092"); props.put("acks","all"); props.put("retries",0); props.put("linger.ms","1"); props.put("batch.size",16384); props.put("buffer.memory",3554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return props; } // 2、生产者同步发送 @Test public void testSend(){ Properties properties = getProperties(); // 传递参数 Producerproducer = new KafkaProducer (properties); // 发送消息 for (int i=0;i<3 ;i++){ producer.send(new ProducerRecord<>(TOPIC_NAME, "key", "value" + i), new Callback() { @Override public void onCompletion(Recordmetadata recordmetadata, Exception exception) { if(exception == null) { System.out.println("发送状态:"+recordmetadata.toString()); } else{ exception.printStackTrace(); } } }); } // 关闭生产者 producer.close(); } }
(2)运行结果
4、发送消息到Broker的分区配置(1)分区配置
-
如果指定Partition ID,则Record被发送至指定Partition
-
如果未指定Partition但指定了Key,则Record按照hash(key)发送至对应key
-
如果未指定PartitionID,也没指定Key,Record会按照轮询模式发送到每个Partition
-
如果同时指定了Partition和key,Record只会发送到指定的Partition,key不起作用
(2)PeoducerRecord概述
- 发送给Kafka Broker的key/value值对,封装基础数据信息
- 数据格式:topic+Partition+Key+Value
(3)Key概述
- 如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡分布到各个partition上
- 如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到topic的那个partition上
- 拥有相同key的消息会被写到同一个partition,实现顺序消息
(4)代码实战 - 指定分区发送
- 查看源码
- 编写代码
package net.testclass.testclasskafka; import com.sun.org.apache.xpath.internal.functions.FuncTrue; import org.apache.kafka.clients.producer.*; import org.junit.jupiter.api.Test; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class MyProducer { private static final String TOPIC_NAME = "sptest-topic"; // 1、封装配置属性 public static Properties getProperties(){ Properties props = new Properties(); props.put("bootstrap.servers","192.168.6.102:9092"); props.put("acks","all"); props.put("retries",0); props.put("linger.ms","1"); props.put("batch.size",16384); props.put("buffer.memory",3554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return props; } // 2、生产者同步发送 @Test public void testSend(){ Properties properties = getProperties(); // 传递参数 Producerproducer = new KafkaProducer (properties); // 发送消息 for (int i=0;i<3 ;i++){ // 将消息发送到指定分区4 producer.send(new ProducerRecord<>(TOPIC_NAME, 4,"key", "value" + i), new Callback() { @Override public void onCompletion(Recordmetadata recordmetadata, Exception exception) { if(exception == null) { System.out.println("发送状态:"+recordmetadata.toString()); } else{ exception.printStackTrace(); } } }); } // 关闭生产者 producer.close(); } }
(5)运行结果
- 输出数据:topic-分区编号@offset
- ACK机制:消息持久化机制
put(Produceconfig.ACKS_CONFIG,"1")
(1)ACK=0
- 含义:producer不需要等待任何Broker确认收到消息,就可以继续发送下一条消息
- 优缺点:性能最高,但容易丢消息
(2)ACKS=1
-
含义:最少等待leader成功将数据写入到本地log,但不需要等待所有的follower是否成功写入,就可以继续发送下一条消息
-
优缺点:如果folloer没有成功备份数据,同时leader挂掉,则消息会丢失
(3)ACKS=all/-1
-
含义:将生产者发送的消息写入leader和所有的folloer,才可以发送下一条消息,这种策略会保证只要有一个备份存活就不会丢数据
-
优缺点:最强的数据保证,性能比较差。
-
注意事项:此类型情况常常与min.insync.replicas=n一起配置,当n为1时与ack=1相当。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)