同步方式
public class Myproducer1 { public static void main(String[] args) { Mapconfigs = new HashMap<>(); configs.put("bootstrap.servers", "Linux122:9092"); configs.put("key.serializer", IntegerSerializer.class); configs.put("value.serializer", StringSerializer.class); KafkaProducer producer = new KafkaProducer<>(configs); ArrayList headers = new ArrayList<>(); headers.add(new RecordHeader("biz.name", "producer.demo".getBytes())); ProducerRecord producerRecord = new ProducerRecord("topic_1", 0, 0, "hello laogu 0 ", headers); // Future future = producer.send(producerRecord); // try { // Recordmetadata recordmetadata = future.get(); // System.out.println("消息的主题:" + recordmetadata.topic()); // System.out.println("消息的分区号:" + recordmetadata.partition()); // System.out.println("消息的偏移量:" + recordmetadata.offset()); // } catch (InterruptedException | ExecutionException e) { // e.printStackTrace(); // }finally { // producer.close(); // }
异步方式
producer.send(producerRecord, new Callback() { @Override public void onCompletion(Recordmetadata recordmetadata, Exception e) { if (e != null) { System.out.println("异常消息" + e.getMessage()); } else { System.out.println("消息的主题:" + recordmetadata.topic()); System.out.println("消息的分区号:" + recordmetadata.partition()); System.out.println("消息的偏移量:" + recordmetadata.offset()); } } }); producer.close(); } }2.消费者消费消息的方式
public class MyConsumer { public static void main(String[] args) { HashMap3.Kafka和Spring boot整合configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList("topic_1")); while (true) { ConsumerRecords consumerRecords = consumer.poll(3000); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println("topic = " + consumerRecord.topic() + " partition = " + consumerRecord.partition() + " offset = " + consumerRecord.offset() + " key = " + consumerRecord.key() + " value = " + consumerRecord.value()); } } } }
首先需要配置application.properties文件
spring.application.name=springboot-kafka-02 server.port=8080 #kafka配置 spring.kafka.bootstrap-servers=Linux122:9092 #producer配置 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 #consumer配置 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.group-id=springboot-consumer02 spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=1000(1)同步生产者
@RestController public class KafkaSyncProducerController { @Autowired private KafkaTemplate(2)异步生产者template; @RequestMapping("send/sync/{message}") public String send(@PathVariable String message){ ListenableFuture > future = template.send("topic-spring-01", 0, 0, message); try { SendResult sendResult = future.get(); Recordmetadata recordmetadata = sendResult.getRecordmetadata(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return "success"; } }
@RestController public class KafkaASyncProducerController { @Autowired private KafkaTemplatetemplate; @RequestMapping("send/async/{message}") public String send(@PathVariable String message) { ListenableFuture > future = template.send("topic-spring-01", 0, 0, message); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable throwable) { System.out.println("发送消息失败 : " + throwable.getMessage()); } @Override public void onSuccess(SendResult result) { Recordmetadata recordmetadata = result.getRecordmetadata(); } }); return "success"; } }
我们在和Spring boot整合时候发现没有主动去创建主题等信息,但是运行后发现存在这是由于spring配置中自动做了这些工作.
我们也可以对其进行自定义,类似的对象能重新覆盖定义
eg:Spring boot中自定义Kafka中KafkaConfig
@Configuration public class KafkaConfig { @Bean public NewTopic topic1(){ return new NewTopic("nptc-01",1,(short) 1); } @Bean public NewTopic topic2(){ return new NewTopic("nptc-02",5,(short) 1); } }(3)消费者
@Component public class Myconsumer { @KafkaListener(topics = "topic-spring-01") public void onMessage(ConsumerRecord二.自定义序列化和反序列化record) { System.out.println("消费者收到消息 消息的topic = " + record.topic() + " 消息的value = " + record.value() + " 消息的partition = " + record.partition() + " 消息的offset = " + record.offset()); } }
(1)自定义Object
public class User { private Integer userId; private String userName; public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } @Override public String toString() { return "User{" + "userId=" + userId + ", userName='" + userName + ''' + '}'; } }
(2)自定义序列化器
public class UserSerializer implements Serializer{ @Override public void configure(Map configs, boolean isKey) { Serializer.super.configure(configs, isKey); //用于接收对序列化器的配置参数,并对当前的序列化器进行配置和初始化 } @Override public byte[] serialize(String s, User user) { if (user == null) { return null; } Integer userId = user.getUserId(); String userName = user.getUserName(); if (userId != null && userName != null) { try { byte[] bytes = userName.getBytes("UTF-8"); int length = bytes.length; ByteBuffer byteBuffer = ByteBuffer.allocate(4+4+length); byteBuffer.putInt(userId); byteBuffer.putInt(length); byteBuffer.put(bytes); return byteBuffer.array(); } catch (Exception e) { throw new SerializationException("User Serialization Exception!"); } } return null; } @Override public byte[] serialize(String topic, Headers headers, User data) { return Serializer.super.serialize(topic, headers, data); } @Override public void close() { Serializer.super.close(); //幂等调用即调用多次效果和一次一样 } }
(3).使用自定义序列化器进行发送消息
public class MyProducer { public static void main(String[] args) { Mapconfigs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class); KafkaProducer userKafkaProducer = new KafkaProducer<>(configs); User user = new User(); user.setUserId(111); user.setUserName("good go"); ProducerRecord userProducerRecord = new ProducerRecord ("tp_user_01", user.getUserName(), user); userKafkaProducer.send(userProducerRecord, new Callback() { @Override public void onCompletion(Recordmetadata recordmetadata, Exception e) { if (e == null) { System.out.println("userProducerRecord 发送成功,返回的信息是:topic = " + recordmetadata.topic() + " offset = " + recordmetadata.offset() + " partition = " + recordmetadata.partition()); } } }); userKafkaProducer.close(); } }
(4).自定义反序列化器
public class UserDeserializer implements Deserializer{ @Override public void configure(Map configs, boolean isKey) { Deserializer.super.configure(configs, isKey); } @Override public User deserialize(String topic, byte[] data) { ByteBuffer byteBuffer = ByteBuffer.allocate(data.length); byteBuffer.put(data); byteBuffer.flip();//类似重置 *** 作 int userId = byteBuffer.getInt(); int length = byteBuffer.getInt(); String userName = new String(data,8,length); return new User(userId,userName); } @Override public User deserialize(String topic, Headers headers, byte[] data) { return Deserializer.super.deserialize(topic, headers, data); } @Override public void close() { Deserializer.super.close(); } }
(5)使用自定义反序列化器进行消费消息
public class MyConsumer { public static void main(String[] args) { Mapconfigs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1"); KafkaConsumer kafkaConsumer = new KafkaConsumer (configs); kafkaConsumer.subscribe(Collections.singleton("tp_user_01")); int i = 0; while (true) { ConsumerRecords consumerRecords = kafkaConsumer.poll(5000); final int index = i++; consumerRecords.forEach(new Consumer >() { @Override public void accept(ConsumerRecord consumerRecord) { System.out.println("第" + index + "批次收到消息key = " + consumerRecord.key() + " value = " + consumerRecord.value()); } }); } //kafkaConsumer.close(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)