通常ES存储数据是直接插入JSON数据(此操作在另一篇博文中记录:ES中JSON文档存储);而在实际Java开发中,数据接收对象一般定义为实体对象,因此对ES进行新增、修改操作时,需要先把实体对象做进一步的转换。
1.新增保存
public boolean save(T entity) { boolean result = false; String index = indexName(entity); Assert.notNull(index, INDEX_NULL); IndexRequest request = new IndexRequest(index).source(removeNull(entity)); if(isNotEmpty(entity.get_id())){ //指定id,如果ES中存在则更新,不存在则新增 //不指定时,ES自己生成唯一主键 request.id(entity.get_id()); } if(isNotEmpty(entity.getRouting())){ //指定routing,在插入父子文档数据时为必需 request.routing(entity.getRouting()); } try { client.index(request, RequestOptions.DEFAULT); result = true; } catch (IOException e) { log.error("保存数据失败:", e); } return result; }
将entity转换为Map,并过滤掉值为null的字段(以及排除列表中的字段)
private MapremoveNull(T entity) { BeanMap beanMap = BeanMap.create(entity); Map map = new HashMap<>(); beanMap.forEach((key, value) -> { if (null != value && !EXCLUDE_FIELD.contains(String.valueOf(key))) { if(value instanceof List){ map.put(String.valueOf(key), parseArray(JSON.toJSonString(value))); }else if(value instanceof EvntRel){ map.put(String.valueOf(key), JSON.toJSON(value)); }else{ map.put(String.valueOf(key), value); } } }); return map; }
ES的entity主类
public class ESEntity implements Serializable { protected static final long serialVersionUID = 1L; protected String id; private String bucket;//用于分组聚合存储聚合的key @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) protected int size = 10; @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) protected int current = 1; @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) protected transient Page page; public Page getPage() { if (page == null) { page = new Page(current, size); } return page; } }
/**
 * Entity base type that adds Elasticsearch document coordinates to {@code ESEntity}.
 *
 * <p>NOTE(review): the fields are private and no accessors are declared here, although
 * other code calls {@code get_id()}, {@code get_index()} and {@code getRouting()} —
 * presumably they are generated elsewhere; confirm.
 */
public class ESbaseEntity extends ESEntity {

    /** Document id passed to index requests when non-empty. */
    private String _id;

    /** Name of the index the document belongs to. */
    private String _index;

    /** Routing value passed to requests when non-empty; empty string by default. */
    private String routing = "";
}
public BulkResponse bulkAddOrUpdate(Listlist) { Assert.notEmpty(list, BULK_LIST_NULL); Assert.notNull(indexName(list.get(0)), INDEX_NULL); BulkRequest bulkRequest = new BulkRequest(); for (T obj : list) { IndexRequest indexRequest = new IndexRequest(obj.get_index()).source(removeNull(obj)); if(isNotEmpty(obj.get_id())){ indexRequest.id(obj.get_id()); } if(isNotEmpty(obj.getRouting())){ indexRequest.routing(obj.getRouting()); } bulkRequest.add(indexRequest); } try { return client.bulk(bulkRequest, RequestOptions.DEFAULT); } catch (IOException e) { log.error("批量插入数据失败:", e); } return null; }
3.更新指定文档
/**
 * Partially updates the document with the given id, resolving the index from the
 * entity and taking the routing from {@code entity.getRouting()}.
 *
 * @param id     id of the document to update
 * @param entity source of the new field values; must not be {@code null}
 * @return {@code true} if the update call completed, {@code false} if it threw
 * @throws NullPointerException if {@code entity} is {@code null}
 */
public boolean updateById(String id, T entity) {
    Objects.requireNonNull(entity, "对象不能为null");
    return updateById(indexName(entity), id, entity, entity.getRouting());
}

/**
 * Partially updates a document in an explicitly named index.
 *
 * <p>The update document is {@code removeNull(entity)}; routing is applied only when
 * non-empty. Any exception raised by the client call is logged and reported as
 * {@code false} rather than rethrown.
 *
 * @param index   target index
 * @param id      id of the document to update
 * @param entity  source of the new field values
 * @param routing routing value, may be empty
 * @return {@code true} if the update call completed, {@code false} if it threw
 */
public boolean updateById(String index, String id, T entity, String routing) {
    UpdateRequest request = new UpdateRequest(index, id);
    request.doc(removeNull(entity));
    if (isNotEmpty(routing)) {
        request.routing(routing);
    }

    try {
        client.update(request, RequestOptions.DEFAULT);
        return true;
    } catch (Exception e) {
        log.error("指定索引更新失败,Exception:", e);
        return false;
    }
}
3.根据查询条件批量更新文档
public boolean updateByQuery(String index, QueryBuilder queryBuilder, script script) { boolean result = false; try { UpdateByQueryRequest request = new UpdateByQueryRequest(index); request.setQuery(queryBuilder); request.setscript(script); request.setBatchSize(BATCH_NUM); //documents processed per batch default = 1000 大了,意味着消耗更多的内存 client.updateByQuery(request, RequestOptions.DEFAULT); result = true; } catch (IOException e) { log.error("更新失败,Exception:", e); } return result; }
4.大量更新
public BulkByScrollResponse updateByQueryResponse(String index, QueryBuilder queryBuilder, script script) { Assert.notNull(index, INDEX_NULL); try { UpdateByQueryRequest request = new UpdateByQueryRequest(index); request.setQuery(queryBuilder); request.setscript(script); request.setSize(MAX_NUM);//最大值-自己设置,一般10000 request.setBatchSize(BATCH_NUM); //批处理数量 documents processed per batch default = 1000 大了,意味着消耗更多的内存 return client.updateByQuery(request, RequestOptions.DEFAULT); } catch (IOException e) { log.error("更新失败,Exception:", e); } return null; }
5.批处理
private boolean batch(final String[] ids, final Listlist, final T entity, final Action action) { BulkProcessor processor = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), processorListener).build(); if (action.equals(Action.UPDATE)) { if (!CollectionUtils.isEmpty(list)) { log.info("多值批量更新"); list.forEach(e -> processor.add(new UpdateRequest(indexName(e), e.getId()).doc(removeNull(entity)))); } else { log.info("单值批量更新"); Assert.notNull(ids, PARAM_IDS_NULL); Stream.of(ids) .forEach(id -> processor.add(new UpdateRequest(indexName(entity), id).doc(beanToMap(entity)))); } } else if (action.equals(Action.DELETE)) { log.info("批量删除"); if (!CollectionUtils.isEmpty(list)) { list.forEach(e -> processor.add(new DeleteRequest(indexName(e), e.getId()))); } else { Assert.notNull(ids, PARAM_IDS_NULL); String index = indexName(entity); Stream.of(ids).forEach(id -> processor.add(new DeleteRequest(index, id))); } } else { log.info("参数错误没有执行,ids:{},list:{},entity:{},action:{}", ids, list, entity, action); } try { processor.awaitClose(10L, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("bulkProcessor关闭异常:", e); } return true; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)