golang批量写入elasticsearch

golang批量写入elasticsearch,第1张

文章目录:需求、ES写入、阿里云CDN的API、整体代码、BulkProcessor流程图

需求

需要将阿里云CDN的离线日志每天自动化写入到elasticsearch,方便查看。

ES写入

使用olivere/elastic组件
单协程批量写入使用Bulk,数据量增大之后会出现性能瓶颈,
因此后来改用多协程的BulkProcessor。

// ReadLineFile reads fileName line by line, converts each line with toStruct
// and indexes the resulting documents synchronously through the Bulk API.
//
// Every document is written to the index "cdnlog-<date>", where <date> is the
// document's own Time formatted as 2006-01-02. A batch is sent after 10000
// documents have been queued; whatever is still queued after the last line is
// sent as a final batch. index is only used to label log output.
//
// The function panics when the file cannot be opened.
func ReadLineFile(index int, fileName string, elk *Elk) {
	file, err := os.Open(fileName)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	ctx := context.Background()
	scanner := bufio.NewScanner(file)
	bulkRequest := elk.Client.Bulk()
	pending := 0 // documents queued since the last flush attempt
	n := 0       // total number of lines read
	for scanner.Scan() {
		n++
		cdn := toStruct(scanner.Text())
		if cdn == nil {
			// toStruct returns nil for lines it cannot parse; dereferencing
			// it below would crash the whole import.
			continue
		}
		date := cdn.Time.Format("2006-01-02")
		req := elastic.NewBulkIndexRequest().Index("cdnlog-" + date).Doc(cdn)
		bulkRequest = bulkRequest.Add(req)
		pending++
		if pending <= 9999 {
			continue
		}

		fmt.Println(n)
		pending = 0
		resp, err := bulkRequest.Do(ctx)
		if err != nil {
			// Keep the queued actions so they are retried with the next batch.
			fmt.Println(fmt.Sprintf("bulkResponse:%v count:%v is err:%v", index, bulkRequest.NumberOfActions(), err.Error()))
			continue
		}
		if resp != nil && resp.Errors {
			fmt.Printf("bulkResponse:%v failed items:%v\n", index, len(resp.Failed()))
		}
		bulkRequest = elk.Client.Bulk()
	}
	// Scan stops silently on read errors and on lines longer than the
	// scanner's buffer, so the error has to be checked explicitly.
	if err := scanner.Err(); err != nil {
		fmt.Printf("index:%v read file:%v is err:%v\n", index, fileName, err)
	}

	// Sending an empty bulk request is an error in itself; skip it.
	if bulkRequest.NumberOfActions() == 0 {
		fmt.Println(n)
		return
	}
	resp, err := bulkRequest.Do(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}
	if resp != nil {
		if resp.Errors {
			fmt.Printf("bulkResponse:%v failed items:%v\n", index, len(resp.Failed()))
		}
		fmt.Println(n)
	}
}

// ReadLineFileSync reads fileName line by line, converts each line with
// toStruct and hands the documents to a BulkProcessor, which commits them in
// the background.
//
// Every document is written to the index "cdnlog-<date>", where <date> is the
// document's own Time formatted as 2006-01-02. index is only used to label
// log output. Every 100000 lines the processor statistics are logged.
//
// The function panics when the file cannot be opened. If the processor cannot
// be created the error is logged and nothing is imported.
func ReadLineFileSync(index int, fileName string, elk *Elk) {
	file, err := os.Open(fileName)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Do both creates and starts the processor, so no separate Start call is
	// needed afterwards.
	w, err := elk.Client.BulkProcessor().
		BulkActions(bulkActions).        // queued actions per worker that trigger a commit
		FlushInterval(time.Millisecond). // periodic flush interval
		Workers(workerNum).              // number of worker goroutines
		Stats(true).                     // collect statistics for w.Stats()
		After(GetFailed).                // callback invoked after every commit
		Do(context.Background())
	if err != nil {
		log.Printf("index:%v create BulkProcessor is err:%v\n", index, err)
		return
	}
	// Close flushes everything still queued; without it the tail of the file
	// would never be committed.
	defer func() {
		if err := w.Close(); err != nil {
			log.Printf("index:%v close BulkProcessor is err:%v\n", index, err)
		}
	}()

	scanner := bufio.NewScanner(file)
	sinceStats := 0 // lines read since the statistics were last logged
	n := 0          // total number of lines read
	for scanner.Scan() {
		sinceStats++
		n++
		if cdn := toStruct(scanner.Text()); cdn != nil {
			date := cdn.Time.Format("2006-01-02")
			req := elastic.NewBulkIndexRequest().Index("cdnlog-" + date).Doc(cdn)
			// No explicit commit: BulkActions and FlushInterval trigger it.
			w.Add(req)
		}
		if sinceStats <= 99999 {
			continue
		}

		sinceStats = 0
		st := w.Stats()
		log.Printf("BulkMonitor state Succeeded:%d Failed:%d Created:%d Updated:%d Deleted:%d Flushed:%d Committed:%d Indexed:%d\n", st.Succeeded, st.Failed, st.Created, st.Updated, st.Deleted, st.Flushed, st.Committed, st.Indexed)
		fmt.Println(fmt.Sprintf("index:%v count:%v", index, n))
		for _, s := range st.Workers {
			// Number of requests still queued in each worker.
			log.Println("BulkMonitor Queue:", s.Queued)
		}
	}
	// Scan stops silently on read errors and on lines longer than the
	// scanner's buffer, so the error has to be checked explicitly.
	if err := scanner.Err(); err != nil {
		log.Printf("index:%v read file:%v is err:%v\n", index, fileName, err)
	}
	fmt.Println(fmt.Sprintf("index:%v count:%v", index, n))
}

// GetFailed is the callback registered with the BulkProcessor via After. It
// logs the commit-level error, if any, and one line for every item that the
// response reports as failed.
//
// response may be nil; in that case only the error and a short notice are
// logged.
func GetFailed(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		// Previously dropped: without this a failed commit left no trace of
		// its cause.
		log.Printf("DebugFailedEs: executionId:%d requests:%d err:%v\n", executionId, len(requests), err)
	}
	if response == nil {
		log.Println("GetNil response return")
		return
	}
	for _, f := range response.Failed() {
		log.Printf("DebugFailedEs: index:%s type:%s id:%s version:%d  status:%d result:%s ForceRefresh:%v errorDetail:%v getResult:%v\n", f.Index, f.Type, f.Id, f.Version, f.Status, f.Result, f.ForcedRefresh, f.Error, f.GetResult)
	}
}

// NewElk creates an Elasticsearch client for url that authenticates with
// HTTP basic auth (user, password) and has sniffing switched off, and returns
// it wrapped in an Elk. The client construction error is returned unchanged.
func NewElk(url, user, password string) (*Elk, error) {
	options := []elastic.ClientOptionFunc{
		elastic.SetSniff(false),
		elastic.SetURL(url),
		elastic.SetBasicAuth(user, password),
	}
	client, err := elastic.NewClient(options...)
	if err != nil {
		return nil, err
	}
	return &Elk{Client: client}, nil
}
阿里云CDN的API
package common

import (
	openapi "github.com/alibabacloud-go/darabonba-openapi/client"
	dcdn20180115 "github.com/alibabacloud-go/dcdn-20180115/client"
	"github.com/alibabacloud-go/tea/tea"
)

// Static settings used by CreateClient and GetDownData.
// NOTE(review): the access key pair is embedded in source; presumably it
// should come from the environment or a secrets store — confirm before reuse.
const (
	AccessKeyId     = "LTAI5txxxxxUzoNCnrc" // access key ID passed to CreateClient
	AccessKeySecret = "81UquYxxxxQtW7Q7m19loXg" // access key secret passed to CreateClient
	Endpoint        = "dcdn.aliyuncs.com" // API endpoint set on the client config
	DomainName      = "vvin.gwxxxek.com" // domain whose logs GetDownData requests
)

// CreateClient builds a DCDN API client that authenticates with the given
// access key pair and talks to Endpoint. It returns whatever
// dcdn20180115.NewClient returns.
func CreateClient(accessKeyId *string, accessKeySecret *string) (_result *dcdn20180115.Client, _err error) {
	config := &openapi.Config{
		AccessKeyId:     accessKeyId,
		AccessKeySecret: accessKeySecret,
		Endpoint:        tea.String(Endpoint),
	}
	// The placeholder client that used to be allocated here was overwritten
	// immediately, so the constructor result is returned directly.
	return dcdn20180115.NewClient(config)
}

// GetDownData asks the DCDN API for the offline log files of DomainName
// between startTime and endTime and returns their download paths in the order
// the API lists them.
//
// startTime and endTime are passed to the API unchanged. On error the
// returned slice is empty but non-nil.
func GetDownData(startTime, endTime string) (paths []string, _err error) {
	paths = make([]string, 0)
	client, err := CreateClient(tea.String(AccessKeyId), tea.String(AccessKeySecret))
	if err != nil {
		return paths, err
	}

	request := &dcdn20180115.DescribeDcdnDomainLogRequest{
		DomainName: tea.String(DomainName),
		StartTime:  tea.String(startTime),
		EndTime:    tea.String(endTime),
	}
	res, err := client.DescribeDcdnDomainLog(request)
	if err != nil {
		return paths, err
	}

	// Every level of the response is a pointer; a response without log
	// details must yield an empty result instead of a nil dereference.
	if res == nil || res.Body == nil || res.Body.DomainLogDetails == nil {
		return paths, nil
	}
	for _, detail := range res.Body.DomainLogDetails.DomainLogDetail {
		if detail == nil || detail.LogInfos == nil {
			continue
		}
		for _, info := range detail.LogInfos.LogInfoDetail {
			if info == nil {
				continue
			}
			paths = append(paths, tea.StringValue(info.LogPath))
		}
	}
	return paths, nil
}
整体代码
package main

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"toolTest/common"

	"github.com/olivere/elastic/v7"
)

// Elk wraps the Elasticsearch client used by ReadLineFile and
// ReadLineFileSync. Instances are created by NewElk.
type Elk struct {
	Client *elastic.Client // client used for all bulk writes
}

// Cdn is one CDN access-log line as it is indexed into Elasticsearch.
// toStruct fills it from the space-separated fields of the line; the field
// numbers below are zero-based positions in that split.
type Cdn struct {
	Time         time.Time // field 0 with leading "[" removed, parsed with TimeForm
	DateTime     string // fields 0 and 1 concatenated without a separator
	Ip           string // field 2
	Country      string // not set by toStruct (the lookup is commented out)
	Vpn          string // field 3
	ResponseTime string // field 4
	Referer      string // field 5
	Method       string // field 6
	Url          string // field 7
	HttpCode     string // field 8
	RequestSize  string // field 9
	ResponseSize string // field 10
	CacheStatus  string // field 11
	UAHeard      string // fields 12 up to the content type, concatenated without separators
	ContentType  string // last field, or the last two when they form "text/html; charset=UTF-8"
	Content      string // the complete, unmodified log line
	LogModule    string // always "CDN"
}

// Program-wide settings.
// NOTE(review): the Elasticsearch credentials are embedded in source —
// presumably placeholders; confirm before reuse.
const (
	url         = "http://es-cn-zv5xxxxx.elasticsearch.aliyuncs.com:9200" // Elasticsearch URL passed to NewElk
	user        = "elastic" // basic-auth user passed to NewElk
	password    = "000000" // basic-auth password passed to NewElk
	bulkActions = 5000 // BulkActions setting of the BulkProcessor
	workerNum   = 20 // Workers setting of the BulkProcessor
	TimeForm    = "2/Jan/2006:15:04:05" // layout of the timestamp in a CDN log line
	TimeForm2   = "2006-01-02 15:04:05" // layout of the start/end times in StartRunCdn
)

// main runs a single CDN log import.
func main() {
	StartRunCdn()
}

func StartRunCdn() {
	//ip.InitRegion()
	sysPath, _ := os.Getwd()
	dirPath := sysPath + "\CndLog"
	_, err := os.Stat(dirPath)
	if err != nil {
		if os.IsNotExist(err) {
			err = os.Mkdir(dirPath, 0777)
			if err != nil {
				fmt.Println(fmt.Sprintf("mkdir path:%v,is err:%v", dirPath, err.Error()))
				return
			}
		} else {
			fmt.Println(fmt.Sprintf("mkdir path:%v,is err:%v", dirPath, err.Error()))
			return
		}
	}
	startTime, err := time.Parse(TimeForm2, "2021-12-21 00:00:00")
	if err != nil {
		fmt.Println(err.Error())
	}
	endTime, err := time.Parse(TimeForm2, "2021-12-21 01:00:00")
	if err != nil {
		fmt.Println(err.Error())
	}
	elk, err := NewElk(url, user, password)
	if err != nil {
		fmt.Println(fmt.Sprintf("NewElk is err:%v", err.Error()))
		return
	}
	for j := 0; j < 1; j++ {
		startTimeStr := startTime.AddDate(0, 0, j).Format("2006-01-02T15:04:05Z")
		endTimeStr := endTime.AddDate(0, 0, j).Format("2006-01-02T15:04:05Z")
		paths, _ := common.GetDownData(startTimeStr, endTimeStr)//调用API获取下载路径
		fmt.Println(fmt.Sprintf("startTime:%s;\nendTime:%s;\npwd:%s;\npathCount:%v;\npath:%v", startTimeStr, endTimeStr, sysPath, len(paths), paths))
		RunTask(elk, dirPath, paths)
	}
}

func RunTask(elk *Elk, dirPath string, paths []string) {
	//var wg sync.WaitGroup
	for i, path := range paths {
		//wg.Add(1)
		func(index int) {
			//defer wg.Done()
			fmt.Println(path)
			logTime := time.Now().Format("2006-01-02T")
			gzipPath := dirPath + "\" + logTime + strconv.Itoa(index) + ".gz"
			filePath := dirPath + "\" + logTime + strconv.Itoa(index) + ".txt"
			err := downloadFile("https://"+path, gzipPath)//下载文件
			if err != nil {
				fmt.Println(fmt.Sprintf("path:%v;gzipPath:%v;downloadFile:%v", path, gzipPath, err.Error()))
			}
			err = ReadGzipFile(gzipPath, filePath)//解压文件
			if err != nil {
				fmt.Println(fmt.Sprintf("gzipPath:%v;filePath:%v;ReadGzipFile:%v", gzipPath, filePath, err.Error()))
			}
			ReadLineFileSync(index, filePath, elk)//读取文件并写入ES
			err = os.RemoveAll(gzipPath)//删除下载文件
			if err != nil {
				fmt.Println("del dir error:", err)
			}
			err = os.RemoveAll(filePath)//删除解压文件
			if err != nil {
				fmt.Println("del dir error:", err)
			}
		}(i)
	}
	//wg.Wait()
}

// toStruct converts one CDN access-log line into a Cdn document.
//
// The line is split on single spaces. The first twelve fields map to fixed
// struct fields; the trailing field (or the trailing two, when they are
// `"text/html;` and `charset=UTF-8"`) becomes ContentType, and everything in
// between is concatenated into UAHeard.
//
// It returns nil when the line has fewer than twelve fields. A timestamp that
// does not match TimeForm is reported but does not reject the line; Time is
// then the zero time.
func toStruct(str string) (cdn *Cdn) {
	const fixedFields = 12

	arrStr := strings.Split(str, " ")
	if len(arrStr) < fixedFields {
		// Explicit length check instead of recovering from an index panic.
		fmt.Printf("toStruct: line has %d fields, need at least %d\n", len(arrStr), fixedFields)
		return nil
	}

	cdnTime, err := time.Parse(TimeForm, strings.TrimLeft(arrStr[0], "["))
	if err != nil {
		fmt.Println(err.Error())
	}
	// Country lookup is currently disabled:
	//c, err := ip.GetCountry(arrStr[2])
	//if err != nil {
	//	fmt.Println(err.Error())
	//}
	cdn = &Cdn{
		LogModule: "CDN",
		Content:   str,
		Time:      cdnTime,
		DateTime:  arrStr[0] + arrStr[1],
		Ip:        arrStr[2],
		//Country:      c.Country.Names["zh-CN"],
		Vpn:          arrStr[3],
		ResponseTime: arrStr[4],
		Referer:      arrStr[5],
		Method:       arrStr[6],
		Url:          arrStr[7],
		HttpCode:     arrStr[8],
		RequestSize:  arrStr[9],
		ResponseSize: arrStr[10],
		CacheStatus:  arrStr[11],
	}

	// typeStart is the index of the first field that belongs to the content
	// type; a content type containing a space occupies the last two fields.
	last := len(arrStr) - 1
	typeStart := last
	if arrStr[last] == "charset=UTF-8\"" && arrStr[last-1] == "\"text/html;" {
		typeStart = last - 1
	}
	if typeStart > fixedFields {
		cdn.UAHeard = strings.Join(arrStr[fixedFields:typeStart], "")
	}
	cdn.ContentType = strings.Join(arrStr[typeStart:], "")
	return cdn
}

// downloadFile fetches url with an HTTP GET and stores the response body in
// the file at path, replacing any previous content.
//
// It returns an error when the request fails, when the server answers with a
// status other than 200 OK, or when the file cannot be created or written.
func downloadFile(url string, path string) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Without this check an error page would be stored as if it were the log
	// archive.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("download %s: unexpected status %s", url, resp.Status)
	}

	// Truncate instead of append: appending to a leftover file from an
	// earlier run would corrupt the archive.
	out, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, resp.Body); err != nil {
		out.Close()
		return err
	}
	// Close can report a deferred write error, so its result is returned.
	return out.Close()
}

// ReadGzipFile decompresses the gzip file at url (a local file path, despite
// the parameter name) into the file at path, replacing any previous content.
//
// It returns an error when the source cannot be opened, is not valid gzip
// data, or when the destination cannot be created or written.
func ReadGzipFile(url string, path string) error {
	gzipFile, err := os.Open(url)
	if err != nil {
		return err
	}
	defer gzipFile.Close()

	gzipReader, err := gzip.NewReader(gzipFile)
	if err != nil {
		return err
	}
	defer gzipReader.Close()

	// Truncate instead of append so a leftover file from an earlier run does
	// not end up in front of the new data.
	outfileWriter, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
	if err != nil {
		return err
	}
	if _, err := io.Copy(outfileWriter, gzipReader); err != nil {
		outfileWriter.Close()
		return err
	}
	// Close can report a deferred write error, so its result is returned.
	return outfileWriter.Close()
}
BulkProcessor流程图

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/993966.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存