Kafka Producers and Consumers, and Custom Serialization and Deserialization

I. Producers and Consumers

1. How the producer sends messages

Synchronous send

public class MyProducer1 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "Linux122:9092");
        configs.put("key.serializer", IntegerSerializer.class);
        configs.put("value.serializer", StringSerializer.class);
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

        // Optional record headers
        ArrayList<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));
        // Arguments: topic, partition, key, value, headers
        ProducerRecord<Integer, String> producerRecord =
                new ProducerRecord<>("topic_1", 0, 0, "hello laogu 0 ", headers);

        // Synchronous send: block on the returned Future until the broker acknowledges.
        // (Commented out here; the asynchronous variant below is used instead.)
        // Future<RecordMetadata> future = producer.send(producerRecord);
        // try {
        //     RecordMetadata recordMetadata = future.get();
        //     System.out.println("Topic of the message: " + recordMetadata.topic());
        //     System.out.println("Partition of the message: " + recordMetadata.partition());
        //     System.out.println("Offset of the message: " + recordMetadata.offset());
        // } catch (InterruptedException | ExecutionException e) {
        //     e.printStackTrace();
        // } finally {
        //     producer.close();
        // }

Asynchronous send

        // Asynchronous send: the callback fires once the broker responds
        producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    System.out.println("Exception: " + e.getMessage());
                } else {
                    System.out.println("Topic of the message: " + recordMetadata.topic());
                    System.out.println("Partition of the message: " + recordMetadata.partition());
                    System.out.println("Offset of the message: " + recordMetadata.offset());
                }
            }
        });

        producer.close();
    }
}
2. How the consumer consumes messages
public class MyConsumer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList("topic_1"));
        while (true) {
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3000);
            for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {
                System.out.println("topic = " + consumerRecord.topic()
                        + " partition = " + consumerRecord.partition()
                        + " offset = " + consumerRecord.offset()
                        + " key = " + consumerRecord.key()
                        + " value = " + consumerRecord.value());
            }
            }
        }
    }
}
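
The loop above relies on the consumer's automatic offset commits. If offsets should only advance after a batch has actually been processed, auto-commit can be disabled and offsets committed manually. A minimal sketch under that assumption (the class and group names are illustrative; commitSync is the standard KafkaConsumer API):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MyManualCommitConsumer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo.manual");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Turn off auto-commit so offsets only advance after processing succeeds
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList("topic_1"));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(3000);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("offset = " + record.offset() + " value = " + record.value());
            }
            // Synchronously commit the offsets of the records just processed
            consumer.commitSync();
        }
    }
}
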
3. Integrating Kafka with Spring Boot

First, configure the application.properties file:

spring.application.name=springboot-kafka-02
server.port=8080

# Kafka configuration
spring.kafka.bootstrap-servers=Linux122:9092

# Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# Consumer configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer02
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
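
These properties are picked up by Spring Boot's Kafka auto-configuration. The controllers below are assumed to sit in an ordinary Spring Boot application with spring-kafka on the classpath; a minimal entry point might look like this (the class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootKafkaApplication {
    public static void main(String[] args) {
        // Starts the embedded server and triggers Kafka auto-configuration
        SpringApplication.run(SpringbootKafkaApplication.class, args);
    }
}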
(1) Synchronous producer
@RestController
public class KafkaSyncProducerController {
    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/sync/{message}")
    public String send(@PathVariable String message) {
        // Arguments: topic, partition, key, value
        ListenableFuture<SendResult<Integer, String>> future = template.send("topic-spring-01", 0, 0, message);
        try {
            // Block until the send completes, making the call synchronous
            SendResult<Integer, String> sendResult = future.get();
            RecordMetadata recordMetadata = sendResult.getRecordMetadata();
            System.out.println("Sent to topic = " + recordMetadata.topic()
                    + " partition = " + recordMetadata.partition()
                    + " offset = " + recordMetadata.offset());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
(2) Asynchronous producer
@RestController
public class KafkaAsyncProducerController {
    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/async/{message}")
    public String send(@PathVariable String message) {
        ListenableFuture<SendResult<Integer, String>> future = template.send("topic-spring-01", 0, 0, message);

        // Register a callback instead of blocking; the request thread returns immediately
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Failed to send message: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                RecordMetadata recordMetadata = result.getRecordMetadata();
                System.out.println("Sent to topic = " + recordMetadata.topic()
                        + " partition = " + recordMetadata.partition()
                        + " offset = " + recordMetadata.offset());
            }
        });
        return "success";
    }
}
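
ListenableFuture is the return type in spring-kafka 2.x; in spring-kafka 3.0 and later, KafkaTemplate.send returns a CompletableFuture instead. A sketch of an equivalent controller method under that version assumption (the mapping path is illustrative):

    @RequestMapping("send/async2/{message}")
    public String sendV3(@PathVariable String message) {
        // spring-kafka 3.x: send(...) returns CompletableFuture<SendResult<Integer, String>>
        template.send("topic-spring-01", 0, 0, message)
                .whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        System.out.println("Failed to send message: " + throwable.getMessage());
                    } else {
                        System.out.println("Sent, offset = " + result.getRecordMetadata().offset());
                    }
                });
        return "success";
    }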

When integrating with Spring Boot, we never explicitly created the topic, yet it exists once the application has run. This is because Spring Boot's auto-configuration does that work for us.

We can also customize this: declaring beans of the corresponding types overrides the defaults.
For example, a custom KafkaConfig in Spring Boot that declares topics:

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topic1() {
        // Arguments: topic name, number of partitions, replication factor
        return new NewTopic("nptc-01", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("nptc-02", 5, (short) 1);
    }
}
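
With spring-kafka 2.3 or later, the same declaration can be written more fluently with TopicBuilder. A sketch under that version assumption (the class and topic names are illustrative):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic topic3() {
        // Equivalent to new NewTopic("nptc-03", 5, (short) 1)
        return TopicBuilder.name("nptc-03")
                .partitions(5)
                .replicas(1)
                .build();
    }
}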
(3) Consumer
@Component
public class MyConsumer {
    @KafkaListener(topics = "topic-spring-01")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("Received message: topic = " + record.topic()
                + " value = " + record.value()
                + " partition = " + record.partition()
                + " offset = " + record.offset());
    }
}
II. Custom Serialization and Deserialization

(1) Custom object

public class User {
    private Integer userId;
    private String userName;

    // No-arg constructor, used when building the object with setters
    public User() {
    }

    // All-args constructor, used by the deserializer below
    public User(Integer userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                '}';
    }
}

(2) Custom serializer

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
        // Receives the serializer's configuration parameters and initializes it
    }

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }
        Integer userId = user.getUserId();
        String userName = user.getUserName();
        if (userId != null && userName != null) {
            try {
                byte[] bytes = userName.getBytes("UTF-8");
                int length = bytes.length;
                // Layout: 4-byte userId, 4-byte name length, then the name bytes
                ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + length);
                byteBuffer.putInt(userId);
                byteBuffer.putInt(length);
                byteBuffer.put(bytes);
                return byteBuffer.array();
            } catch (Exception e) {
                throw new SerializationException("User serialization exception!");
            }
        }
        return null;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, User data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
        // Must be idempotent: calling it multiple times has the same effect as calling it once
    }
}

(3) Sending messages with the custom serializer

public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Use the custom serializer for the message value
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        KafkaProducer<String, User> userKafkaProducer = new KafkaProducer<>(configs);
        User user = new User();
        user.setUserId(111);
        user.setUserName("good go");
        ProducerRecord<String, User> userProducerRecord =
                new ProducerRecord<>("tp_user_01", user.getUserName(), user);
        userKafkaProducer.send(userProducerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("userProducerRecord sent successfully: topic = "
                            + recordMetadata.topic()
                            + " offset = " + recordMetadata.offset()
                            + " partition = " + recordMetadata.partition());
                }
            }
        });

        userKafkaProducer.close();
    }
}

(4) Custom deserializer

public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer byteBuffer = ByteBuffer.allocate(data.length);
        byteBuffer.put(data);
        byteBuffer.flip(); // switch the buffer from writing to reading
        // Read back the layout written by UserSerializer: userId, name length, name bytes
        int userId = byteBuffer.getInt();
        int length = byteBuffer.getInt();
        String userName = new String(data, 8, length, StandardCharsets.UTF_8);
        return new User(userId, userName);
    }

    @Override
    public User deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}
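
Before wiring the pair into Kafka, it is worth sanity-checking that the deserializer inverts the serializer. A minimal round-trip sketch using the two classes above (the class name is illustrative, and the topic string is unused by these implementations):

public class UserSerdeRoundTripTest {
    public static void main(String[] args) {
        User user = new User(111, "good go");

        // 4 bytes userId + 4 bytes length + 7 UTF-8 bytes for "good go" = 15 bytes
        byte[] bytes = new UserSerializer().serialize("tp_user_01", user);
        System.out.println("serialized length = " + bytes.length);

        User decoded = new UserDeserializer().deserialize("tp_user_01", bytes);
        System.out.println("decoded = " + decoded); // User{userId=111, userName='good go'}
    }
}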

(5) Consuming messages with the custom deserializer

public class MyConsumer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "Linux122:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Use the custom deserializer for the message value
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");

        KafkaConsumer<String, User> kafkaConsumer = new KafkaConsumer<>(configs);
        kafkaConsumer.subscribe(Collections.singleton("tp_user_01"));
        int i = 0;
        while (true) {
            ConsumerRecords<String, User> consumerRecords = kafkaConsumer.poll(5000);
            final int index = i++;
            consumerRecords.forEach(new Consumer<ConsumerRecord<String, User>>() {
                @Override
                public void accept(ConsumerRecord<String, User> consumerRecord) {
                    System.out.println("Batch " + index + ": key = " + consumerRecord.key()
                            + " value = " + consumerRecord.value());
                }
            });
        }
        // kafkaConsumer.close();
    }
}
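
If the custom pair should be used from the Spring Boot setup in section 3 rather than the plain clients, the value (de)serializer properties can point at the custom classes instead. A sketch, assuming the classes live in a hypothetical com.example.kafka package:

spring.kafka.producer.value-serializer=com.example.kafka.UserSerializer
spring.kafka.consumer.value-deserializer=com.example.kafka.UserDeserializer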
