如何编写一个kafka生产者?
1、pom文件中导入
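<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>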
2、yml配置文件中加入kafka配置
spring:
  kafka:
    producer:
      bootstrap-servers: ip:port
      batch-size: 16384
      retries: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
3、新建一个类
@Component public class KafkaProducerTest implements Runnable { private KafkaProducerkafkaProducer; private final String topic = "test_producer"; @Value("${spring.kafka.producer.bootstrap-servers}") private String address; @Value("${spring.kafka.producer.key-serializer}") private String key; @Value("${spring.kafka.producer.value-serializer}") private String value; private Properties initProperties() { // zookeeper 配置 Properties props = new Properties(); props.put("bootstrap.servers", address); // group 代表一个消费组 props.put("group.id", "kafkaProducer"); props.put("session.timeout.ms", "30000"); // 往zookeeper上写offset的频率 props.put("auto.commit.interval.ms", "1000"); // key的反序列化类型 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 设置value的反序列化类型 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return props; } public void senMessage(int key, String value) { kafkaProducer.send(new ProducerRecord(topic, value)); } //开启线程,往kfaka中插入一百万数据 public void run() { kafkaProducer = new KafkaProducer (initProperties()); boolean result = true; int i = 0; while (result) { i++; senMessage(i, "这是第" + i + "条数据。"); if (i > 1000000) { result = false; } } System.out.println("共插入数量:" + i); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)