- 1 生产者
- 2 消费者
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; public class KafkaProducerTest { static { //设置log级别 LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); ListloggerList = loggerContext.getLoggerList(); loggerList.forEach(logger -> logger.setLevel(Level.WARN)); } public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092"); // leader将等待所有副本同步完成后,才确认发送成功 取值:all, -1 , 1, 0 props.put(ProducerConfig.ACKS_CONFIG, "-1"); // 限制单条消息大小,限制发送请求大小10M,默认值1048576 -> 1M props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 10); // 批量处理请求数3M, 默认值16384 -> 16k props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024 * 1024 * 3); // 批处理等待时间1s,请求没有达到batch.size,将最多等待这么长时间然后批处理发送,默认值0 // props.put(ProducerConfig.LINGER_MS_CONFIG, 1000); // 设置失败重试次数。 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 等待请求响应的最长时间,如果超时时间内未收到响应,则会重试或者返回失败,默认值:30000 -> 30s props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // Kafka消息的序列化方式。 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // kafka安全认证相关配置,kafka没有配置认证授权可忽略 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName()); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";"); KafkaProducer producer = new KafkaProducer<>(props); for (int i = 0; i < 5; i++) { Recordmetadata test = producer.send(new ProducerRecord<>("test", "{"data":"test" + i + ""}")).get(); System.out.println(test.offset() + "--------" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } producer.close(); } }
关于生产者常用的配置参数如下:
3.0.0之前版本,默认值为1producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
(1)acks=0:设置为0表示producer不需要等待来自服务器的任何确认。消息将立即加到buffer并认为已经发送。没有任何保障可以保证此种情况下broker已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败),每条消息回馈的offset会总是设置为-1;
(2)acks=1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失;
(3)acks=all:这意味着leader需要等待所有副本都成功写入日志,这种策略会保证只要有一个副本存活就不会丢失数据。这是最强的保证。和acks=-1效果相同。
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Properties; public class KafkaConsumerTest { static { //设置log级别 LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); ListloggerList = loggerContext.getLoggerList(); loggerList.forEach(logger -> logger.setLevel(Level.WARN)); } public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.130:9092"); //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认3s。 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 8); //获取消息后提交偏移量的最大时间(默认5分钟),超时服务端会认为消费者失效,触发Rebalance,并且提交offset会失败 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000); //是否自动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //每次Poll的最大数量。注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。默认50 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); //消息的反序列化方式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //属于同一个组的消费实例,会负载消费消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test"); // kafka安全认证相关配置,kafka没有配置认证授权可忽略 props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_256.mechanismName()); props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";"); KafkaConsumer consumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Collections.singletonList("test")); while (true) { ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println("offset: " + consumerRecord.offset() + "----- message: " + consumerRecord.value()); } // 提交offset consumer.commitSync(); } } }
关于生产者常用的配置参数如下:
earliest: 从最开始的位置开始消费
latest: 消费者启用后,从最新的位置开始消费
none: 如果没有消费者组之前的偏移量,抛出异常
有疑问的同学可以参考这篇博客: Kafka auto.offset.reset值详解
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)