玩转Elasticsearch—Go整合ES7.16.2

玩转Elasticsearch—Go整合ES7.16.2,第1张

下载依赖:

go get -u github.com/olivere/elastic/v7


Init.go

package main

import (
   "context"
   "fmt"
   "github.com/olivere/elastic/v7"
   "log"
)

// ctx is the shared background context handed to every Elasticsearch request
// made by the functions in this package.
var ctx = context.Background()
// Url is the address of the Elasticsearch node that init connects to and pings.
var Url = "http://127.0.0.1:9200"
// esClient is the package-wide Elasticsearch client; it is assigned in init.
var esClient *elastic.Client

// index is the index name used by the example functions.
var index = "student"

// Student is the example document model. The json tags define the field names
// used when a Student value is serialized to JSON; they match the property
// names declared in the mapping constant.
type Student struct {
   Id      int    `json:"id"`
   Name    string `json:"name"`
   Age     int    `json:"age"`
   Address string `json:"address"`
}

// mapping is the index definition (settings plus per-field types) that
// addIndex sends as the request body when it creates an index.
// number_of_replicas is the replica count and number_of_shards the shard count.
const mapping = `
{
   "settings":{
      "number_of_shards": 1,
      "number_of_replicas": 0 
   },
   "mappings":{
         "properties":{
            "name":{
               "type":"keyword"
            },
            "address":{
               "type":"text"
            },
            "age":{
               "type":"long"
            },
            "id":{
               "type":"long"
            }
         }
   }
}`

// init connects to the Elasticsearch node at Url and stores the resulting
// client in esClient for the rest of the package.
//
// Both failure modes (client creation and ping) terminate the process through
// log.Fatal, so start-up errors are reported the same way.
func init() {
   client, err := elastic.NewClient(
      elastic.SetURL(Url),
   )
   if err != nil {
      log.Fatal("es 连接失败:", err)
   }
   // Ping the node to confirm it answers and to read the server version
   // (the article targets version 7.16.2).
   info, code, err := client.Ping(Url).Do(ctx)
   if err != nil {
      log.Fatal("es ping 失败:", err)
   }
   fmt.Println("Elasticsearch call code:", code, " version:", info.Version.Number)
   esClient = client
}

Add.go

package main

import (
   "fmt"
   "log"
)

// AddDoc indexes data as a new document in index without specifying a
// document id. The index is created first when it does not exist yet.
//
// It returns true on success. On failure it returns false and the error from
// index creation or from the index request; it no longer terminates the
// process, so the caller decides how to react.
func AddDoc(index string, data interface{}) (bool, error) {
   // Make sure the target index exists before writing the document.
   if _, err := addIndex(index); err != nil {
      return false, fmt.Errorf("创建索引失败: %w", err)
   }
   res, err := esClient.Index().
      Index(index).
      BodyJson(data).
      Do(ctx)
   if err != nil {
      return false, err
   }
   // Progress output goes through the standard logger.
   log.Println("添加数据成功:", res)
   return true, nil
}

// AddDocById indexes data in index under the explicit document id. The index
// is created first when it does not exist yet.
//
// It returns true on success. On failure it returns false and the error from
// index creation or from the index request; it no longer terminates the
// process, so the caller decides how to react.
func AddDocById(id string, index string, data interface{}) (bool, error) {
   // Make sure the target index exists before writing the document.
   if _, err := addIndex(index); err != nil {
      return false, fmt.Errorf("创建索引失败: %w", err)
   }
   res, err := esClient.Index().
      Index(index).
      BodyJson(data).
      Id(id).
      Do(ctx)
   if err != nil {
      return false, err
   }
   // Progress output goes through the standard logger.
   log.Println("添加数据成功:", res)
   return true, nil
}

// addIndex makes sure index exists, creating it with the package-level mapping
// when it is missing.
// Note from the original author: since 7.x only a single type, _doc, is
// allowed ---> /{index}/_doc/{id}.
//
// It returns true when the index already exists or was created and
// acknowledged. It returns false and a non-nil error when the existence check
// fails, the create request fails, or the create request is not acknowledged.
func addIndex(index string) (bool, error) {
   exists, err := esClient.IndexExists(index).Do(ctx)
   if err != nil {
      // A failed existence check says nothing about the index, so report the
      // failure instead of claiming the index is there.
      return false, fmt.Errorf("检查索引 %q 是否存在失败: %w", index, err)
   }
   if exists {
      return true, nil
   }
   // BodyString supplies the index configuration as a string.
   createIndex, err := esClient.CreateIndex(index).BodyString(mapping).Do(ctx)
   if err != nil {
      return false, err
   }
   if !createIndex.Acknowledged {
      // err is nil on this path, so build an explicit error for the caller.
      return false, fmt.Errorf("创建索引 %q 未被确认", index)
   }
   return true, nil
}

Select.go

package main

import (
   "encoding/json"
   "fmt"
   "github.com/olivere/elastic/v7"
   "strings"
)

// query runs a paginated search against index and returns the raw result.
//
// field is a comma-separated list of source fields to include; when empty no
// include list is set. filter is the query to execute. sort is either
// "<field>" or "<field> <asc|desc>" (direction is case-insensitive and
// defaults to ascending); when empty no sort is added to the request. page is
// 1-based and values below 1 are treated as 1. limit is the page size.
func query(index string, field string, filter elastic.Query, sort string, page int, limit int) (*elastic.SearchResult, error) {
   if page < 1 {
      page = 1
   }

   // Restrict the returned source to the requested fields, ignoring blank
   // entries and surrounding spaces such as in "id, name".
   fsc := elastic.NewFetchSourceContext(true)
   for _, name := range strings.Split(field, ",") {
      if name = strings.TrimSpace(name); name != "" {
         fsc.Include(name)
      }
   }

   search := esClient.Search().
      Index(index).
      FetchSourceContext(fsc).
      Query(filter).
      From((page - 1) * limit). // offset of the first hit on this page
      Size(limit).
      Pretty(true)

   // Only add a sort when one was requested. strings.Fields tolerates a
   // missing direction and repeated spaces, so a bare "age" no longer panics.
   if parts := strings.Fields(sort); len(parts) > 0 {
      isAsc := len(parts) < 2 || !strings.EqualFold(parts[1], "desc")
      search = search.Sort(parts[0], isAsc)
   }

   res, err := search.Do(ctx)
   if err != nil {
      return nil, err
   }
   return res, nil
}

// QueryDoc runs a search through query and returns the whole search result
// encoded as a JSON string.
//
// Parameters:
//   index:  "index"     index to search
//   field:  "name,age"  fields to return
//   filter: *TermQuery  query rule
//   sort:   "age asc"   sort rule
//   page:   0           page number
//   limit:  10          number of entries per page
//
// On success it returns the JSON string and a nil error. When the search or
// the JSON encoding fails it returns nil and the error, so callers can tell
// a failure apart from an empty result.
func QueryDoc(index string, field string, filter elastic.Query, sort string, page int, limit int) (interface{}, error) {
   res, err := query(index, field, filter, sort, page, limit)
   if err != nil {
      return nil, err
   }
   // Encode only after the search is known to have succeeded.
   encoded, err := json.Marshal(res)
   if err != nil {
      return nil, fmt.Errorf("序列化查询结果失败: %w", err)
   }
   fmt.Println("执行完成")
   return string(encoded), nil
}

Update.go

package main

import (
   "errors"
   "fmt"
   "github.com/olivere/elastic/v7"
)

// UpdateDoc updates every document in index that matches filter, assigning
// each key of data to the document field of the same name through an
// update-by-query script.
//
// It returns false and an error when data is empty or the request fails, and
// true with a nil error otherwise.
func UpdateDoc(index string, filter elastic.Query, data map[string]interface{}) (bool, error) {
   // An empty map would produce an empty script, so reject it up front.
   if len(data) == 0 {
      return false, errors.New("修改参数不正确")
   }
   // Build one assignment per key; the values travel as script params.
   // NOTE(review): keys are concatenated into the script source, so they must
   // be trusted field names and never raw user input.
   scriptStr := ""
   for k := range data {
      scriptStr += "ctx._source." + k + " = params." + k + ";"
   }
   script := elastic.NewScript(scriptStr).Params(data)
   res, err := esClient.UpdateByQuery(index).
      Query(filter).
      Script(script).
      Do(ctx)
   if err != nil {
      return false, err
   }
   fmt.Println("更新数据成功:", res)
   return true, nil
}

Delete.go

package main

import (
   "fmt"
   "github.com/olivere/elastic/v7"
)

// DeleteDoc issues a delete-by-query request against index using filter as
// the query.
//
// It prints the response and returns true when the request succeeds;
// otherwise it returns false and the request error.
func DeleteDoc(index string, filter elastic.Query) (bool, error) {
   request := esClient.DeleteByQuery().Index(index).Query(filter)
   resp, err := request.Do(ctx)
   if err == nil {
      fmt.Println("删除信息:", resp)
      return true, nil
   }
   return false, err
}

Main.go

package main

import (
   "fmt"
   "github.com/olivere/elastic/v7"
)

// main is the program entry point. Every example call in its body is
// commented out; uncomment one of them to run that example.
func main() {
   //deleteDoc()
   //add()
   //update()
}

// add indexes a sample Student under document id "2" and prints the returned
// flag followed by the returned error.
func add() {
   student := Student{Id: 2, Name: "小红", Age: 10, Address: "北京"}
   // AddDoc(index, student) is the variant that takes no explicit id.
   ok, err := AddDocById("2", index, student)
   fmt.Println(ok)
   fmt.Println(err)
}

func selectDoc() {
   index_ := index
   sort := "age asc"
   page := 0
   limit := 10
   field := "id,name,age,address"
   filter := elastic.NewTermQuery("name", "名称")
   res, err := QueryDoc(index_, field, filter, sort, page, limit)
   fmt.Println(res)
   fmt.Println(err)
}

// deleteDoc deletes the sample documents whose name field equals "小明" and
// reports a failure instead of silently discarding it.
func deleteDoc() {
   // Named filter rather than query so it does not shadow the package-level
   // query function.
   filter := elastic.NewTermQuery("name", "小明")
   if _, err := DeleteDoc(index, filter); err != nil {
      fmt.Println("失败:", err)
   }
}

func update() {
   index_ := index
   filter := elastic.NewTermQuery("name", "小明")
   data := make(map[string]interface{})
   data["address"] = "上海"
   data["age"] = 120
   res, err := UpdateDoc(index_, filter, data)
   if err != nil {
      fmt.Println("失败:", err)
   } else {
      fmt.Println("成功:", res)
   }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/995794.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存