关于elasticsearch批量删除后立马查询还能查出被删除的数据的问题

关于elasticsearch批量删除后立马查询还能查出被删除的数据的问题,第1张

关于elasticsearch批量删除后立马查询还能查出被删除的数据的问题

目录
  • 相关环境:
  • 项目场景:
  • 问题描述:
  • 原因分析:
  • 解决方案:

相关环境:
  1. elasticsearch:7.7.1,三个节点的集群
  2. java: 1.8
项目场景: 商城搜索,两个 *** 作。
*** 作一:用户首次查询,将数据从es查询出来,再缓存到redis,之后的查询,直接读redis。
*** 作二:后台管理数据,删除es部分数据,重新添加这部分数据,再删除缓存。

问题描述: 上面的流程看起来是没有问题,但是删除es部分数据的时候还是出现了问题。

假设一下流程:

  1. 删除es部分数据
  2. 重新添加这部分数据
  3. 删除缓存
  4. 用户首次查询,将数据从es查询出来
  5. 再缓存到redis

流程很正常,不会有问题,但是如果es还来不及删除呢?就被查询出来并缓存到redis里面了,这个时候就会有问题了。

这里使用的是es的同步批量删除,核心批量删除代码如下

// 根据id批量删除
BulkRequest bulkRequest = new BulkRequest();
for (String id : ids) {
	bulkRequest.add(new DeleteRequest(index, id));
}
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println("耗时: " + bulkResponse.getTook());
if (bulkResponse.hasFailures()) {
	System.out.println("批量删除失败: " + bulkResponse.buildFailureMessage());
}

按照官网的解释,这个删除bulk为同步的批量删除,异步为bulkAsync,这里说的是批量删除,其实在这里可以做增删查改符合的批量 *** 作。


原因分析: 猜测:由于是在es集群上面进行 *** 作的,集群上面设置了索引的分片=5,副本=3,在执行同步删除 *** 作后,es返回了成功,但是并不是所有的副本都删除了,此时,有个查询从其他副本上面查询出了"已被删除"的数据。

代码抽离出来,demo如下:

@Test
public void originalTest() throws IOException {
	// 1. 查询出floorId等于29的所有数据
	SearchRequest searchRequest = new SearchRequest(index);
	searchRequest.source(SearchSourceBuilder.searchSource().size(1000).query(QueryBuilders.termQuery("floorId", 29)));
	SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("首次查询: " + searchResponse);
	// 2. 记录这批数据的id
	SearchHit[] hits = searchResponse.getHits().getHits();
	List ids = new ArrayList<>(hits.length);
	for (SearchHit hit : hits) {
		Map map = hit.getSourceAsMap();
		ids.add(map.get("id").toString());
	}
	System.out.println("需要删除的id: " + ids);
	// 3根据id批量删除
	BulkRequest bulkRequest = new BulkRequest();
	bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
	for (String id : ids) {
		bulkRequest.add(new DeleteRequest(index, id));
	}
	BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
	System.out.println("耗时: " + bulkResponse.getTook());
	if (bulkResponse.hasFailures()) {
		System.out.println("批量删除失败: " + bulkResponse.buildFailureMessage());
	}
	// 4. 再查询出floorId等于29的数据
	searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("删除后再次查询: " + searchResponse);
}
将es集群改成单节点,分片=1,副本=1,执行上面的单元测试,结果还是能查询出"被删除"的数据。

结论:查出删除的数据不是由于备份和分片机制导致的。


再看官网:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html

大概有这么一段代码

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 

Leave this request open until a refresh has made the contents of this request visible to search. This refresh policy is compatible with high indexing and search throughput but it causes the request to wait to reply until a refresh occurs.

看不懂,我理解的意思是,删除之后,等待分片与备份刷新完成后再返回。

解决方案: 批量删除 *** 作,加上这个刷新策略 完整代码如下:区别是加了第18行
@Test
public void originalTest() throws IOException {
	// 1. 查询出floorId等于29的所有数据
	SearchRequest searchRequest = new SearchRequest(index);
	searchRequest.source(SearchSourceBuilder.searchSource().size(1000).query(QueryBuilders.termQuery("floorId", 29)));
	SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("首次查询: " + searchResponse);
	// 2. 记录这批数据的id
	SearchHit[] hits = searchResponse.getHits().getHits();
	List ids = new ArrayList<>(hits.length);
	for (SearchHit hit : hits) {
		Map map = hit.getSourceAsMap();
		ids.add(map.get("id").toString());
	}
	System.out.println("需要删除的id: " + ids);
	// 3根据id批量删除
	BulkRequest bulkRequest = new BulkRequest();
	bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
	for (String id : ids) {
		bulkRequest.add(new DeleteRequest(index, id));
	}
	BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
	System.out.println("耗时: " + bulkResponse.getTook());
	if (bulkResponse.hasFailures()) {
		System.out.println("批量删除失败: " + bulkResponse.buildFailureMessage());
	}
	// 4. 再查询出floorId等于29的数据
	searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
	System.out.println("删除后再次查询: " + searchResponse);
}

再次验证,这次查询不到"已经删除"的数据了。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5679170.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存