Java实现kafka消费者

Java实现kafka消费者,第1张

Java实现kafka消费者

 消费者基本配置及代码:

#spring.kafka.client-id=
# Broker address; injected into KafkaConfiguration via @Value.
spring.kafka.bootstrap-servers=localhost:9092
########### [Basic consumer configuration] ###########
#spring.kafka.consumer.client-id=
# NOTE(review): the @KafkaListener in NewWebsitealertConsumer declares
# groupId = "TESTCONSUMER"; confirm which group id is actually intended.
spring.kafka.consumer.properties.group.id=test01
# The next three keys are read by KafkaConfiguration via @Value and copied into
# the consumer property map it builds.
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval-ms=1000
spring.kafka.consumer.auto-offset-reset=latest
# NOTE(review): KafkaConfiguration.consumerConfigs() hard-codes a session timeout
# of "15000" and does not read this key; verify which value the listener
# containers end up using.
spring.kafka.consumer.properties.session.timeout.ms=120000
spring.kafka.consumer.properties.request.timeout.ms=180000
# Deserializer class names; read by KafkaConfiguration via @Value.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.listener.concurrency=4
# Presumably lets the application start even when a listened topic does not
# exist yet - confirm against the Spring Boot version in use.
spring.kafka.listener.missing-topics-fatal=false
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval-ms}")
    private Integer autoCommitInterval;


    @Value("3000")
    private Integer maxNum;
    @Value("1048576")//最大数据大小为10M
    private Integer maxBytes;


    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valDeserializer;
    
    @Bean
    public KafkaListenerContainerFactory> kafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_ConFIG
        factory.setBatchListener(true);
        //设置要监听的线程数。(设置10个消费者进行消费。)
        factory.setConcurrency(10);
        return factory;
    }
    @Bean
    public Map consumerConfigs() {   
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valDeserializer);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,maxBytes);//每一批
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxNum);//每一批最多数量
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }
}

 代码实现:消费者连接 Kafka,消费数据并批量写入 Elasticsearch(ES)库:

@Component
public class NewWebsitealertConsumer {

    @Autowired
    private RestHighLevelClient client;

    @Value("test_index")
    private String TEST_INDEX;

    // 配置要监听的主题、消费者组、消费者工厂
    @KafkaListener(
            id = "nw0",
            groupId = "TESTCONSUMER",
            topics = test01,
            containerFactory = "kafkaListenerFactory"
    )
    public void newbulkWebsitealert(List> records){
        if (CollectionUtils.isEmpty(records)) {
            return;
        }
        try {
            BulkRequest firstBulkRequest = new BulkRequest();  // 不新建 会 重复插入,不然就指定id

            for (ConsumerRecord record : records) {
                TestData value = JSON.parseObject(record.value().toString(), TestData.class);
                String id = DigestUtils.md5Hex(value.getId());
                IndexRequest indexRequest = new IndexRequest(TEST_INDEX);
                indexRequest.id(id);
                indexRequest.source(JSON.toJSonString(value), XContentType.JSON);//推荐添加string类型的,并且是json格式
                firstBulkRequest.add(indexRequest);//可以添加其他的请求的
            }

            client.bulk(firstBulkRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5636347.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存