业务上有大量从硬件采集到的数据通过Kafka入库GreenPlum数据库,虽然数据表已进行分区,每个分区少的有100+万条多的时候有1000+万条记录,现在有一个接口要获取最新的20条数据用来展示,即便是从单个分区上查询由于需要全量数据排序,时间长的时候需要7~8秒,这个时候就考虑直接从Kafka获取最新数据。
2.代码实现 2.1 配置信息这里只贴出使用到的配置信息。
# kafka的服务地址 spring: kafka: bootstrap-servers: 127.0.0.1:xxxx # tableName与topic的映射 tableNameKafkaTopic: mapping: "{"table_name":"topic_name"}"
2.2 映射对象cn.hutool hutool-all5.6.6
由于Kafka内的字段跟数据库的字段名称不同,这里要创建映射关系(仅保留几个字段用来说明问题)。
@Data @ApiModel(value = "数据封装对象", description = "用于对Kafka内的数据进行封装") public class DataRes implements Serializable { @ApiModelProperty(name = "LOCATION", value = "设备位置") @JsonProperty(value = "LOCATION") private String location; @ApiModelProperty(name = "IP", value = "设备ID") @JsonProperty(value = "IP") private String equip; @ApiModelProperty(name = "TME", value = "创建时间") @JsonProperty(value = "TME") private String tme; @ConstructorProperties({"LOCATION", "IP", "TME"}) public DataGsmRes(String location, String ip, String tme) { this.location = location; this.equip = ip; this.tme = tme; } }
Kafka的记录信息:
{"LOCATION":"河南郑州","IP":"xxxx","TME":"2022-01-12 15:29:55"}
接口返回的数据:
{ "location": "河南郑州", "equip": "xxxx", "tme": "2022-01-12 15:29:55" }2.3 代码实现
为了简洁删掉了一些业务相关的代码。
@Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${tableNameKafkaTopic.mapping}") private String tableNameKafkaTopicMapping; @Override public baseResult3.算法分析> queryNewest(Map mapParam) { // 参数解析(根据tableName获取对应的Kafka主题) String tableName = MapUtils.getString(mapParam, "table_name", ""); if (StringUtils.isBlank(tableName)) { return baseResult.getInstance(101, "数据源参数table_name不能为空!"); } // 获取equip信息用来筛选数据 JSonObject jsonParam = new JSONObject(); try { String paramStr = MapUtils.getString(mapParam, "param_json", ""); JSonObject json = JSONObject.parseObject(paramStr); if (json != null) { for (String key : json.keySet()) { jsonParam.put(key.toLowerCase(), json.get(key)); } } } catch (Exception e) { return baseResult.getInstance(102, "请求参数param_json非JSON格式"); } Object equip = jsonParam.get("equip"); if (equip == null || StringUtils.isBlank(equip.toString())) { return baseResult.getInstance(101, "请求参数param_json内的equip不能为空!"); } List equipList = Arrays.asList(equip.toString().split(",")); // 从Kafka获取的符合条件的数据条数 equipKey用于筛选数据 timeKey用于排序 String equipKey = "IP"; String timeKey = "TME"; int pageSize = MapUtils.getInteger(mapParam, "pageSize"); int querySize = 1000; int queryTime = 50; int queryTotal = querySize * queryTime; // 结果数据封装 List
- 筛选的是查询时最大偏移量向前queryTotal条数据,这个可以根据业务进行调整。重新封装Kafka数据的算法实际上是修改map对象key的方法。max.poll.records参数明确了每次poll的记录数便于统计。特别注意:当前代码适用的是Partition只有1️⃣个的情况,多个分区的情况需要先查询分区数,再轮询获取每个分区的最新数据,合并后重新排序。
// 仅查询了1个分区 TopicPartition topicPartition = new TopicPartition(topicName.toString(), 0); // 获取主题的分区列表 ListpartitionInfoList = consumer.partitionsFor(topicName.toString()); //Partition(topic = gp_gsmdata, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)