Kafka生产者

Kafka生产者,第1张

Kafka生产者

Kafka生产者_消费者实战
  • 1 生产者
  • 2 消费者

1 生产者
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerTest {

    static {
        //设置log级别
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> logger.setLevel(Level.WARN));
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092");
        // leader将等待所有副本同步完成后,才确认发送成功 取值:all, -1 , 1, 0
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        // 限制单条消息大小,限制发送请求大小10M,默认值1048576 -> 1M
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 10);
        // 批量处理请求数3M, 默认值16384 -> 16k
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024 * 3);
        // 批处理等待时间1s,请求没有达到batch.size,将最多等待这么长时间然后批处理发送,默认值0
//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        // 设置失败重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 等待请求响应的最长时间,如果超时时间内未收到响应,则会重试或者返回失败,默认值:30000 -> 30s
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        // Kafka消息的序列化方式。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // kafka安全认证相关配置,kafka没有配置认证授权可忽略
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";");

        KafkaProducer producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            Recordmetadata test = producer.send(new ProducerRecord<>("test", "{"data":"test" + i + ""}")).get();
            System.out.println(test.offset() + "--------" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        }
        producer.close();
    }
}

关于生产者常用的配置参数如下:

属性默认值描述key.serializer默认为消息key的class消息key序列化使用的类,class类型value.serializer默认为消息value的class消息value序列化使用的类,class类型acks3.0.0 版本,默认值为all,等效于-1
3.0.0之前版本,默认值为1producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
(1)acks=0:设置为0表示producer不需要等待来自服务器的任何确认。消息将立即加到buffer并认为已经发送。没有任何保障可以保证此种情况下broker已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败),每条消息回馈的offset会总是设置为-1;
(2)acks=1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失;
(3)acks=all:这意味着leader需要等待所有副本都成功写入日志,这种策略会保证只要有一个副本存活就不会丢失数据。这是最强的保证。和acks=-1效果相同。bootstrap.servers”“kafka集群的连接地址,这个列表格式: host1:port1,host2:port2,… 。这些 server 仅仅是用于初始化的连接,以发现集群所有成员关系(可能会动态的变化),这个列表不需要包含所有的 servers(你可能想要不止一个server,尽管这样,可能某个server宕机了)。如果没有 server 在这个列表出现,则发送数据会一直失败,直到列表可用。buffer.memory33554432producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。max.request.size1048576请求的最大字节数。此设置将限制producer在单个请求中批处理发送的记录个数,以避免发送大量请求。batch.size16384producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。这项配置控制默认的批量处理消息字节数。不会试图处理大于这个字节数的消息字节数。发送到brokers的请求将包含多个批量处理,其中会包含对每个partition的一个请求。较小的批量处理数值比较少用,并且可能降低吞吐量(0则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。linger.ms0producer把在请求传输之间到达的消息组合到一个批处理请求中。通常来说,这只有在记录产生速度大于发送速度的时候才能发生。然而,在某些条件下,客户端也可能希望减少请求的数量。这项设置将通过增加小的延迟来实现这一点,即producer不会立即发送记录,而是等待给定的延迟,从而可以将发送的记录批处理在一起,这些消息记录可以批量发送处理。这可以认为是TCP种Nagle的算法类似。这项设置设定了批量处理的更高的延迟边界:一旦我们达到batch.size,它将会立即发送而不管这项设置,但是如果我们为该分区累积的字节数少于此数量,我们将“逗留”指定的时间,等待更多记录显示。这个设置默认为0,即没有延迟。例如,设定linger.ms=5将会减少请求的发送数,但是发送的记录会增加5毫秒的延迟。retries0重试次数,设置大于0的值,如果数据发送失败,将使客户端重新发送这条数据。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。request.timeout.ms30000配置客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。delivery.timeout.ms120000调用send()返回成功或失败的时间上限。这限制了记录在发送前延迟的总时间、等待代理确认的时间(如果预期)以及可重试发送失败的允许时间。如果遇到不可恢复的错误,重试次数已用尽,或者记录添加到达到较早交付到期期限的批次中,则生产者可能会报告未能在此配置之前发送记录。此配置的值应大于或等于request.timeout.ms和linger.ms之和。 2 消费者
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerTest {

    static {
        //设置log级别
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> logger.setLevel(Level.WARN));
    }

    public static void main(String[] args) {
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092");
        //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认3s。
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 8);
        //获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
        //是否自动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //每次Poll的最大数量。注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。默认50
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        //消息的反序列化方式。
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");

        // kafka安全认证相关配置,kafka没有配置认证授权可忽略
        props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName());
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";");

        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(Collections.singletonList("test"));


        while (true) {
            ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord consumerRecord : consumerRecords) {
                System.out.println("offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value());
            }
            // 提交offset
            consumer.commitSync();
        }
    }
}

关于生产者常用的配置参数如下:

属性默认值描述group.idnull消费者组名称,如果多个consumer设置同一个group.id,那么它们属于同一个Consumer Group。如果consumer 使用subscribe(topic)订阅或者基于偏移量管理测策略,则需要设置此值。key.deserializer消息key反序列化的类,class类型value.deserializer消息value反序列化的类,class类型heartbeat.interval.ms3000消费者协调员之间的心跳间隔时间。心跳用于确保消费者会话保持活跃,并在新消费者加入或离开群组时促进重平衡rebalance。该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。它可以调整得更低,以控制正常情况下再平衡的预期时间。max.poll.interval.ms300000poll()方式获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败enable.auto.committrue是否自动提交offsetauto.offset.resetlatest如果Kafka中没有初始偏移量,配置消费策略,取值如下:
earliest: 从最开始的位置开始消费
latest: 消费者启用后,从最新的位置开始消费
none: 如果没有消费者组之前的偏移量,抛出异常
有疑问的同学可以参考这篇博客: Kafka auto.offset.reset值详解max.poll.records500每次poll()返回的最大数量。consumer缓存消息记录,记录数达到此值或者最新offset返回。session.timeout.ms10000 (10 seconds)worker定期向broker发送心跳信号,以表明其活跃程度。如果在此会话超时过期之前broker未收到心跳,则broker将从group中删除该worker并启动重平衡。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5574251.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存