第六章 kafka专题之SpringBoot整合KAFKA之生产者代码实战案例

第六章 kafka专题之SpringBoot整合KAFKA之生产者代码实战案例,第1张

第六章 kafka专题之SpringBoot整合KAFKA之生产者代码实战案例 1、Java实现KafkaProducer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SimpleKafkaProducer {
    private static KafkaProducer producer;
    private final static String TOPIC = "adienTest2";
    public SimpleKafkaProducer(){
        Properties props = new Properties();
        //服务器IP
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //序列化器,序列化成字节数组byte[]
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //设置分区类,根据key进行数据分区
        producer = new KafkaProducer(props);
    }
    public void produce(){
        for (int i = 30;i<40;i++){
            String key = String.valueOf(i);
            String data = "hello kafka message:"+key;
            producer.send(new ProducerRecord(TOPIC,key,data));
            System.out.println(data);
        }
        producer.close();
    }

    public static void main(String[] args) {
        new SimpleKafkaProducer().produce();
    }
}
2、生产者到Broker发送流程

①kafkaProcuder发送的消息先进入客户端本地内存缓冲区(默认是16kb)

②之后把很多消息收集到Bath里

③最后一次性发送到Broker上

//设置发送消息本地缓冲大小,消息会优先发送到本地缓冲区
props.put("buffer.memory", 33554432);

//设置批量发送消息的大小,如果一个batch满了,即达到16k就会发送出去
props.put("batch.size", 16384);

//设置消息延迟发送时间,

props.put("linger.ms", 1);
3、同步发送&异步发送
  • 消息发送主要涉及两个线程:Main用户主线程,Sender线程
    • Main线程:发送消息到消息内存缓冲区后立即返回
    • sender线程:从消息内存缓冲区拉取数据到broker
3.1、同步发送
  • 原理:生产者发送消息后没有收到ack,生产者会阻塞3s,之后重试发送3次
  • 返回:发送消息后返回的是一个Future对象,调用get进行阻塞

(1)编写脚本 – 同步发送

package net.testclass.testclasskafka;

import com.sun.org.apache.xpath.internal.functions.FuncTrue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer {
    private static final String TOPIC_NAME = "first";
    // 1、封装配置属性
    public static Properties getProperties(){
        Properties props = new Properties();
        props.put("bootstrap.servers","192.168.6.102:9092");

        props.put("acks","all");
        props.put("retries",0);
        props.put("linger.ms","1");
        props.put("batch.size",16384);
        props.put("buffer.memory",3554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
    // 2、生产者同步发送
    @Test
    public void testSend(){
        Properties properties = getProperties();

        // 传递参数
        Producer producer = new KafkaProducer(properties);

        // 发送消息
        for (int i=0;i<3 ;i++){
            Future future =  producer.send(new ProducerRecord<>(TOPIC_NAME,"key","value"+i));

            try{
                //使用get方法进行阻塞
                Recordmetadata recordmetadata = future.get();

                //打印发送内容:topi-partition@offset
                System.out.println("发送装:"+recordmetadata.toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        // 关闭生产者
        producer.close();
    }
}

(2)运行结果

3.2、异步发送
  • 原理:生产者发送完消息后就可以执行之后的任务,broker在收到消息后异步调用生产者提供的callback方法
  • 返回:配置回调函数,在Prducer收到ack时被调用

(1)编写脚本 – 异步发送

  • 回调函数的两个参数:Recordmetadata和Exception,如果Exception是null,则消息发送成功,否则发送失败
package net.testclass.testclasskafka;

import com.sun.org.apache.xpath.internal.functions.FuncTrue;
import org.apache.kafka.clients.producer.*;
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer {
    private static final String TOPIC_NAME = "first";
    // 1、封装配置属性
    public static Properties getProperties(){
        Properties props = new Properties();
        props.put("bootstrap.servers","192.168.6.102:9092");

        props.put("acks","all");
        props.put("retries",0);
        props.put("linger.ms","1");
        props.put("batch.size",16384);
        props.put("buffer.memory",3554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
    // 2、生产者同步发送
    @Test
    public void testSend(){
        Properties properties = getProperties();

        // 传递参数
        Producer producer = new KafkaProducer(properties);

        // 发送消息
        for (int i=0;i<3 ;i++){
            producer.send(new ProducerRecord<>(TOPIC_NAME, "key", "value" + i), new Callback() {
                @Override
                public void onCompletion(Recordmetadata recordmetadata, Exception exception) {
                    if(exception == null) {
                        System.out.println("发送状态:"+recordmetadata.toString());
                    }
                    else{
                        exception.printStackTrace();
                    }
                }
            });
        }
        // 关闭生产者
        producer.close();
    }
}

(2)运行结果

4、发送消息到Broker的分区配置

(1)分区配置

  • 如果指定Partition ID,则Record被发送至指定Partition

  • 如果未指定Partition但指定了Key,则Record按照hash(key)发送至对应key

  • 如果未指定PartitionID,也没指定Key,Record会按照轮询模式发送到每个Partition

  • 如果同时指定了Partition和key,Record只会发送到指定的Partition,key不起作用

(2)PeoducerRecord概述

  • 发送给Kafka Broker的key/value值对,封装基础数据信息
    • 数据格式:topic+Partition+Key+Value

(3)Key概述

  • 如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡分布到各个partition上
  • 如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到topic的那个partition上
    • 拥有相同key的消息会被写到同一个partition,实现顺序消息

(4)代码实战 - 指定分区发送

  • 查看源码

  • 编写代码
package net.testclass.testclasskafka;

import com.sun.org.apache.xpath.internal.functions.FuncTrue;
import org.apache.kafka.clients.producer.*;
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer {
    private static final String TOPIC_NAME = "sptest-topic";
    // 1、封装配置属性
    public static Properties getProperties(){
        Properties props = new Properties();
        props.put("bootstrap.servers","192.168.6.102:9092");

        props.put("acks","all");
        props.put("retries",0);
        props.put("linger.ms","1");
        props.put("batch.size",16384);
        props.put("buffer.memory",3554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
    // 2、生产者同步发送
    @Test
    public void testSend(){
        Properties properties = getProperties();

        // 传递参数
        Producer producer = new KafkaProducer(properties);

        // 发送消息
        for (int i=0;i<3 ;i++){
            // 将消息发送到指定分区4
            producer.send(new ProducerRecord<>(TOPIC_NAME, 4,"key", "value" + i), new Callback() {
                @Override
                public void onCompletion(Recordmetadata recordmetadata, Exception exception) {
                    if(exception == null) {
                        System.out.println("发送状态:"+recordmetadata.toString());
                    }
                    else{
                        exception.printStackTrace();
                    }
                }
            });
        }
        // 关闭生产者
        producer.close();
    }
}

(5)运行结果

  • 输出数据:topic-分区编号@offset

5、ACK机制
  • ACK机制:消息持久化机制
put(Produceconfig.ACKS_CONFIG,"1")

(1)ACK=0

  • 含义:producer不需要等待任何Broker确认收到消息,就可以继续发送下一条消息
  • 优缺点:性能最高,但容易丢消息

(2)ACKS=1

  • 含义:最少等待leader成功将数据写入到本地log,但不需要等待所有的follower是否成功写入,就可以继续发送下一条消息

  • 优缺点:如果folloer没有成功备份数据,同时leader挂掉,则消息会丢失

(3)ACKS=all/-1

  • 含义:将生产者发送的消息写入leader和所有的folloer,才可以发送下一条消息,这种策略会保证只要有一个备份存活就不会丢数据

  • 优缺点:最强的数据保证,性能比较差。

  • 注意事项:此类型情况常常与min.insync.replicas=n一起配置,当n为1时与ack=1相当。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5690632.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存