Kafka【付诸实践 04】Java实现筛选查询Kafka符合条件的最新数据(保证数据最新+修改map对象key的方法+获取指定数量的记录 源码分享粘贴可用)

Kafka【付诸实践 04】Java实现筛选查询Kafka符合条件的最新数据(保证数据最新+修改map对象key的方法+获取指定数量的记录 源码分享粘贴可用),第1张

Kafka【付诸实践 04】Java实现筛选查询Kafka符合条件的最新数据(保证数据最新+修改map对象key的方法+获取指定数量的记录 源码分享粘贴可用) 1.需求说明

业务上有大量从硬件采集到的数据通过Kafka入库GreenPlum数据库,虽然数据表已进行分区,每个分区少的有100+万条多的时候有1000+万条记录,现在有一个接口要获取最新的20条数据用来展示,即便是从单个分区上查询由于需要全量数据排序,时间长的时候需要7~8秒,这个时候就考虑直接从Kafka获取最新数据。

2.代码实现 2.1 配置信息

这里只贴出使用到的配置信息。

# kafka的服务地址
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:xxxx
# tableName与topic的映射
tableNameKafkaTopic:
  mapping: "{"table_name":"topic_name"}"

	cn.hutool
	hutool-all
	5.6.6

2.2 映射对象

由于Kafka内的字段跟数据库的字段名称不同,这里要创建映射关系(仅保留几个字段用来说明问题)。

@Data
@ApiModel(value = "数据封装对象", description = "用于对Kafka内的数据进行封装")
public class DataRes implements Serializable {
    
    @ApiModelProperty(name = "LOCATION", value = "设备位置")
    @JsonProperty(value = "LOCATION")
    private String location;
   
    @ApiModelProperty(name = "IP", value = "设备ID")
    @JsonProperty(value = "IP")
    private String equip;

    @ApiModelProperty(name = "TME", value = "创建时间")
    @JsonProperty(value = "TME")
    private String tme;

    @ConstructorProperties({"LOCATION", "IP", "TME"})
    public DataGsmRes(String location, String ip, String tme) {
        this.location = location;
        this.equip = ip;
        this.tme = tme;
    }
}

Kafka的记录信息:

{"LOCATION":"河南郑州","IP":"xxxx","TME":"2022-01-12 15:29:55"}

接口返回的数据:

{
    "location": "河南郑州",
    "equip": "xxxx",
    "tme": "2022-01-12 15:29:55"
}
2.3 代码实现

为了简洁删掉了一些业务相关的代码。

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${tableNameKafkaTopic.mapping}")
    private String tableNameKafkaTopicMapping;

    @Override
    public baseResult> queryNewest(Map mapParam) {

        // 参数解析(根据tableName获取对应的Kafka主题)
        String tableName = MapUtils.getString(mapParam, "table_name", "");
        if (StringUtils.isBlank(tableName)) {
            return baseResult.getInstance(101, "数据源参数table_name不能为空!");
        }
        // 获取equip信息用来筛选数据
        JSonObject jsonParam = new JSONObject();
        try {
            String paramStr = MapUtils.getString(mapParam, "param_json", "");
            JSonObject json = JSONObject.parseObject(paramStr);
            if (json != null) {
                for (String key : json.keySet()) {
                    jsonParam.put(key.toLowerCase(), json.get(key));
                }
            }
        } catch (Exception e) {
            return baseResult.getInstance(102, "请求参数param_json非JSON格式");
        }
        Object equip = jsonParam.get("equip");
        if (equip == null || StringUtils.isBlank(equip.toString())) {
            return baseResult.getInstance(101, "请求参数param_json内的equip不能为空!");
        }
        List equipList = Arrays.asList(equip.toString().split(","));
        // 从Kafka获取的符合条件的数据条数 equipKey用于筛选数据 timeKey用于排序
        String equipKey = "IP";
        String timeKey = "TME";
        int pageSize = MapUtils.getInteger(mapParam, "pageSize");
        int querySize = 1000;
        int queryTime = 50;
        int queryTotal = querySize * queryTime;
        // 结果数据封装
        List rows = new ArrayList<>();
        List rowsSorted;
        // 从Kafka获取最新数据
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "queryNewest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", querySize);
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 查询主题数据
        Object topicName = JSONObject.parseObject(tableNameKafkaTopicMapping).get(tableName);
        if (topicName != null && StringUtils.isNotBlank(topicName.toString())) {
            TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0);
            List topicPartitionList = Collections.singletonList(topicPartition);
            consumer.assign(topicPartitionList);
            consumer.seekToEnd(topicPartitionList);
            // 获取当前最大偏移量
            long currentPosition = consumer.position(topicPartition);
            int recordsCount;
            try {
                for (int i = 1; i <= queryTime; i++) {
                    long seekOffset = currentPosition - i * querySize;
                    consumer.seek(topicPartition, seekOffset > 0 ? seekOffset : 0);
                    ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
                    recordsCount = records.count();
                    for (ConsumerRecord record : records) {
                        queryTotal--;
                        Map map = JSONObject.parseObject(record.value());
                        String ip = MapUtils.getString(map, equipKey);
                        if (equipList.size() == 0 || equipList.contains(ip)) {
                            rows.add(map);
                        }
                    }
                    // 获取数据(达到 pageSize 或 queryTotal 或消息队列无数据 即停止查询)
                    if (rows.size() >= pageSize || queryTotal <= 0 || recordsCount <= 0) {
                        break;
                    }
                }
            } finally {
                consumer.close();
                // 重新排序
                String finalTimeKey = timeKey;
                rowsSorted = rows.stream()
                .sorted(Comparator.comparingLong(row -> -DateUtil.parse(row.get(finalTimeKey).toString(), "yyyy-MM-dd HH:mm:ss").getTime()))
                .collect(Collectors.toList());
            }
        } else {
            return baseResult.getInstance(301, "不存在" + tableName + "对应的KafkaTopic!");
        }
        // 结果封装(截取pageSize个结果并映射key值)
        List subList;
        if (rowsSorted.size() > pageSize) {
            subList = rowsSorted.subList(0, pageSize);
        } else {
            subList = rowsSorted;
        }
        List res = new ArrayList<>();
        PageEntity pageEntity = new PageEntity<>();
        // 重新封装Kafka数据(字段值映射)
        res = subList.stream()
                    .map(item -> JSONObject.parseObject(JSON.toJSONString(JSONObject.parseObject(item.toString(), DataRes.class)), Map.class))
                    .collect(Collectors.toList());
        } 
        pageEntity.setTotal(res.size());
        pageEntity.setRows(res);
        return baseResult.getInstance(pageEntity);
    }
3.算法分析
    筛选的是查询时最大偏移量向前queryTotal条数据,这个可以根据业务进行调整。重新封装Kafka数据的算法实际上是修改map对象key的方法。max.poll.records参数明确了每次poll的记录数便于统计。特别注意:当前代码适用的是Partition只有1️⃣个的情况,多个分区的情况需要先查询分区数,再轮询获取每个分区的最新数据,合并后重新排序。
// 仅查询了1个分区
TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0);
// 获取主题的分区列表
List partitionInfoList = consumer.partitionsFor(topicName.toString());
//Partition(topic = gp_gsmdata, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5706170.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存