Kafka Production and Consumption (Java Apache Kafka)

Contents
  • 1. pom dependencies
  • 2. application.properties configuration file
  • 3. Consumer: automatic configuration loading
  • 4. Producer: automatic configuration loading
  • 5. Consumer utility
  • 6. Producer utility
  • 7. Testing production and consumption


1. pom dependencies

For a Spring Boot project, the versions can be omitted and Spring Boot's dependency management will resolve matching ones; a version-less sketch follows the dependency block below.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>
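A minimal sketch of the version-less form, assuming Spring Boot's dependency management (which includes kafka-clients) is in effect:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- version omitted: resolved by Spring Boot dependency management -->
</dependency>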
2. application.properties configuration file
server.port=2333

#----------------------------------- Producer ------------------------------------
# Kafka broker address (host:port)
jmw.kafka.producer.servers=xxxx:xxx
jmw.kafka.producer.topic=xxx

# acks=all: a record counts as committed only once all in-sync replicas have acknowledged it
jmw.kafka.ack=all
# Number of retries on transient send failures; 0 disables retrying (a very large value retries until you notice the problem)
jmw.kafka.retries=0
# The producer batches records to reduce request count; maximum batch size in bytes
jmw.kafka.batch.size=16384
# A batch that reaches batch.size is sent immediately, regardless of linger.ms below
# Rather than sending each record at once, the producer waits up to linger.ms so more records can join the batch
jmw.kafka.batch.linger.ms=1
# Total memory (bytes) the producer may use to buffer records awaiting transmission
jmw.kafka.buffer.memory=33554432
producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer


#----------------------------------- Consumer ------------------------------------
jmw.kafka.consumer.servers=xxxx:xxx
jmw.kafka.consumer.topic=xxx
jmw.kafka.consumer.group.id=xxx

# Commit offsets automatically, every auto.commit.interval.ms milliseconds
jmw.kafka.enable.auto.commit=true
jmw.kafka.auto.commit.interval.ms=1000
# Where to start reading when no committed offset exists: latest or earliest
jmw.kafka.auto.offset.reset=latest
# Group session timeout (ms): the broker evicts the consumer if no heartbeat arrives within this window
jmw.kafka.session.timeout.ms=30000
consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
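As an aside, these raw string keys map one-to-one onto constants in kafka-clients (ProducerConfig, and ConsumerConfig for the consumer half). A minimal sketch of the producer settings written with those constants, using the same illustrative values; the class name ProducerConfigSketch is hypothetical:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigSketch {

    // Builds the same producer settings as above, but with type-safe config keys.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxx:xxx"); // placeholder host:port
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "0");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }
}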
3. Consumer: automatic configuration loading
package cn.com.kaf.configuration;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
public class ConsumerApplicationProperties implements InitializingBean {

    @Value("${jmw.kafka.consumer.servers}")
    private String serverHostPort;

    @Value("${jmw.kafka.enable.auto.commit}")
    private String enableAutoCommit;

    @Value("${jmw.kafka.auto.commit.interval.ms}")
    private String autoCommitInterval;

    @Value("${jmw.kafka.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${jmw.kafka.session.timeout.ms}")
    private String sessionTimeout;

    @Value("${jmw.kafka.consumer.topic}")
    private String consumerTopic;

    @Value("${jmw.kafka.consumer.group.id}")
    private String consumerGroupId;

    @Value("${consumer.key.deserializer}")
    private String keyDeserializer;

    @Value("${consumer.value.deserializer}")
    private String valueDeserializer;

    // Static copies exposed to subclasses once the Spring context has injected the values.
    public static String KAFKA_CONSUMER_SERVER_HOST_PORT;
    public static String KAFKA_CONSUMER_ENABLE_AUTO_COMMIT;
    public static String KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL;
    public static String KAFKA_CONSUMER_AUTO_OFFSET_RESET;
    public static String KAFKA_CONSUMER_SESSION_TIMEOUT;
    public static String KAFKA_CONSUMER_TOPIC;
    public static String KAFKA_CONSUMER_GROUP_ID;
    public static String KAFKA_KEY_DESERIALIZER;
    public static String KAFKA_VALUE_DESERIALIZER;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Copy the injected instance values into the static fields once property binding completes.
        KAFKA_CONSUMER_SERVER_HOST_PORT = serverHostPort;
        KAFKA_CONSUMER_ENABLE_AUTO_COMMIT = enableAutoCommit;
        KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL = autoCommitInterval;
        KAFKA_CONSUMER_AUTO_OFFSET_RESET = autoOffsetReset;
        KAFKA_CONSUMER_SESSION_TIMEOUT = sessionTimeout;
        KAFKA_CONSUMER_TOPIC = consumerTopic;
        KAFKA_CONSUMER_GROUP_ID = consumerGroupId;
        KAFKA_KEY_DESERIALIZER = keyDeserializer;
        KAFKA_VALUE_DESERIALIZER = valueDeserializer;
    }
}

4. Producer: automatic configuration loading
package cn.com.kaf.configuration;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
public class ProducerApplicationProperties implements InitializingBean {

    @Value("${jmw.kafka.producer.servers}")
    private String serverHostPort;

    @Value("${jmw.kafka.producer.topic}")
    private String producerTopic;

    @Value("${jmw.kafka.ack}")
    private String acks;

    @Value("${jmw.kafka.retries}")
    private String retries;

    @Value("${jmw.kafka.batch.size}")
    private String batchSize;

    @Value("${jmw.kafka.batch.linger.ms}")
    private String batchLingerMs;

    @Value("${jmw.kafka.buffer.memory}")
    private String bufferMemory;

    @Value("${producer.key.serializer}")
    private String keySerializer;

    @Value("${producer.value.serializer}")
    private String valueSerializer;

    // Static copies exposed to subclasses once the Spring context has injected the values.
    public static String KAFKA_SERVER_HOST_PORT;
    public static String KAFKA_ACKS;
    public static String KAFKA_RETRIES;
    public static String KAFKA_BATCH_SIZE;
    public static String KAFKA_BATCH_LINGER;
    public static String KAFKA_CACHE_MEMORY;
    public static String KAFKA_PRODUCER_TOPIC;
    public static String KAFKA_KEY_SERIALIZER;
    public static String KAFKA_VALUE_SERIALIZER;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Copy the injected instance values into the static fields once property binding completes.
        KAFKA_SERVER_HOST_PORT = serverHostPort;
        KAFKA_ACKS = acks;
        KAFKA_RETRIES = retries;
        KAFKA_BATCH_SIZE = batchSize;
        KAFKA_BATCH_LINGER = batchLingerMs;
        KAFKA_CACHE_MEMORY = bufferMemory;
        KAFKA_PRODUCER_TOPIC = producerTopic;
        KAFKA_KEY_SERIALIZER = keySerializer;
        KAFKA_VALUE_SERIALIZER = valueSerializer;
    }
}

5. Consumer utility
package cn.com.kaf.consumer;

import cn.com.kaf.configuration.ConsumerApplicationProperties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Properties;


@Component
public class ConsumerFactoryTool extends ConsumerApplicationProperties {

    private static KafkaConsumer<String, String> consumer = null;

    /**
     * Returns the shared consumer, subscribed to the configured topic.
     * KafkaConsumer is not thread-safe: use the returned instance from a single thread.
     */
    public static KafkaConsumer<String, String> getInstance() {
        KafkaConsumer<String, String> kafkaConsumer = consumer;
        kafkaConsumer.subscribe(Arrays.asList(KAFKA_CONSUMER_TOPIC));
        return kafkaConsumer;
    }

    private ConsumerFactoryTool() {
        // Assumes the ConsumerApplicationProperties bean has already populated the static fields.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_CONSUMER_SERVER_HOST_PORT);
        properties.put("key.deserializer", KAFKA_KEY_DESERIALIZER);
        properties.put("value.deserializer", KAFKA_VALUE_DESERIALIZER);
        properties.put("group.id", KAFKA_CONSUMER_GROUP_ID);
        properties.put("enable.auto.commit", KAFKA_CONSUMER_ENABLE_AUTO_COMMIT);
        properties.put("auto.commit.interval.ms", KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL);
        properties.put("auto.offset.reset", KAFKA_CONSUMER_AUTO_OFFSET_RESET);
        properties.put("session.timeout.ms", KAFKA_CONSUMER_SESSION_TIMEOUT);

        consumer = new KafkaConsumer<>(properties);
    }
}

6. Producer utility
package cn.com.kaf.producer;

import cn.com.kaf.configuration.ProducerApplicationProperties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

import java.util.Properties;


@Component
public class ProducerFactoryTool extends ProducerApplicationProperties {

    private static KafkaProducer<String, String> producer = null;

    /** Returns the shared producer instance. */
    public static KafkaProducer<String, String> getInstance() {
        return producer;
    }

    /**
     * Sends the given payload to the configured topic and returns the shared producer.
     */
    public static KafkaProducer<String, String> send(String data) {
        KafkaProducer<String, String> kafkaProducer = getInstance();
        kafkaProducer.send(new ProducerRecord<>(KAFKA_PRODUCER_TOPIC, data));
        return kafkaProducer;
    }

    private ProducerFactoryTool() {
        // Assumes the ProducerApplicationProperties bean has already populated the static fields.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_SERVER_HOST_PORT); // broker host:port
        properties.put("acks", KAFKA_ACKS);                          // all: committed once all in-sync replicas acknowledge
        properties.put("retries", KAFKA_RETRIES);                    // retries on transient send failures
        properties.put("batch.size", KAFKA_BATCH_SIZE);              // maximum batch size in bytes
        properties.put("linger.ms", KAFKA_BATCH_LINGER);             // wait up to this long so more records can join a batch
        properties.put("buffer.memory", KAFKA_CACHE_MEMORY);         // total memory for buffering unsent records
        properties.put("key.serializer", KAFKA_KEY_SERIALIZER);
        properties.put("value.serializer", KAFKA_VALUE_SERIALIZER);

        producer = new KafkaProducer<>(properties);
    }
}
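If delivery confirmation is needed, KafkaProducer.send also accepts a Callback that fires once the broker acknowledges the record or the send fails. A minimal sketch reusing the tool above, assuming the Spring context has already initialized it; the class name CallbackSendSketch and method sendWithAck are hypothetical, and the logging is illustrative only:

import cn.com.kaf.configuration.ProducerApplicationProperties;
import cn.com.kaf.producer.ProducerFactoryTool;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackSendSketch {

    // Assumes the Spring context has run, so ProducerFactoryTool is initialized.
    public static void sendWithAck(String payload) {
        KafkaProducer<String, String> producer = ProducerFactoryTool.getInstance();

        // The callback fires on broker acknowledgement: exception is null on success.
        producer.send(
                new ProducerRecord<>(ProducerApplicationProperties.KAFKA_PRODUCER_TOPIC, payload),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("acked: partition=%d, offset=%d%n",
                                metadata.partition(), metadata.offset());
                    }
                });

        producer.flush(); // block until buffered records have been transmitted
    }
}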

7. Testing production and consumption
import cn.com.kaf.DemoApplication;
import cn.com.kaf.consumer.ConsumerFactoryTool;
import cn.com.kaf.producer.ProducerFactoryTool;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;

import java.time.Duration;


public class KafkaTest {

    @Before
    public void setUp() {
        // Boot the Spring context so the configuration beans populate the static fields.
        String[] args = new String[0];
        SpringApplication.run(DemoApplication.class, args);
    }

    @Test
    public void pullFromKafka() {
        KafkaConsumer<String, String> kafkaConsumer = ConsumerFactoryTool.getInstance();

        // Poll in a loop; this test runs until interrupted.
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("-----------------");
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
        }
    }

    @Test
    public void pushToKafka() {
        String data = "hello kafka";
        KafkaProducer<String, String> kafkaProducer = ProducerFactoryTool.send(data);
        kafkaProducer.flush(); // ensure the record leaves the buffer before the test exits
    }
}
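The consumer test above loops forever, which is fine for manual runs; for runs that must terminate, a bounded variant that closes the consumer cleanly is a reasonable alternative. A minimal sketch, where the class name, method name, and 5-second window are illustrative:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import cn.com.kaf.consumer.ConsumerFactoryTool;

public class BoundedPollSketch {

    // Polls for roughly five seconds, then releases the consumer.
    public static void pollBriefly() {
        KafkaConsumer<String, String> consumer = ConsumerFactoryTool.getInstance();
        long deadline = System.currentTimeMillis() + 5_000;
        try {
            while (System.currentTimeMillis() < deadline) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(r ->
                        System.out.printf("offset = %d, value = %s%n", r.offset(), r.value()));
            }
        } finally {
            consumer.close(); // commits final offsets under auto-commit and leaves the group
        }
    }
}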
