- 1.pom依赖
- 2.application.properties 配置文件
- 3. 消费者 配置自动读取
- 4. 生产者 配置自动读取
- 5. 消费者工具
- 6.生产者工具
- 7.测试 生产消费
1.pom依赖
如果是springboot项目可以不指定版本,自动匹配
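<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>
2.application.properties 配置文件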
server.port=2333

#—————————————————————————————————生产者———————————————————————————————————————————
#xxx服务器ip
jmw.kafka.producer.servers=xxxx:xxx
jmw.kafka.producer.topic=xxx
#所有follower都响应了才认为消息提交成功,即"committed"
jmw.kafka.ack=all
#retries = MAX 无限重试,直到你意识到出现了问题:
jmw.kafka.retries=0
#producer将试图批处理消息记录,以减少请求次数.默认的批量处理消息字节数
jmw.kafka.batch.size=16384
#batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
#延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理
jmw.kafka.batch.linger.ms=1
#producer可以用来缓存数据的内存大小。
jmw.kafka.buffer.memory=33554432
producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer

#—————————————————————————————————消费者———————————————————————————————————————————
jmw.kafka.consumer.servers=xxxx:xxx
jmw.kafka.consumer.topic=xxx
jmw.kafka.consumer.group.id=xxx
jmw.kafka.enable.auto.commit=true
jmw.kafka.auto.commit.interval.ms=1000
jmw.kafka.auto.offset.reset=latest
jmw.kafka.session.timeout.ms=30000
consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. 消费者 配置自动读取
package cn.com.kaf.configuration; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class ConsumerApplicaitonProperties implements InitializingBean { @Value("${jmw.kafka.consumer.servers}") private String serverHostPort; @Value("${jmw.kafka.enable.auto.commit}") private String enableAutoCommit; @Value("${jmw.kafka.auto.commit.interval.ms}") private String autoCommitInterval; @Value("${jmw.kafka.auto.offset.reset}") private String autoOffsetReset; @Value("${jmw.kafka.session.timeout.ms}") private String sessionTimeout; @Value("${jmw.kafka.consumer.topic}") private String consumerTopic; @Value("${jmw.kafka.consumer.group.id}") private String consumerGroupId; @Value("${consumer.key.deserializer}") private String key; @Value("${consumer.value.deserializer}") private String value; public static String KAFKA_CONSUMER_SERVER_HOST_PORT; public static String KAFKA_CONSUMER_ENABLE_AUTO_COMMIT; public static String KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL; public static String KAFKA_CONSUMER_AUTO_OFFSET_RESET; public static String KAFKA_CONSUMER_SESSION_TIMEOUT; public static String KAFKA_CONSUMER_TOPIC; public static String KAFKA_CONSUMER_GROUP_ID; public static String KAFKA_KEY_SERIALIZER ; public static String KAFKA_VALUE_SERIALIZER ; @Override public void afterPropertiesSet() throws Exception { KAFKA_CONSUMER_ENABLE_AUTO_COMMIT = enableAutoCommit; KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL = autoCommitInterval; KAFKA_CONSUMER_AUTO_OFFSET_RESET = autoOffsetReset; KAFKA_CONSUMER_SESSION_TIMEOUT = sessionTimeout; KAFKA_CONSUMER_TOPIC = consumerTopic; KAFKA_CONSUMER_GROUP_ID = consumerGroupId; KAFKA_KEY_SERIALIZER = key; KAFKA_VALUE_SERIALIZER = value; KAFKA_CONSUMER_SERVER_HOST_PORT = serverHostPort; } }4. 生产者 配置自动读取
package cn.com.kaf.configuration; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class ProducergApplicationProperties implements InitializingBean { @Value("${jmw.kafka.producer.servers}") private String serverHostPort; @Value("${jmw.kafka.producer.topic}") private String producerTopic; @Value("${jmw.kafka.ack}") private String acks; @Value("${jmw.kafka.retries}") private String retries; @Value("${jmw.kafka.batch.size}") private String batchSize; @Value("${jmw.kafka.batch.linger.ms}") private String batchLingerMs; @Value("${jmw.kafka.buffer.memory}") private String bufferMemory; @Value("${producer.key.serializer}") private String key; @Value("${producer.value.serializer}") private String value; public static String KAFKA_SERVER_HOST_PORT; public static String KAFKA_ACKS; public static String KAFKA_RETRIES; public static String KAFKA_BATCH_SIZE; public static String KAFKA_BATCH_LINGER; public static String KAFKA_CACHE_MEMORY; public static String KAFKA_PRODUCER_TOPIC; public static String KAFKA_KEY_SERIALIZER ; public static String KAFKA_VALUE_SERIALIZER ; @Override public void afterPropertiesSet() throws Exception { KAFKA_SERVER_HOST_PORT = serverHostPort; KAFKA_ACKS = acks; KAFKA_RETRIES = retries; KAFKA_BATCH_SIZE = batchSize; KAFKA_BATCH_LINGER = batchLingerMs; KAFKA_CACHE_MEMORY = bufferMemory; KAFKA_PRODUCER_TOPIC = producerTopic; KAFKA_KEY_SERIALIZER = key; KAFKA_VALUE_SERIALIZER = value; } }5. 消费者工具
package cn.com.kaf.consumer;

import cn.com.kaf.configuration.ConsumerApplicaitonProperties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Properties;

/**
 * Builds one shared {@link KafkaConsumer} from the static configuration
 * published by {@link ConsumerApplicaitonProperties} and hands it out through
 * {@link #SingleCase()}.
 *
 * <p>The consumer is created in {@link #afterPropertiesSet()} rather than in
 * the constructor: a constructor runs before property injection, so the static
 * configuration it would read is only populated if some other bean happened to
 * be initialised earlier. Creating the consumer after
 * {@code super.afterPropertiesSet()} removes that ordering dependency.
 *
 * <p>NOTE(review): the Kafka client documents {@code KafkaConsumer} as not safe
 * for multi-threaded access; every caller of {@link #SingleCase()} receives the
 * same instance, so callers must confine it to one thread.
 */
@Component
public class ConsumerFactoryTool extends ConsumerApplicaitonProperties {

    // Raw type kept on purpose: the key/value types depend on the deserializer
    // classes named in configuration, and narrowing the public return type
    // could break existing callers.
    @SuppressWarnings("rawtypes")
    private static KafkaConsumer consumer = null;

    /**
     * Returns the shared consumer, subscribed to {@code KAFKA_CONSUMER_TOPIC}.
     *
     * @return the single consumer instance created during bean initialisation
     * @throws IllegalStateException if the consumer has not been created yet,
     *     i.e. this bean has not been initialised by the Spring context
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static KafkaConsumer SingleCase() {
        KafkaConsumer kafkaConsumer = consumer;
        if (kafkaConsumer == null) {
            throw new IllegalStateException(
                    "Kafka consumer is not initialised; start the Spring context before calling SingleCase()");
        }
        kafkaConsumer.subscribe(Arrays.asList(KAFKA_CONSUMER_TOPIC));
        return kafkaConsumer;
    }

    /** Not meant to be instantiated by application code; the container creates the bean. */
    private ConsumerFactoryTool() {
    }

    /**
     * Publishes the injected properties (via the superclass) and then creates
     * the shared consumer from them.
     *
     * @throws Exception if the superclass callback or consumer construction fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Must run first: it fills the static fields read below.
        super.afterPropertiesSet();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_CONSUMER_SERVER_HOST_PORT);
        // The *_SERIALIZER statics hold the configured deserializer class names.
        properties.put("key.deserializer", KAFKA_KEY_SERIALIZER);
        properties.put("value.deserializer", KAFKA_VALUE_SERIALIZER);
        properties.put("group.id", KAFKA_CONSUMER_GROUP_ID);
        properties.put("enable.auto.commit", KAFKA_CONSUMER_ENABLE_AUTO_COMMIT);
        properties.put("auto.commit.interval.ms", KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL);
        properties.put("auto.offset.reset", KAFKA_CONSUMER_AUTO_OFFSET_RESET);
        properties.put("session.timeout.ms", KAFKA_CONSUMER_SESSION_TIMEOUT);
        consumer = new KafkaConsumer<>(properties);
    }
}
// Next article section heading: 6. Producer tool
package cn.com.kaf.producer;

import cn.com.kaf.configuration.ProducergApplicationProperties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * Builds one shared {@link KafkaProducer} from the static configuration
 * published by {@link ProducergApplicationProperties} and exposes
 * {@link #SingleCase(String)} to send a value to the configured topic.
 *
 * <p>The producer is created in {@link #afterPropertiesSet()} rather than in
 * the constructor: a constructor runs before property injection, so the static
 * configuration it would read is only populated if some other bean happened to
 * be initialised earlier. Creating the producer after
 * {@code super.afterPropertiesSet()} removes that ordering dependency.
 */
@Component
public class ProducerFactoryTool extends ProducergApplicationProperties {

    // Raw type kept on purpose: the key/value types depend on the serializer
    // classes named in configuration, and narrowing the public return type
    // could break existing callers.
    @SuppressWarnings("rawtypes")
    private static KafkaProducer producer = null;

    /**
     * Sends {@code data} as a record value to {@code KAFKA_PRODUCER_TOPIC} and
     * returns the shared producer. The result of {@code send} is not awaited
     * or inspected here.
     *
     * @param data the record value to send
     * @return the single producer instance created during bean initialisation
     * @throws IllegalStateException if the producer has not been created yet,
     *     i.e. this bean has not been initialised by the Spring context
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static KafkaProducer SingleCase(String data) {
        KafkaProducer kafkaProducer = producer;
        if (kafkaProducer == null) {
            throw new IllegalStateException(
                    "Kafka producer is not initialised; start the Spring context before calling SingleCase(String)");
        }
        kafkaProducer.send(new ProducerRecord(KAFKA_PRODUCER_TOPIC, data));
        return kafkaProducer;
    }

    /** Not meant to be instantiated by application code; the container creates the bean. */
    private ProducerFactoryTool() {
    }

    /**
     * Publishes the injected properties (via the superclass) and then creates
     * the shared producer from them.
     *
     * @throws Exception if the superclass callback or producer construction fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Must run first: it fills the static fields read below.
        super.afterPropertiesSet();

        Properties properties = new Properties();
        // Broker address(es), host:port.
        properties.put("bootstrap.servers", KAFKA_SERVER_HOST_PORT);
        // With "all", a record counts as committed only after every follower has acknowledged it.
        properties.put("acks", KAFKA_ACKS);
        // Number of retries on send failure; a very large value means retrying
        // until someone notices the problem.
        properties.put("retries", KAFKA_RETRIES);
        // The producer batches records to reduce the number of requests; this is
        // the batch size in bytes. A full batch is sent immediately, regardless
        // of linger.ms below.
        properties.put("batch.size", KAFKA_BATCH_SIZE);
        // Small artificial delay before sending, so that further records can
        // arrive and be batched together instead of being sent one by one.
        properties.put("linger.ms", KAFKA_BATCH_LINGER);
        // Memory the producer may use to buffer records.
        properties.put("buffer.memory", KAFKA_CACHE_MEMORY);
        properties.put("key.serializer", KAFKA_KEY_SERIALIZER);
        // Fixed: this previously reused KAFKA_KEY_SERIALIZER, so the configured
        // producer.value.serializer was silently ignored.
        properties.put("value.serializer", KAFKA_VALUE_SERIALIZER);
        producer = new KafkaProducer<>(properties);
    }
}
// Next article section heading: 7. Test - produce and consume
import cn.com.kaf.DemoApplication; import cn.com.kaf.consumer.ConsumerFactoryTool; import cn.com.kaf.producer.ProducerFactoryTool; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.junit.Before; import org.junit.Test; import org.springframework.boot.SpringApplication; public class Te { @Before public void StBefore() { String[] args = new String[0]; SpringApplication.run(DemoApplication.class, args); } @Test public void pullinkafka() throws InterruptedException { KafkaConsumer kafkaConsumer = ConsumerFactoryTool.SingleCase(); while (true) { ConsumerRecordsrecords = kafkaConsumer.poll(100); for (ConsumerRecord record : records) { System.out.println("-----------------"); System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } } @Test public void pushInKafka() throws InterruptedException { String data = ""; KafkaProducer kafkaProducer = ProducerFactoryTool.SingleCase(data); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)