Elasticsearch 实体类数据保存修改

Elasticsearch 实体类数据保存修改,第1张

Elasticsearch 实体类数据保存修改

通常ES存储数据是直接插入JSON数据(此操作在另一篇博文中记录:ES中JSON文档存储),而在实际Java开发中一般定义数据接收对象是个实体对象,对ES进行新增修改操作时就需要进行进一步的转换;

1.新增保存

/**
 * Indexes a single entity as one document.
 *
 * <p>The target index is resolved with {@code indexName(entity)} and the
 * document body is produced by {@code removeNull(entity)}.
 *
 * @param entity the entity to store
 * @return {@code true} when the index call returned without an
 *         {@link IOException}; {@code false} when one was caught and logged
 */
public boolean save(T entity) {
        final String targetIndex = indexName(entity);
        Assert.notNull(targetIndex, INDEX_NULL);

        final IndexRequest indexRequest = new IndexRequest(targetIndex).source(removeNull(entity));

        // With an explicit id, ES updates the document if it exists and creates it
        // otherwise; without one, ES generates a unique id itself.
        final String docId = entity.get_id();
        if (isNotEmpty(docId)) {
            indexRequest.id(docId);
        }

        // Routing is required when inserting parent/child documents.
        final String routingKey = entity.getRouting();
        if (isNotEmpty(routingKey)) {
            indexRequest.routing(routingKey);
        }

        try {
            client.index(indexRequest, RequestOptions.DEFAULT);
            return true;
        } catch (IOException e) {
            log.error("保存数据失败:", e);
            return false;
        }
    }

entity转map,过滤null

/**
 * Converts an entity into a field-name to value map, skipping properties whose
 * value is {@code null} and properties listed in {@code EXCLUDE_FIELD}.
 *
 * <p>{@link List} values are round-tripped through JSON text and parsed back
 * with {@code parseArray}; {@code EvntRel} values are converted with
 * {@code JSON.toJSON}; every other value is stored as-is.
 *
 * @param entity the bean to convert
 * @return a new mutable map holding the retained properties
 */
private Map<String, Object> removeNull(T entity) {
        BeanMap beanMap = BeanMap.create(entity);
        Map<String, Object> map = new HashMap<>();
        beanMap.forEach((key, value) -> {
            String field = String.valueOf(key);
            if (null == value || EXCLUDE_FIELD.contains(field)) {
                return;
            }
            if (value instanceof List) {
                // Fixed: the fastjson method is toJSONString (was misspelled toJSonString).
                map.put(field, parseArray(JSON.toJSONString(value)));
            } else if (value instanceof EvntRel) {
                map.put(field, JSON.toJSON(value));
            } else {
                map.put(field, value);
            }
        });
        return map;
    }

ES的entity主类

/**
 * Base class for entities exchanged with Elasticsearch. Besides the
 * identifier it carries paging input ({@code size}, {@code current}) from
 * which a {@code Page} is built on demand.
 *
 * <p>NOTE(review): no accessors other than {@link #getPage()} are declared
 * here; other code calls {@code getId()}, so they are presumably generated
 * (e.g. by an annotation processor) - confirm.
 */
public class ESEntity implements Serializable {
    protected static final long serialVersionUID = 1L;

    protected String id;

    // Holds the bucket key when the entity is the result of a grouping aggregation.
    private String bucket;

    // Page size used when the Page object is created; defaults to 10.
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    protected int size = 10;

    // Page number used when the Page object is created; defaults to 1.
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    protected int current = 1;

    // Created on first call to getPage().
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    protected transient Page page;

    /**
     * Returns the paging descriptor, creating it from {@code current} and
     * {@code size} on first access.
     *
     * @return the cached or newly created page
     */
    public Page getPage() {
        if (page != null) {
            return page;
        }
        page = new Page(current, size);
        return page;
    }
}
/**
 * Extends {@code ESEntity} with three string fields: {@code _id},
 * {@code _index} and {@code routing}.
 *
 * <p>NOTE(review): no accessors are declared here, yet other code calls
 * {@code get_id()}, {@code get_index()} and {@code getRouting()} on its
 * entities; presumably they are generated for these fields - confirm.
 */
public class ESbaseEntity extends ESEntity {
    // Presumably the Elasticsearch document id - confirm against callers.
    private String _id;

    // Presumably the name of the index the document belongs to - confirm.
    private String _index;

    // Routing value; defaults to the empty string.
    private String routing = "";
}

2.批量保存、更新文档

    /**
     * Indexes (creates or overwrites) a list of entities in one bulk request.
     *
     * <p>The parameter is typed {@code List<T>}: with a raw {@code List} the
     * enhanced {@code for} loop over {@code T} does not compile.
     *
     * <p>Only the first element's index is validated up front; each request is
     * addressed to the element's own {@code get_index()} value.
     *
     * @param list entities to index; must not be empty
     * @return the bulk response, or {@code null} when an {@link IOException}
     *         was caught and logged
     */
    public BulkResponse bulkAddOrUpdate(List<T> list) {
        Assert.notEmpty(list, BULK_LIST_NULL);
        Assert.notNull(indexName(list.get(0)), INDEX_NULL);
        BulkRequest bulkRequest = new BulkRequest();
        for (T obj : list) {
            IndexRequest indexRequest = new IndexRequest(obj.get_index()).source(removeNull(obj));
            // An explicit id is set only when the entity carries one.
            if (isNotEmpty(obj.get_id())) {
                indexRequest.id(obj.get_id());
            }
            if (isNotEmpty(obj.getRouting())) {
                indexRequest.routing(obj.getRouting());
            }
            bulkRequest.add(indexRequest);
        }
        try {
            return client.bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("批量插入数据失败:", e);
        }
        return null;
    }

3.更新指定文档

/**
 * Partially updates the document with the given id, resolving the index and
 * routing from the entity itself.
 *
 * @param id     id of the document to update
 * @param entity source of the fields to write; must not be {@code null}
 * @return the result of the four-argument {@code updateById} overload
 * @throws NullPointerException if {@code entity} is {@code null}
 */
public boolean updateById(String id, T entity) {
        Objects.requireNonNull(entity, "对象不能为null");
        final String targetIndex = indexName(entity);
        return updateById(targetIndex, id, entity, entity.getRouting());
    }

    /**
     * Partially updates one document in the given index with the fields
     * produced by {@code removeNull(entity)}.
     *
     * @param index   index containing the document
     * @param id      id of the document to update
     * @param entity  source of the fields to write
     * @param routing routing value; applied only when {@code isNotEmpty(routing)}
     * @return {@code true} when the update call returned normally;
     *         {@code false} when any exception was caught and logged
     */
    public boolean updateById(String index, String id, T entity, String routing) {
        final UpdateRequest updateRequest = new UpdateRequest(index, id).doc(removeNull(entity));
        if (isNotEmpty(routing)) {
            updateRequest.routing(routing);
        }

        try {
            client.update(updateRequest, RequestOptions.DEFAULT);
            return true;
        } catch (Exception e) {
            log.error("指定索引更新失败,Exception:", e);
            return false;
        }
    }

3.根据查询条件批量更新文档

    /**
     * Runs an update-by-query: applies {@code script} to every document in
     * {@code index} that matches {@code queryBuilder}.
     *
     * <p>The response is discarded, so {@code true} only means the call
     * returned without an {@link IOException}.
     *
     * @param index        index to update
     * @param queryBuilder query selecting the documents to update
     * @param script       script applied to each matching document
     * @return {@code true} if the request completed without an
     *         {@link IOException}, {@code false} otherwise
     */
    public boolean updateByQuery(String index, QueryBuilder queryBuilder, Script script) {

        boolean result = false;
        try {
            UpdateByQueryRequest request = new UpdateByQueryRequest(index);
            request.setQuery(queryBuilder);
            // Fixed: the type is Script and the setter is setScript (both were lower-cased).
            request.setScript(script);
            // Documents processed per batch (default 1000); a larger value uses more memory.
            request.setBatchSize(BATCH_NUM);
            client.updateByQuery(request, RequestOptions.DEFAULT);
            result = true;
        } catch (IOException e) {
            log.error("更新失败,Exception:", e);
        }
        return result;
    }

4.大量更新

/**
 * Runs an update-by-query and hands the raw response back to the caller.
 *
 * @param index        index to update; must not be {@code null}
 * @param queryBuilder query selecting the documents to update
 * @param script       script applied to each matching document
 * @return the response, or {@code null} when an {@link IOException} was
 *         caught and logged
 */
public BulkByScrollResponse updateByQueryResponse(String index, QueryBuilder queryBuilder, Script script) {
        Assert.notNull(index, INDEX_NULL);
        try {
            UpdateByQueryRequest request = new UpdateByQueryRequest(index);
            request.setQuery(queryBuilder);
            // Fixed: the type is Script and the setter is setScript (both were lower-cased).
            request.setScript(script);
            // Maximum number of documents to process; set by the project, typically 10000.
            // NOTE(review): newer client versions deprecate setSize in favour of
            // setMaxDocs - confirm against the client version in use.
            request.setSize(MAX_NUM);
            // Documents processed per batch (default 1000); a larger value uses more memory.
            request.setBatchSize(BATCH_NUM);
            return client.updateByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("更新失败,Exception:", e);
        }
        return null;
    }

5.批处理

    /**
     * Queues update or delete requests on a {@code BulkProcessor} and waits up
     * to ten seconds for them to finish.
     *
     * <p>When {@code list} is non-empty each element supplies the index and id
     * of one request; otherwise {@code ids} supplies the ids and {@code entity}
     * the index.
     *
     * <p>{@code list} is typed {@code List<T>}: with a raw {@code List} the
     * lambdas calling {@code e.getId()} do not compile. The return value now
     * reflects the outcome instead of always being {@code true}.
     *
     * @param ids    document ids, used when {@code list} is empty
     * @param list   entities to update or delete one by one; may be empty
     * @param entity update payload and, in the {@code ids} branches, the index source
     * @param action {@code Action.UPDATE} or {@code Action.DELETE}
     * @return {@code true} only if the action was supported and all queued
     *         requests completed before the timeout; {@code false} on an
     *         unsupported action, a timeout or an interrupt
     */
    private boolean batch(final String[] ids, final List<T> list, final T entity, final Action action) {
        BulkProcessor processor = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                processorListener).build();

        // Stays true only when one of the supported actions was handled.
        boolean supported = true;
        if (action.equals(Action.UPDATE)) {
            if (!CollectionUtils.isEmpty(list)) {
                log.info("多值批量更新");
                // NOTE(review): every request is built from `entity`, not from the list
                // element `e`; the "multi-value" log message suggests removeNull(e)
                // may have been intended - confirm with callers.
                list.forEach(e -> processor.add(new UpdateRequest(indexName(e), e.getId()).doc(removeNull(entity))));

            } else {
                log.info("单值批量更新");
                Assert.notNull(ids, PARAM_IDS_NULL);
                // NOTE(review): this branch converts with beanToMap while the list
                // branch uses removeNull - confirm the difference is intended.
                Stream.of(ids)
                        .forEach(id -> processor.add(new UpdateRequest(indexName(entity), id).doc(beanToMap(entity))));
            }

        } else if (action.equals(Action.DELETE)) {
            log.info("批量删除");
            if (!CollectionUtils.isEmpty(list)) {
                list.forEach(e -> processor.add(new DeleteRequest(indexName(e), e.getId())));
            } else {
                Assert.notNull(ids, PARAM_IDS_NULL);
                String index = indexName(entity);
                Stream.of(ids).forEach(id -> processor.add(new DeleteRequest(index, id)));
            }

        } else {
            log.info("参数错误没有执行,ids:{},list:{},entity:{},action:{}", ids, list, entity, action);
            supported = false;
        }

        try {
            // awaitClose returns false when the timeout elapses before all requests finish.
            boolean completed = processor.awaitClose(10L, TimeUnit.SECONDS);
            return supported && completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("bulkProcessor关闭异常:", e);
            return false;
        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5696937.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存