java *** 作kafka

java *** 作kafka,第1张

java *** 作kafka

springboot整合kafka:
导入依赖:

org.apache.kafka
kafka-clients
2.1.0


org.springframework.kafka
spring-kafka
2.4.1.RELEASE

注意导入的版本,如果导入版本不合适可能会引起冲突,导致项目无法启动;
接收数据导入kafka:
1.根据自己的情况对接收数据进行序列化,kafka有自己的序列化可供适使用,由于通常使用实体类接收数据,此处以实体类简介:
编写一个序列化工具类,进行实体类的序列化:
public class SerializerUtils implements Serializer {
@Override
public void configure(Map map, boolean b) {

}

@Override
public byte[] serialize(String s, T t) {
    return JSON.toJSonBytes(t);
}

@Override
public void close() {

}

}
重写serialize方法;
对需要存储的实体类数据使用上面的工具类转化位byte数组;
新建ProducerRecord
ProducerRecord producerRecord = new ProducerRecord<>(topic, 1, bytes);
新建producer:

public KafkaProducer createProducer() {
String jaasTemplate = “org.apache.kafka.common.security.scram.ScramLoginModule required username=”%s" password="%s";";

    String jaasCfg = String.format(jaasTemplate, "production_testing_rw", "production_testing_rw");
    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    权限校验
    properties.setProperty("security.protocol", "SASL_PLAINTEXT");
    properties.setProperty("sasl.mechanism", "SCRAM-SHA-256");

    properties.setProperty("sasl.jaas.config", jaasCfg);
    producer = new KafkaProducer(properties);
    return producer;
}

}
发送消息至kafka:
try {
Future future = this.createProducer().send(producerRecord);
future.get();
}catch (Exception e){
e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
}
发送END;;;
从kafka取出刚才的消息:
监听自己所需要取的数据topic;
取出数据并进行反序列化;
public class DeSerializerUtils implements Deserializer {
@Override
public void configure(Map configs, boolean isKey) {

}

@Override
public T deserialize(String topic, byte[] data) {
    return JSON.parseObject(data, T.class);
}

@Override
public void close() {

}

}
存入相应数据库;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5636191.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存