Basic consumer configuration and code:
#spring.kafka.client-id=
spring.kafka.bootstrap-servers=localhost:9092
########### [Basic consumer configuration] ###########
#spring.kafka.consumer.client-id=
spring.kafka.consumer.properties.group.id=test01
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval-ms=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.properties.session.timeout.ms=120000
spring.kafka.consumer.properties.request.timeout.ms=180000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.listener.concurrency=4
spring.kafka.listener.missing-topics-fatal=false
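Note that this file mixes first-class Spring Boot keys (e.g. spring.kafka.consumer.enable-auto-commit) with raw pass-through entries under spring.kafka.consumer.properties.*. A quick way to see how they merge into the effective consumer configuration is to dump Spring Boot's KafkaProperties at startup. This is a minimal sketch (not part of the original article), assuming a Spring Boot 2.x classpath where buildConsumerProperties() is available:

import java.util.Map;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfigDumper {

    // Print the merged consumer config once the application starts,
    // so typos in application.properties are easy to spot.
    @Bean
    public CommandLineRunner dumpConsumerConfig(KafkaProperties kafkaProperties) {
        return args -> {
            Map<String, Object> merged = kafkaProperties.buildConsumerProperties();
            merged.forEach((k, v) -> System.out.println(k + " = " + v));
        };
    }
}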
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval-ms}")
    private Integer autoCommitInterval;

    @Value("3000")
    private Integer maxNum;

    @Value("1048576") // max fetch size per partition: 1 MB (1048576 bytes)
    private Integer maxBytes;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valDeserializer;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Enable batch consumption; the batch size is capped by ConsumerConfig.MAX_POLL_RECORDS_CONFIG below.
        factory.setBatchListener(true);
        // Number of listener threads (10 concurrent consumers).
        factory.setConcurrency(10);
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        // Note: this hard-coded value overrides the session.timeout.ms set in application.properties.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valDeserializer);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxBytes); // max bytes fetched per partition per batch
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxNum);            // max records per batch
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }
}
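The consumer below autowires a RestHighLevelClient, whose bean definition the article does not show. A minimal sketch of such a bean follows; the host and port are assumptions for illustration, adjust them for your cluster:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfiguration {

    // Minimal client bean; localhost:9200 is an assumed address.
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
}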
The following code implements a consumer that connects to Kafka and writes the consumed data into Elasticsearch:
import java.io.IOException;
import java.util.List;

import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component
public class NewWebsitealertConsumer {

    @Autowired
    private RestHighLevelClient client;

    @Value("test_index")
    private String TEST_INDEX;

    // Configure the topic, consumer group and container factory to listen with.
    @KafkaListener(
            id = "nw0",
            groupId = "TESTCONSUMER",
            topics = "test01",
            containerFactory = "kafkaListenerFactory"
    )
    public void newbulkWebsitealert(List<ConsumerRecord<?, ?>> records) {
        if (CollectionUtils.isEmpty(records)) {
            return;
        }
        try {
            BulkRequest firstBulkRequest = new BulkRequest();
            for (ConsumerRecord<?, ?> record : records) {
                TestData value = JSON.parseObject(record.value().toString(), TestData.class);
                // Derive a deterministic document id so re-consumed records overwrite
                // the existing document instead of being inserted again.
                String id = DigestUtils.md5Hex(value.getId());
                IndexRequest indexRequest = new IndexRequest(TEST_INDEX);
                indexRequest.id(id);
                // Indexing the source as a JSON string is the recommended form.
                indexRequest.source(JSON.toJSONString(value), XContentType.JSON);
                firstBulkRequest.add(indexRequest); // other requests can be added to the same bulk
            }
            client.bulk(firstBulkRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
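The listener deserializes each record into a TestData class that the article does not show. A minimal, assumed sketch follows; only the id field is actually referenced above (via value.getId()), the other field is illustrative:

// Hypothetical payload class matching the JSON produced to the topic.
public class TestData {

    private String id;      // referenced above via value.getId()
    private String content; // illustrative extra field, not confirmed by the article

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
}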