java版的producer开发流程
1、构造 Properties 对象2、构造 Kafka Producer 对象3、 构造 ProducerRecord 对象4、发送消息
4.1、异步发送4.2、同步发送4.3、异常信息
5、关闭producer
Java 版本 producer 工作流程如图:
Java 版本 producer 工作流程
import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class ProducerTest { public static void main(String[] args) throws Exception { // 1、构造 Properties 对象 Properties props = new Properties(); props.put("bootstrap.servers", " localhost:9092");// 必填 props.put("key.serializer", "org.apache.kafka.common.serialization. StringSerializer");// 必填 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 必填 props.put("acks", "-1"); props.put("retries", 3); props.put("batch.size", 323840); props.put("linger.ms", 10); props.put("buffer.memory", 33554432); props.put("max.block.ms", 3000); // 2、构造 Kafka Producer 对象 Producer1、构造 Properties 对象producer = new KafkaProducer<>(props); //3、 构造 ProducerRecord 对象 ProducerRecord producerRecord = new ProducerRecord<>("my-topic ", "我是第一条数据"); //4、 发送消息 producer.send(producerRecord); // 5、关闭连接 producer.close(); } }
bootstrap.servers:必填的属性,该参数指定了一组host:port 对,用于创建向 Kafka broker 服务器的连接,比如:kl:9092,k2:9092,k3:9092。上面的代码清单中指定了localhost:9092, producer 使用时需要替换成实际的broker 列表。
key.serializer:key的序列化类,也可以在第二步,构造KafkaProducer对象时候,通过构造方法传入
value.serializer:value的序列化类,也可以在第二步,构造KafkaProducer对象时候,通过构造方法传入
其他参数,详细参看《kafka–producer参数详解》
参数的名字,可以对照org.apache.kafka.clients.producer.ProducerConfig.class里的静态属性
2、构造 Kafka Producer 对象 3、 构造 ProducerRecord 对象 4、发送消息 4.1、异步发送实际上所有的写入 *** 作默认都是异步的。 Java 版本 producer 的send 方法会返回一个 Java Future 对象供用户稍后获取发送结果,
这就是所谓的回调机制。 send 方法提供了回调类参数来实现异步发送以及对发送结果的响应,具体代码如下:
producer.send(producerRecord, new Callback() { @Override public void onCompletion(Recordmetadata metadata, Exception exception) { if (exception == null) { // 消息发送成功 } else { // 执行错误处理逻辑 } } });4.2、同步发送
同步发送和异步发送其实就是通过 Java Future 来区分的,调用Future .get()无限等待结果返回,
即实现同步发送的效果,具体代码如下:
producer.send(producerRecord).get();
使用 Futur.get 会一直等待下去直至 Kafka broker 发送结果返回给 producer 程序。
当结果从 broker 处返回时 get 方法要么返回发送结果,要么抛出异常交由 producer 自行处理。
如果没有错误, get 将返回对应的 Recordmetadata 实例(包含了己发送消息的所有元数据信息),
包括topic 、分区以及该消息在对应分区的位移信息。
4.3、异常信息不管是同步发送还是异步发送,发送都有可能失败,导致返回异常错误。
当前 Kafka 的错误类型包含了两类:可重试异常和不可重试异常。
1、常见的可重试异常如下:
LeaderNotAvailableException :分区的 leader 副本不可用,这通常出现在 leader 换届选举期间,重试之后可以自行恢复。
NotControllerException: controller不可用, 这通常表明 controller在经历新一轮的选举,这也是可以通过重试机制自行恢复的。
NetworkException :网络瞬时故障导致的异常,可重试.。
对于这种可重试的异常,如果在 producer 程序中配置了重试次数,
那么只要在规定的重试次数内自行恢复了,便不会出现在 onCompletion的exception中。
不过若超过了重试次数仍没有成功,则仍然会被封装进 exception 中。此时就需要 producer 程序自行处理这种异常。
所有可重试异常都继承自 org.apache.kafka .cornmon errors RetriableException抽象类;
2、不可重试异常:
RecordTooLargeException :发送的消息尺寸过大,超过了规定的大小上限;
SerializationException:序列化失败异常;
KafkaException :其他类型的异常;
所有这些不可重试异常一旦被捕获都会被封装进 Future 的计算结果井返回给 producer 程序。
代码解决方式:
// 4、 发送消息 producer.send(producerRecord, (metadata, exception) -> { if (exception == null) { // 正常的业务 } else { if (exception instanceof RetriableException) { // 处理可重试瞬时异常 } else { // 处理不可重试异常 } } });5、关闭producer
producer 程序结束时一定要关闭 producer !
毕竟 producer 程序运行时占用了系统资源,因此必须要显式地调用KafkaProducer.close 方法关闭 producer 。
如果是调用无参数 close 方法, producer 会先处理完后再关闭,即所谓的“优雅”关闭退出 ;
同时, KafkaProducer 还提供了 个带超时参数的 close 方法 close(timeout);
如果调用此方法, producer 会等待 timeout 时间来完成所有处理中的请求,然后强行退出。
这就是说,若 timeout 超时,则 producer 会强制结束,并立即丢弃所有未发送以及未应答的发送请求。
因此在实际场景中一定要谨慎使用带超时的 close 方法。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)