- 1、基于 TransportClient
- 1.1 创建索引
- 1.2 根据Id查找
- 1.3 查询所有文档
- 1.4 更新文档
- 1.5 删除指定Id文档
- 1.6 多条件查找
- 1.7 聚合查询
- 2、基于 Rest High Level Client
- 2.1 创建索引
- 2.2 查询指定索引
- 2.3 删除指定索引
- 2.4 写入数据
- 2.5 批量写入
- 2.6 根据指定Id查询
- 2.7 根据Id删除文档
- 2.8 根据多个Id批量查询(mget)
- 2.9 update_by_query
- 2.10 嗅探器
/**
 * Indexes every product returned by {@code service.list()} into the {@code product}
 * index (type {@code _doc}), using the product id as the document id, and prints the
 * result of each index operation.
 *
 * @param client transport client used to send the index requests
 */
@SneakyThrows
private void create(TransportClient client) {
    // DateTimeFormatter is immutable, so build each pattern once instead of per item.
    DateTimeFormatter timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    DateTimeFormatter dayFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // NOTE(review): assumes service.list() yields Product elements, as the loop below requires.
    List<Product> list = service.list();
    for (Product item : list) {
        System.out.println(item.getDate().toLocalDateTime().format(timestampFormat));
        IndexResponse response = client.prepareIndex("product", "_doc", item.getId().toString())
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("name", item.getName())
                        .field("desc", item.getDesc())
                        .field("price", item.getPrice())
                        // Only the date part is stored in the document.
                        .field("date", item.getDate().toLocalDateTime().format(dayFormat))
                        // Tags arrive as one comma-separated string; strip quotes and store as an array.
                        .field("tags", item.getTags().replace("\"", "").split(","))
                        .endObject())
                .get();
        System.out.println(response.getResult());
    }
}
1.2 根据Id查找
/**
 * Fetches the document with id {@code 1} from the {@code product} index and prints
 * its index name, type, id and JSON source.
 *
 * @param client transport client used to send the get request
 */
private void get(TransportClient client) {
    GetResponse doc = client.prepareGet("product", "_doc", "1").get();
    System.out.println("index:" + doc.getIndex());
    System.out.println("type:" + doc.getType());
    System.out.println("id:" + doc.getId());
    System.out.println(doc.getSourceAsString());
}
1.3 查询所有文档
/**
 * Runs a search against the {@code product} index with no query or size set and
 * prints the JSON source of every returned hit.
 *
 * @param client transport client used to send the search request
 */
private void getAll(TransportClient client) {
    SearchResponse result = client.prepareSearch("product").get();
    for (SearchHit doc : result.getHits().getHits()) {
        System.out.println("res" + doc.getSourceAsString());
    }
}
1.4 更新文档
/**
 * Partially updates document {@code 2} of the {@code product} index, setting its
 * {@code name} field to {@code "update name"}, and prints the operation result.
 *
 * <p>{@code XContentFactory.jsonBuilder()} declares a checked {@code IOException};
 * it is rethrown via {@code @SneakyThrows}, the same way {@code create} handles it,
 * so the method signature stays unchanged.
 *
 * @param client transport client used to send the update request
 */
@SneakyThrows
private void update(TransportClient client) {
    UpdateResponse response = client.prepareUpdate("product", "_doc", "2")
            .setDoc(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name", "update name")
                    .endObject())
            .get();
    System.out.println(response.getResult());
}
1.5 删除指定Id文档
/**
 * Deletes document {@code 2} from the {@code product} index and prints the
 * operation result.
 *
 * @param client transport client used to send the delete request
 */
private void delete(TransportClient client) {
    System.out.println(client.prepareDelete("product", "_doc", "2").get().getResult());
}
1.6 多条件查找
/**
 * Searches the {@code product} index for documents whose {@code name} term is
 * {@code xiaomi}, post-filters them to a {@code price} between 0 and 4000, and prints
 * the JSON source of each returned hit.
 *
 * <p>{@code InetAddress.getByName} declares a checked {@code UnknownHostException};
 * it is rethrown via {@code @SneakyThrows} so the signature stays unchanged. The
 * client is opened in try-with-resources so it is closed even when the search fails.
 */
@SneakyThrows
void multiSearch() {
    try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)) {
        client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

        SearchResponse response = client.prepareSearch("product")
                .setQuery(QueryBuilders.termQuery("name", "xiaomi"))
                .setPostFilter(QueryBuilders.rangeQuery("price").from(0).to(4000))
                // Paging: offset 1 skips the first matching hit; at most 2 hits are returned.
                .setFrom(1).setSize(2)
                .get();

        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println("res" + hit.getSourceAsString());
        }
    }
}
1.7 聚合查询
/**
 * Runs a nested aggregation over the {@code product} index and prints the result:
 * documents are bucketed per calendar month of {@code date}, each month is split by
 * {@code tags.keyword}, and the average {@code price} is computed for every tag.
 *
 * <p>{@code InetAddress.getByName} declares a checked {@code UnknownHostException};
 * it is rethrown via {@code @SneakyThrows} so the signature stays unchanged. The
 * client is opened in try-with-resources so it is closed even when the search fails.
 */
@SneakyThrows
void aggSearch() {
    // 1 -> open the client connection
    try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)) {
        client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

        // 2 -> execute the aggregation; size 0 because only the buckets are needed, not the hits
        SearchResponse response = client.prepareSearch("product")
                .setSize(0)
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(AggregationBuilders.dateHistogram("group_by_month")
                        .field("date")
                        .calendarInterval(DateHistogramInterval.MONTH)
                        .minDocCount(1)
                        .subAggregation(AggregationBuilders.terms("by_tag")
                                .field("tags.keyword")
                                .subAggregation(AggregationBuilders.avg("avg_price")
                                        .field("price"))
                        )
                ).execute().actionGet();

        // 3 -> walk month buckets -> tag buckets -> average price and print them
        Histogram dates = (Histogram) response.getAggregations().get("group_by_month");
        for (Histogram.Bucket dateBucket : dates.getBuckets()) {
            System.out.println("\n月份:" + dateBucket.getKeyAsString() + "\n计数:" + dateBucket.getDocCount());
            StringTerms terms = (StringTerms) dateBucket.getAggregations().get("by_tag");
            for (StringTerms.Bucket tag : terms.getBuckets()) {
                // NOTE(review): the label below looks like a typo for "标签名称" (tag name);
                // left unchanged because it is runtime output.
                System.out.println("\t变迁名称:" + tag.getKey() + "\n\t数量:" + tag.getDocCount());
                Avg avg = (Avg) tag.getAggregations().get("avg_price");
                System.out.println("\t平均价格:" + avg.getValue());
            }
        }
    }
}
2、基于 Rest High Level Client
2.1 创建索引
/**
 * Creates the {@code product2} index with 3 primary shards, 0 replicas and a filtered
 * alias {@code product_alias} (documents whose {@code name} term is {@code xiaomi}),
 * then prints whether the request and the shard start-up were acknowledged.
 *
 * <p>Mappings are not set here. They can be supplied to {@code request.mapping(...)}
 * as a JSON string, as a nested {@code Map}, or as an {@code XContentBuilder}, e.g.
 * {@code {"properties": {"message": {"type": "text"}}}}.
 *
 * <p>An asynchronous variant exists:
 * {@code client.indices().createAsync(request, RequestOptions.DEFAULT, listener)} with an
 * {@code ActionListener<CreateIndexResponse>}. It is intentionally not invoked in addition
 * to the synchronous call: that would try to create the same index a second time and
 * would race with closing the client.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void createIndex() throws IOException {
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

        CreateIndexRequest request = new CreateIndexRequest("product2");

        // Index settings.
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 0)
        );

        // Filtered alias.
        request.alias(new Alias("product_alias").filter(QueryBuilders.termQuery("name", "xiaomi")));

        // Time to wait for the acknowledgement. The previous value of 2 milliseconds is
        // far too short for an index creation to be acknowledged in practice.
        request.timeout(TimeValue.timeValueMinutes(2));

        // Synchronous execution.
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);

        // Whether all nodes acknowledged the request.
        System.out.println("acknowledged:" + createIndexResponse.isAcknowledged());
        // Whether the required number of shard copies was started for each shard before the timeout.
        System.out.println("shardsAcknowledged:" + createIndexResponse.isShardsAcknowledged());
    }
}
2.2 查询指定索引
/**
 * Looks up all indices matching the pattern {@code product*} and prints their names.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void getIndex() throws IOException {
    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        GetIndexRequest request = new GetIndexRequest("product*");
        GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
        for (String indexName : response.getIndices()) {
            System.out.println("index name:" + indexName);
        }
    }
}
2.3 删除指定索引
/**
 * Deletes the {@code product2} index and prints whether the deletion was acknowledged.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void delIndex() throws IOException {
    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        DeleteIndexRequest request = new DeleteIndexRequest("product2");
        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
        if (response.isAcknowledged()) {
            System.out.println("删除index成功!");
        } else {
            System.out.println("删除index失败!");
        }
    }
}
2.4 写入数据
/**
 * Writes the first product returned by {@code service.list()} into {@code test_index}
 * as a JSON document whose id is the product id, and prints the index response.
 * Does nothing (apart from a message) when there is no product to write.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void insertData() throws IOException {
    // NOTE(review): assumes service.list() yields Product elements, as list.get(0) requires.
    List<Product> list = service.list();
    if (list == null || list.isEmpty()) {
        // Nothing to index; avoid an IndexOutOfBoundsException on get(0).
        System.out.println("没有可写入的数据");
        return;
    }
    Product product = list.get(0);

    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        // Original author's note: if the index does not exist it is created automatically
        // from a matching template. There is no need for one index per day; for flexible
        // management, one per month (yyyyMM) is the suggested minimum granularity.
        IndexRequest request = new IndexRequest("test_index");

        // Original author's note: prefer not to set a custom id, as it slows down indexing.
        request.id(product.getId().toString());
        request.source(new Gson().toJson(product), XContentType.JSON);

        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }
}
2.5 批量写入
/**
 * Bulk-indexes ten generated products ({@code name0} .. {@code name9}, ids {@code 0} ..
 * {@code 9}) into {@code test_index}, prints the number of bulk items, and prints the
 * failure message when any item failed.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void batchInsertData() throws IOException {
    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        // Bulk insert; per the original author, updates and deletes work the same way.
        // The index given here applies to the items added below, which set none themselves.
        BulkRequest request = new BulkRequest("test_index");

        Gson gson = new Gson();
        Product product = new Product();
        product.setPrice(3999.00);
        product.setDesc("xioami");
        for (int i = 0; i < 10; i++) {
            // The same Product instance is reused; it is serialized immediately,
            // so each item carries its own name.
            product.setName("name" + i);
            request.add(new IndexRequest()
                    .id(Integer.toString(i))
                    .source(gson.toJson(product), XContentType.JSON)
            );
        }

        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        System.out.println("数量:" + response.getItems().length);
        if (response.hasFailures()) {
            // A bulk call can succeed as a whole while individual items fail.
            System.out.println(response.buildFailureMessage());
        }
    }
}
2.6 根据指定Id查询
/**
 * Fetches document {@code 6} from {@code test_index}, restricting the returned source
 * to the {@code name} and {@code price} fields (and excluding {@code desc}), and
 * prints the response.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void getById() throws IOException {
    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        // The request addresses the concrete index name here (not an alias).
        GetRequest request = new GetRequest("test_index", "6");

        // Source filtering: omit this to get all fields back.
        String[] includes = {"name", "price"};
        String[] excludes = {"desc"};
        request.fetchSourceContext(new FetchSourceContext(true, includes, excludes));

        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }
}
2.7 根据Id删除文档
/**
 * Deletes document {@code 1} from {@code test_index} and prints the response.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void delById() throws IOException {
    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        DeleteRequest request = new DeleteRequest("test_index", "1");
        DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }
}
2.8 根据多个Id批量查询(mget)
/**
 * Fetches documents {@code 6} and {@code 7} from {@code test_index} in one multi-get
 * request and prints the JSON source of each one. For an item that failed, the
 * failure message is printed instead.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void multiGetById() throws IOException {
    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        // Look up several ids in a single round trip.
        MultiGetRequest request = new MultiGetRequest();
        // Two equivalent ways to add an item.
        request.add("test_index", "6");
        request.add(new MultiGetRequest.Item(
                "test_index",
                "7"));

        MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
        for (MultiGetItemResponse itemResponse : response) {
            if (itemResponse.isFailed()) {
                // getResponse() is null for a failed item; report the failure instead of
                // dereferencing it.
                System.out.println(itemResponse.getFailure().getMessage());
                continue;
            }
            System.out.println(itemResponse.getResponse().getSourceAsString());
        }
    }
}
2.9 update_by_query
/**
 * Runs an update-by-query on {@code test_index}: every document whose {@code name}
 * term is {@code name2} gets {@code '#'} appended to its {@code desc} field through
 * an inline Painless script. Prints the response.
 *
 * @throws IOException if the request cannot be sent or the response cannot be read
 */
public void updateByQuery() throws IOException {
    // try-with-resources closes the client even when the request fails.
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        UpdateByQueryRequest request = new UpdateByQueryRequest("test_index");

        // Original author's notes on optional settings:
        //  - version conflicts abort the request by default; request.setConflicts("proceed")
        //    makes it continue instead;
        //  - request.setMaxDocs(10) limits the number of updated documents.

        // Which documents to update.
        request.setQuery(QueryBuilders.termQuery("name", "name2"));
        // How to update them.
        request.setScript(
                new Script(ScriptType.INLINE, "painless", "ctx._source.desc+='#';", Collections.emptyMap()));

        BulkByScrollResponse response = client.updateByQuery(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }
}
2.10 嗅探器
/**
 * Builds a low-level {@code RestClient} for {@code localhost:9201} with a sniffer
 * attached: nodes are re-sniffed every 5 seconds, and 30 seconds after a failure
 * reported through the {@code SniffOnFailureListener}. The sniffer and the client
 * are closed again before the method returns.
 *
 * @throws IOException if closing the sniffer or the client fails
 */
public void sniffer() throws IOException {
    // Listener that triggers a sniff round when a node failure is reported.
    SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

    // Resources are closed in reverse order of opening: the sniffer first, then the client.
    try (RestClient restClient = RestClient.builder(
                    new HttpHost("localhost", 9201, "http"))
            .setFailureListener(sniffOnFailureListener)
            .build()) {

        // The scheme used for sniffed nodes must match how the cluster is reached.
        // The client above connects over plain http, so sniffed hosts must be http too.
        NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
                restClient,
                ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
                ElasticsearchNodesSniffer.Scheme.HTTP
        );

        try (Sniffer sniffer = Sniffer.builder(restClient)
                // Regular sniff interval: 5 seconds here (per the original author's
                // comment, the library default is 5 minutes).
                .setSniffIntervalMillis(5000)
                // Delay before the extra sniff round scheduled after a failure.
                .setSniffAfterFailureDelayMillis(30000)
                .setNodesSniffer(nodesSniffer)
                .build()) {
            // Wire the sniffer into the failure listener to enable sniff-on-failure.
            sniffOnFailureListener.setSniffer(sniffer);
        }
    }
}
源码地址:附件(点击下载)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)