springboot整合kafka:
导入依赖:
org.apache.kafka
kafka-clients
2.1.0
org.springframework.kafka
spring-kafka
2.4.1.RELEASE
注意导入的版本,如果导入版本不合适可能会引起冲突,导致项目无法启动;
接收数据导入kafka:
1.根据自己的情况对接收数据进行序列化,kafka有自己的序列化可供适使用,由于通常使用实体类接收数据,此处以实体类简介:
编写一个序列化工具类,进行实体类的序列化:
public class SerializerUtils implements Serializer {
@Override
public void configure(Map
} @Override public byte[] serialize(String s, T t) { return JSON.toJSonBytes(t); } @Override public void close() { }
}
重写serialize方法;
对需要存储的实体类数据使用上面的工具类转化位byte数组;
新建ProducerRecord
ProducerRecord
新建producer:
public KafkaProducer createProducer() {
String jaasTemplate = “org.apache.kafka.common.security.scram.ScramLoginModule required username=”%s" password="%s";";
String jaasCfg = String.format(jaasTemplate, "production_testing_rw", "production_testing_rw"); Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); 权限校验 properties.setProperty("security.protocol", "SASL_PLAINTEXT"); properties.setProperty("sasl.mechanism", "SCRAM-SHA-256"); properties.setProperty("sasl.jaas.config", jaasCfg); producer = new KafkaProducer(properties); return producer; }
}
发送消息至kafka:
try {
Future future = this.createProducer().send(producerRecord);
future.get();
}catch (Exception e){
e.printStackTrace();//连接错误、No Leader错误都可以通过重试解决;消息太大这类错误kafkaProducer不会进行任何重试,直接抛出异常
}
发送END;;;
从kafka取出刚才的消息:
监听自己所需要取的数据topic;
取出数据并进行反序列化;
public class DeSerializerUtils implements Deserializer {
@Override
public void configure(Map
} @Override public T deserialize(String topic, byte[] data) { return JSON.parseObject(data, T.class); } @Override public void close() { }
}
存入相应数据库;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)