2.创建配置（pom.xml，Spring Boot 2.6.0），添加如下依赖：
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${kafka.version}</version></dependency>
配置application.yaml文件
spring:
  kafka:
    # kafka 服务地址
    bootstrap-servers: ip:9092
    # 消费者配置
    consumer:
      auto-commit-interval: 5000 #自动提交消费位移时间隔时间
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500 #批量消费每次最多消费多少条消息
      enable-auto-commit: true #开启自动提交消费位移
      auto-offset-reset: latest #其他earliest、none
      group-id: kafka.consumer.group #消费者组名称
      client-id: kafka.consumer.client.id #消费者客户端ID
      fetch-max-wait: 400 #最大等待时间
      fetch-min-size: 1 #最小消费字节数
      heartbeat-interval: 3000 #分组管理时心跳到消费者协调器之间的预计时间
      isolation-level: read_committed
      topic-name: huachun
    # 生产者配置
    producer:
      batch-size: 16384 #批次大小,默认16k
      acks: -1 #ACK应答级别,指定分区中必须要有多少个副本收到消息之后才会认为消息成功写入,默认为1只要分区的leader副本成功写入消息;0表示不需要等待任何服务端响应;-1或all需要等待ISR中所有副本都成功写入消息
      retries: 3 #重试次数
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      buffer-memory: 33554432 #缓冲区大小,默认32M
      client-id: kafka.producer.client.id #客户端ID
      compression-type: none #消息压缩方式,默认为none,另外有gzip、snappy、lz4
      properties:
        retry.backoff.ms: 100 #重试时间间隔,默认100
        linger.ms: 0 #默认为0,表示批量发送消息之前等待更多消息加入batch的时间
        max.request.size: 1048576 #默认1MB,表示发送消息最大值
        connections.max.idle.ms: 540000 #默认9分钟,表示多久后关闭限制的连接
        receive.buffer.bytes: 32768 #默认32KB,表示socket接收消息缓冲区的大小,为-1时使用操作系统默认值
        send.buffer.bytes: 131072 #默认128KB,表示socket发送消息缓冲区大小,为-1时使用操作系统默认值
        request.timeout.ms: 30000 #默认30000ms,表示等待请求响应的最长时间
      topic-name: huachun

3.创建生产者服务类 1.生产者接口
package com.hhmt.delivery.service;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.messaging.Message;

import java.util.concurrent.ExecutionException;

/**
 * Facade over the Kafka producer, exposing one overload per way of addressing
 * a record (topic only, topic+key, topic+partition+key, full ProducerRecord,
 * Spring Message, and a blocking synchronous variant).
 *
 * <p>Key and value types are both {@code String}, matching the
 * StringSerializer configured in application.yaml.
 */
public interface ProducerService {

    /**
     * Sends a message and blocks until the broker acknowledges it.
     *
     * @param topic target topic
     * @param data  message payload
     * @throws ExecutionException   if the send failed on the broker side
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException;

    /** Sends a message asynchronously to the given topic. */
    void sendMessage(String topic, String data);

    /** Sends a fully-built record (allows custom headers, partition, timestamp). */
    void sendMessage(ProducerRecord<String, String> record);

    /** Sends a Spring {@link Message}; the target topic comes from the message headers or the template default. */
    void sendMessage(Message<String> message);

    /** Sends a keyed message; the key determines the partition via the default partitioner. */
    void sendMessage(String topic, String key, String data);

    /** Sends a keyed message to an explicit partition. */
    void sendMessage(String topic, Integer partition, String key, String data);

    /** Sends a keyed message to an explicit partition with an explicit timestamp (epoch millis). */
    void sendMessage(String topic, Integer partition, Long timestamp, String key, String data);
}
package com.hhmt.delivery.service.impl;

import com.hhmt.delivery.service.ProducerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;

/**
 * {@link ProducerService} implementation backed by {@link KafkaTemplate}.
 *
 * <p>All asynchronous overloads attach a callback that logs success or failure;
 * {@link #sendSyncMessage} blocks on the send future instead.
 */
@Service
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {

    private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);

    // Injected through the Lombok-generated constructor (@RequiredArgsConstructor).
    private final KafkaTemplate<String, String> kafkaTemplate;

    /** Blocks on get() until the broker acknowledges the record. */
    @Override
    public void sendSyncMessage(String topic, String data) throws ExecutionException, InterruptedException {
        SendResult<String, String> sendResult = kafkaTemplate.send(topic, data).get();
        RecordMetadata recordMetadata = sendResult.getRecordMetadata();
        logger.info("发送同步消息成功!发送的主题为:{}", recordMetadata.topic());
    }

    /** Fire-and-forget send with lambda-style success/failure logging. */
    @Override
    public void sendMessage(String topic, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(success -> logger.info("发送消息成功!"),
                failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }

    /** Sends a pre-built record; callback logs the topic and partition on success. */
    @Override
    public void sendMessage(ProducerRecord<String, String> record) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    /** Sends a Spring Message; topic resolution is left to the template/message headers. */
    @Override
    public void sendMessage(Message<String> message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    /** Keyed send; the key drives partition selection in the default partitioner. */
    @Override
    public void sendMessage(String topic, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
        future.addCallback(success -> logger.info("发送消息成功!"),
                failure -> logger.error("发送消息失败!失败原因是:{}", failure.getMessage()));
    }

    /** Keyed send to an explicit partition. */
    @Override
    public void sendMessage(String topic, Integer partition, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }

    /** Keyed send to an explicit partition with an explicit timestamp (epoch millis). */
    @Override
    public void sendMessage(String topic, Integer partition, Long timestamp, String key, String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, timestamp, key, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("发送消息失败!失败原因是:{}", throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                RecordMetadata metadata = sendResult.getRecordMetadata();
                logger.info("发送消息成功!消息主题是:{},消息分区是:{}", metadata.topic(), metadata.partition());
            }
        });
    }
}
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic huachun --bootstrap-server localhost:9092

2.向指定topic发送消息
package com.hhmt.delivery;

import com.hhmt.delivery.service.ProducerService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

/**
 * Integration tests that exercise the three main producer paths:
 * plain topic+payload, a full ProducerRecord with headers, and a Spring Message.
 *
 * <p>Requires a reachable Kafka broker (see spring.kafka.bootstrap-servers).
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestKafka {

    @Autowired
    private ProducerService producerService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic configured under spring.kafka.producer.topic-name in application.yaml.
    @Value("${spring.kafka.producer.topic-name}")
    private String topicName;

    /** Simplest path: topic + payload. */
    @Test
    public void contextLoads() {
        producerService.sendMessage(topicName, "springboot");
    }

    /** Full record: explicit partition 0, timestamp, key, and a custom header. */
    @Test
    public void sendMessage1() {
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>(topicName, 0, System.currentTimeMillis(), "topic-key", "测试");
        producerRecord.headers().add("user", "zhangsan".getBytes());
        producerService.sendMessage(producerRecord);
    }

    /** Spring Message path: the default topic must be set on the template first. */
    @Test
    public void sendMessage2() {
        String event = "测试";
        Map<String, Object> map = new HashMap<>();
        map.put("user", "zhangsan");
        MessageHeaders headers = new MessageHeaders(map);
        Message<String> message = MessageBuilder.createMessage(event, headers);
        kafkaTemplate.setDefaultTopic(topicName);
        producerService.sendMessage(message);
    }
}
此时发送kafka消息会出现一个问题
定位后发现使用的是云主机,有内网IP和外网IP,虚拟机对外ip[暴露的ip]和真实ip[ifconfig显示的ip]可能只是映射关系,用户访问对外ip时,OpenStack会转发到对应的真实ip实现访问。但此时如果 Kafka server.properties配置中的listeners=PLAINTEXT://对外IP:9092中的ip配置为[对外ip]的时候无法启动,因为socket无法绑定监听
2.解决方案在kafka的server.properties中添加如下内容
listeners=PLAINTEXT://内网ip:9092
advertised.host.name=外网ip或者域名
advertised.listeners=PLAINTEXT://外网ip或者域名:9092
（注：advertised.host.name 在较新版本的 Kafka 中已被废弃，通常只需配置 advertised.listeners —— 请以所用 Kafka 版本文档为准）
重启kafka服务测试,消息发送成功...
参考原文:关于kafka的Cannot assign requested address_APTX4869_YXW的博客-CSDN博客
参考原文:kafka无法启动,Cannot assign requested address._MysticalYcc的博客-CSDN博客
5.消费者 1.消费指定topic消息package com.hhmt.delivery; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component @Slf4j public class TestCustomerKafka { @KafkaListener(topics = {"${spring.kafka.consumer.topic-name}"}) public void listenerMessage(ConsumerRecord2.测试消费消息record) { log.info("接收到kafka消息键为:{},消息值为:{},消息头为:{},消息分区为:{},消息主题为:{}", record.key(), record.value(), record.headers(), record.partition(), record.topic()); } }
package com.hhmt.delivery.task;

import com.hhmt.delivery.service.ProducerService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Scheduled task that publishes one timestamped test message per second,
 * so the consumer side can be observed end to end.
 */
@Configuration
@EnableScheduling
public class KafkaTask {

    @Autowired
    private ProducerService producerService;

    // Topic configured under spring.kafka.producer.topic-name in application.yaml.
    @Value("${spring.kafka.producer.topic-name}")
    private String topicName;

    /** Fires every second; key and value carry the current timestamp for traceability. */
    @Scheduled(cron = "*/1 * * * * *")
    public void sendScheduledMessage() {
        // SimpleDateFormat is not thread-safe; a new instance per invocation is deliberate here.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        String date = sdf.format(new Date());
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<>(topicName, 0, System.currentTimeMillis(), "topic-key" + date, "测试" + date);
        producerRecord.headers().add("user", "zhangsan".getBytes());
        producerService.sendMessage(producerRecord);
    }
}
说明:通过定时任务每秒钟发送一个消息,kafka消费者可以监听到。启动服务查看日志
在服务器使用客户端也可以收到发送的消息
bin/kafka-console-consumer.sh --topic huachun --from-beginning --bootstrap-server ip:9092
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)