需要每天自动将阿里云 CDN 的离线日志写入 Elasticsearch，方便查看。
写入 ES 使用 olivere/elastic 组件。
最初用单协程通过 Bulk 批量写入，数据量上来之后必然会出现性能瓶颈，
因此后来改用多协程的 BulkProcessor。
// ReadLineFile is the synchronous variant of the importer.
//
// It reads fileName line by line, converts each line with toStruct, and indexes
// the resulting documents into Elasticsearch with a Bulk request. A bulk commit
// is sent after every 10000 queued documents, and a final commit is sent for
// any remainder. Each document goes to the index "cdnlog-YYYY-MM-DD", derived
// from the line's parsed timestamp.
//
// index only labels the log output. Lines that toStruct cannot parse (it
// returns nil) are skipped. The function panics if fileName cannot be opened.
func ReadLineFile(index int, fileName string, elk *Elk) {
	file, err := os.Open(fileName)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	// CDN lines with long URLs or User-Agents can exceed bufio.Scanner's
	// 64 KiB default token size. That would end the scan early with
	// ErrTooLong, so allow lines of up to 1 MiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	bulkRequest := elk.Client.Bulk()
	i := 0 // documents queued since the last commit attempt
	n := 0 // documents queued in total
	for scanner.Scan() {
		cdn := toStruct(scanner.Text())
		if cdn == nil {
			// Unparsable line: skip it rather than dereference nil below.
			continue
		}
		i++
		n++
		date := cdn.Time.Format("2006-01-02")
		req := elastic.NewBulkIndexRequest().Index("cdnlog-" + date).Doc(cdn)
		bulkRequest = bulkRequest.Add(req)
		if i > 9999 {
			fmt.Println(n)
			i = 0
			resp, err := bulkRequest.Do(context.Background())
			if err != nil {
				// bulkRequest is only replaced on success. The failed actions
				// therefore stay queued and are re-sent with the next batch
				// (assuming Do keeps them on error).
				fmt.Printf("bulkResponse:%v count:%v is err:%v\n", index, bulkRequest.NumberOfActions(), err.Error())
				continue
			}
			if resp != nil && resp.Errors {
				// A successful round trip can still contain per-item failures.
				fmt.Printf("bulkResponse:%v failed items:%v\n", index, len(resp.Failed()))
			}
			bulkRequest = elk.Client.Bulk()
		}
	}
	if err := scanner.Err(); err != nil {
		fmt.Printf("read %s stopped early: %v\n", fileName, err)
	}

	// Committing an empty bulk request is an error in olivere/elastic, so
	// only flush when something is pending.
	if bulkRequest.NumberOfActions() == 0 {
		fmt.Println(n)
		return
	}
	resp, err := bulkRequest.Do(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}
	if resp != nil && resp.Errors {
		fmt.Printf("bulkResponse:%v failed items:%v\n", index, len(resp.Failed()))
	}
	fmt.Println(n)
}
// ReadLineFileSync is the asynchronous importer, despite its name.
//
// It reads fileName line by line, converts each line with toStruct, and hands
// the documents to an olivere/elastic BulkProcessor. The processor commits them
// in the background using workerNum workers, committing after bulkActions queued
// actions or on every flush interval, whichever comes first. GetFailed runs
// after each commit.
//
// Each document goes to the index "cdnlog-YYYY-MM-DD", derived from its parsed
// timestamp. Processor statistics are logged every 100000 documents. Lines that
// toStruct cannot parse are skipped.
//
// The function panics if fileName cannot be opened or the BulkProcessor cannot
// be created.
func ReadLineFileSync(index int, fileName string, elk *Elk) {
	file, err := os.Open(fileName)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	// Allow lines longer than bufio's 64 KiB default. Otherwise the scan
	// stops with ErrTooLong on an unusually long log line.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	// NOTE(review): a 1ms flush interval commits almost continuously, which
	// largely defeats BulkActions batching. Consider a value such as 1s.
	w, err := elk.Client.BulkProcessor().
		BulkActions(bulkActions).        // commit after this many queued actions
		FlushInterval(time.Millisecond). // also commit on this interval
		Workers(workerNum).              // number of concurrent commit workers
		Stats(true).                     // collect statistics for w.Stats()
		After(GetFailed).                // callback invoked after each commit
		Do(context.Background())
	if err != nil {
		panic(fmt.Errorf("create bulk processor for %s: %w", fileName, err))
	}
	// Do normally starts the processor already. This call is then a no-op,
	// but it is kept as a safeguard.
	if err := w.Start(context.Background()); err != nil {
		panic(fmt.Errorf("start bulk processor for %s: %w", fileName, err))
	}
	defer func() {
		// Close commits everything still queued and stops the workers.
		// Skipping it, or ignoring its error, can silently lose documents.
		if err := w.Close(); err != nil {
			log.Printf("BulkProcessor close index:%v err:%v\n", index, err)
		}
	}()

	i := 0 // documents added since the last stats report
	n := 0 // documents added in total
	for scanner.Scan() {
		cdn := toStruct(scanner.Text())
		if cdn == nil {
			// Unparsable line: skip it rather than dereference nil below.
			continue
		}
		i++
		n++
		date := cdn.Time.Format("2006-01-02")
		req := elastic.NewBulkIndexRequest().Index("cdnlog-" + date).Doc(cdn)
		// No explicit Do here: BulkActions and FlushInterval trigger commits.
		w.Add(req)
		if i > 99999 {
			i = 0
			st := w.Stats()
			log.Printf("BulkMonitor state Succeeded:%d Failed:%d Created:%d Updated:%d Deleted:%d Flushed:%d Committed:%d Indexed:%d\n", st.Succeeded, st.Failed, st.Created, st.Updated, st.Deleted, st.Flushed, st.Committed, st.Indexed)
			fmt.Printf("index:%v count:%v\n", index, n)
			for _, s := range st.Workers {
				// Number of requests still queued in each worker.
				log.Println("BulkMonitor Queue:", s.Queued)
			}
		}
	}
	if err := scanner.Err(); err != nil {
		log.Printf("read %s stopped early: %v\n", fileName, err)
	}
	fmt.Printf("index:%v count:%v\n", index, n)
}
// GetFailed is the BulkProcessor "After" callback, invoked after each commit.
//
// It logs the commit error, if any. It then logs every item that the bulk
// response reports as failed (index, type, id, version, status, result and
// error detail).
func GetFailed(executionId int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		// Without this, a failed commit would only show up as the nil-response
		// message below, with no cause.
		log.Printf("DebugFailedEs: execution:%d requests:%d commit error:%v\n", executionId, len(requests), err)
	}
	if response == nil {
		// The response can be nil, for example when the commit itself failed.
		log.Println("GetNil response return")
		return
	}
	for _, f := range response.Failed() {
		log.Printf("DebugFailedEs: index:%s type:%s id:%s version:%d status:%d result:%s ForceRefresh:%v errorDetail:%v getResult:%v\n", f.Index, f.Type, f.Id, f.Version, f.Status, f.Result, f.ForcedRefresh, f.Error, f.GetResult)
	}
}
// NewElk connects an olivere/elastic client to url and wraps it in an *Elk.
//
// The client authenticates with HTTP basic auth (user/password) and has
// sniffing disabled. Any error from elastic.NewClient is returned unchanged.
func NewElk(url, user, password string) (*Elk, error) {
	opts := []elastic.ClientOptionFunc{
		elastic.SetSniff(false),
		elastic.SetURL(url),
		elastic.SetBasicAuth(user, password),
	}
	client, err := elastic.NewClient(opts...)
	if err != nil {
		return nil, err
	}
	return &Elk{Client: client}, nil
}
调用阿里云 CDN（DCDN）API，获取离线日志的下载地址
package common
import (
openapi "github.com/alibabacloud-go/darabonba-openapi/client"
dcdn20180115 "github.com/alibabacloud-go/dcdn-20180115/client"
"github.com/alibabacloud-go/tea/tea"
)
// Aliyun DCDN API settings used by CreateClient and GetDownData.
//
// SECURITY(review): the AccessKey pair is hard-coded in source. Anyone who can
// read this file or its history could use it. Load credentials from the
// environment or a secret store instead, and rotate the key if these are real
// values.
const (
	AccessKeyId = "LTAI5txxxxxUzoNCnrc"
	AccessKeySecret = "81UquYxxxxQtW7Q7m19loXg"
	// Endpoint is the API host set on the client config in CreateClient.
	Endpoint = "dcdn.aliyuncs.com"
	// DomainName is the domain whose logs GetDownData requests.
	DomainName = "vvin.gwxxxek.com"
)
// CreateClient builds a DCDN (dcdn-20180115) API client.
//
// The client authenticates with the given AccessKey ID/secret and targets the
// Endpoint host. The client and error are returned exactly as
// dcdn20180115.NewClient produces them.
func CreateClient(accessKeyId *string, accessKeySecret *string) (_result *dcdn20180115.Client, _err error) {
	config := &openapi.Config{
		AccessKeyId:     accessKeyId,
		AccessKeySecret: accessKeySecret,
		// API host to call.
		Endpoint: tea.String(Endpoint),
	}
	// The client value comes straight from NewClient. The previous
	// pre-allocated &dcdn20180115.Client{} was discarded immediately.
	return dcdn20180115.NewClient(config)
}
// GetDownData lists the offline log files of DomainName between startTime and
// endTime.
//
// It calls DescribeDcdnDomainLog and returns the LogPath of every log file in
// the response. The time strings are passed to the API unchanged.
//
// Missing (nil) parts of the response are treated as "no files" instead of
// panicking, and empty paths are skipped. On error the returned slice is empty
// and the error comes from client creation or the API call.
//
// NOTE(review): the request sets no paging parameters. If the API paginates
// large result sets, later pages are not fetched — confirm against the API docs.
func GetDownData(startTime, endTime string) (paths []string, _err error) {
	paths = make([]string, 0)
	client, _err := CreateClient(tea.String(AccessKeyId), tea.String(AccessKeySecret))
	if _err != nil {
		return paths, _err
	}
	request := &dcdn20180115.DescribeDcdnDomainLogRequest{
		DomainName: tea.String(DomainName),
		StartTime:  tea.String(startTime),
		EndTime:    tea.String(endTime),
	}
	res, _err := client.DescribeDcdnDomainLog(request)
	if _err != nil {
		return paths, _err
	}
	// The SDK models every section as a pointer, so guard each level.
	if res == nil || res.Body == nil || res.Body.DomainLogDetails == nil {
		return paths, nil
	}
	for _, detail := range res.Body.DomainLogDetails.DomainLogDetail {
		if detail == nil || detail.LogInfos == nil {
			continue
		}
		for _, info := range detail.LogInfos.LogInfoDetail {
			if info == nil {
				continue
			}
			if p := tea.StringValue(info.LogPath); p != "" {
				paths = append(paths, p)
			}
		}
	}
	return paths, nil
}
完整代码（main 包）
package main
import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"
	"toolTest/common"

	"github.com/olivere/elastic/v7"
)
// Elk holds the olivere/elastic client that the importers use to write CDN log
// documents to Elasticsearch. Build it with NewElk.
type Elk struct {
Client *elastic.Client
}
// Cdn is one parsed CDN log line, as built by toStruct. It is the document
// indexed into Elasticsearch.
//
// The struct has no json tags. The field names are therefore presumably used
// as-is as document keys when serialized. Renaming a field (for example the
// misspelled UAHeard) would change the stored field name — verify before
// changing it.
type Cdn struct {
// Time is parsed from the first space-separated token with TimeForm.
Time time.Time
// DateTime is tokens 0 and 1 concatenated without a separator.
DateTime string
// Ip through CacheStatus are tokens 2..11 of the line (Country is
// currently left empty because the lookup in toStruct is commented out).
Ip string
Country string
Vpn string
ResponseTime string
Referer string
Method string
Url string
HttpCode string
RequestSize string
ResponseSize string
CacheStatus string
// UAHeard is tokens 12..end (minus the content-type tokens) joined
// without spaces.
UAHeard string
// ContentType is the last token, or the last two tokens for
// `"text/html;` `charset=UTF-8"`.
ContentType string
// Content is the raw, unparsed log line.
Content string
// LogModule is always "CDN".
LogModule string
}
const (
// Elasticsearch endpoint and basic-auth credentials passed to NewElk by
// StartRunCdn.
// SECURITY(review): credentials are hard-coded in source. Load them from
// the environment or a secret store instead.
url = "http://es-cn-zv5xxxxx.elasticsearch.aliyuncs.com:9200"
user = "elastic"
password = "000000"
// BulkProcessor tuning used by ReadLineFileSync: actions per commit, and
// the number of concurrent workers.
bulkActions = 5000
workerNum = 20
// TimeForm parses a log line's leading timestamp token (after the "[" is
// trimmed). It has no zone field, so time.Parse yields UTC times.
TimeForm = "2/Jan/2006:15:04:05"
// TimeForm2 parses the hard-coded start/end times in StartRunCdn.
TimeForm2 = "2006-01-02 15:04:05"
)
// main runs a single CDN-log import via StartRunCdn.
func main() {
StartRunCdn()
}
func StartRunCdn() {
//ip.InitRegion()
sysPath, _ := os.Getwd()
dirPath := sysPath + "\CndLog"
_, err := os.Stat(dirPath)
if err != nil {
if os.IsNotExist(err) {
err = os.Mkdir(dirPath, 0777)
if err != nil {
fmt.Println(fmt.Sprintf("mkdir path:%v,is err:%v", dirPath, err.Error()))
return
}
} else {
fmt.Println(fmt.Sprintf("mkdir path:%v,is err:%v", dirPath, err.Error()))
return
}
}
startTime, err := time.Parse(TimeForm2, "2021-12-21 00:00:00")
if err != nil {
fmt.Println(err.Error())
}
endTime, err := time.Parse(TimeForm2, "2021-12-21 01:00:00")
if err != nil {
fmt.Println(err.Error())
}
elk, err := NewElk(url, user, password)
if err != nil {
fmt.Println(fmt.Sprintf("NewElk is err:%v", err.Error()))
return
}
for j := 0; j < 1; j++ {
startTimeStr := startTime.AddDate(0, 0, j).Format("2006-01-02T15:04:05Z")
endTimeStr := endTime.AddDate(0, 0, j).Format("2006-01-02T15:04:05Z")
paths, _ := common.GetDownData(startTimeStr, endTimeStr)//调用API获取下载路径
fmt.Println(fmt.Sprintf("startTime:%s;\nendTime:%s;\npwd:%s;\npathCount:%v;\npath:%v", startTimeStr, endTimeStr, sysPath, len(paths), paths))
RunTask(elk, dirPath, paths)
}
}
func RunTask(elk *Elk, dirPath string, paths []string) {
//var wg sync.WaitGroup
for i, path := range paths {
//wg.Add(1)
func(index int) {
//defer wg.Done()
fmt.Println(path)
logTime := time.Now().Format("2006-01-02T")
gzipPath := dirPath + "\" + logTime + strconv.Itoa(index) + ".gz"
filePath := dirPath + "\" + logTime + strconv.Itoa(index) + ".txt"
err := downloadFile("https://"+path, gzipPath)//下载文件
if err != nil {
fmt.Println(fmt.Sprintf("path:%v;gzipPath:%v;downloadFile:%v", path, gzipPath, err.Error()))
}
err = ReadGzipFile(gzipPath, filePath)//解压文件
if err != nil {
fmt.Println(fmt.Sprintf("gzipPath:%v;filePath:%v;ReadGzipFile:%v", gzipPath, filePath, err.Error()))
}
ReadLineFileSync(index, filePath, elk)//读取文件并写入ES
err = os.RemoveAll(gzipPath)//删除下载文件
if err != nil {
fmt.Println("del dir error:", err)
}
err = os.RemoveAll(filePath)//删除解压文件
if err != nil {
fmt.Println("del dir error:", err)
}
}(i)
}
//wg.Wait()
}
// toStruct converts one raw CDN log line into a *Cdn.
//
// The line is split on single spaces:
//   - Tokens 0..11 map to fixed fields (Time/DateTime, Ip, Vpn, ResponseTime,
//     Referer, Method, Url, HttpCode, RequestSize, ResponseSize, CacheStatus).
//   - The trailing token becomes ContentType. When the line ends with
//     `"text/html;` `charset=UTF-8"`, the last two tokens are used instead.
//   - The tokens between index 12 and the content type are concatenated,
//     without spaces, into UAHeard.
//
// Lines with fewer than 12 tokens cannot be mapped and yield nil. Callers must
// check for nil. A timestamp that fails to parse is reported, and Time is left
// as the zero value.
func toStruct(str string) (cdn *Cdn) {
	// Number of tokens needed to read the fixed-position fields 0..11.
	const minFields = 12
	arrStr := strings.Split(str, " ")
	// strings.Split never returns an empty slice, so the old len > 0 guard
	// was always true and short lines panicked. Check the real requirement.
	if len(arrStr) < minFields {
		fmt.Printf("toStruct: skipping line with %d fields (need %d): %q\n", len(arrStr), minFields, str)
		return nil
	}
	// NOTE(review): token 1 presumably carries the timezone offset, which is
	// ignored here, so times are parsed as UTC — confirm against the log format.
	cdnTime, err := time.Parse(TimeForm, strings.TrimLeft(arrStr[0], "["))
	if err != nil {
		fmt.Println(err.Error())
	}
	//c, err := ip.GetCountry(arrStr[2])
	//if err != nil {
	//	fmt.Println(err.Error())
	//}
	cdn = &Cdn{
		LogModule: "CDN",
		Content:   str,
		Time:      cdnTime,
		DateTime:  arrStr[0] + arrStr[1],
		Ip:        arrStr[2],
		//Country: c.Country.Names["zh-CN"],
		Vpn:          arrStr[3],
		ResponseTime: arrStr[4],
		Referer:      arrStr[5],
		Method:       arrStr[6],
		Url:          arrStr[7],
		HttpCode:     arrStr[8],
		RequestSize:  arrStr[9],
		ResponseSize: arrStr[10],
		CacheStatus:  arrStr[11],
	}
	last := len(arrStr) - 1
	uaEnd := last // exclusive end of the User-Agent tokens
	if arrStr[last] == "charset=UTF-8\"" && arrStr[last-1] == "\"text/html;" {
		uaEnd = last - 1
		cdn.ContentType = arrStr[last-1] + arrStr[last]
	} else {
		cdn.ContentType = arrStr[last]
	}
	if uaEnd > minFields {
		cdn.UAHeard = strings.Join(arrStr[minFields:uaEnd], "")
	}
	return cdn
}
// downloadFile fetches url with an HTTP GET and writes the response body to
// path, replacing any existing file there.
//
// A non-2xx response is returned as an error and nothing is written, so an
// error page is never saved where an archive is expected. Errors from the
// request, file creation, copy or close are returned.
func downloadFile(url string, path string) (err error) {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("download %s: unexpected status %s", url, resp.Status)
	}
	// Use O_TRUNC rather than O_APPEND: re-downloading to an existing path
	// must replace the old content, not append a second archive to it.
	out, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
	if err != nil {
		return err
	}
	defer func() {
		// Close can report deferred write errors. Surface them if the copy
		// itself succeeded.
		if cerr := out.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	_, err = io.Copy(out, resp.Body)
	return err
}
// ReadGzipFile decompresses the gzip archive at url into the file at path,
// replacing any existing content at path. Despite its name, url is a local
// file path.
//
// It returns an error if the archive cannot be opened, is not valid gzip, or
// fails its integrity check, and also if the output cannot be written or
// closed.
func ReadGzipFile(url string, path string) (err error) {
	gzipFile, err := os.Open(url)
	if err != nil {
		return err
	}
	defer gzipFile.Close()
	gzipReader, err := gzip.NewReader(gzipFile)
	if err != nil {
		return err
	}
	defer gzipReader.Close()
	// Use O_TRUNC rather than O_APPEND so that re-running over an existing
	// output file does not duplicate its lines.
	outfileWriter, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := outfileWriter.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()
	// io.Copy reads to EOF, which is where gzip.Reader reports checksum and
	// truncation errors.
	_, err = io.Copy(outfileWriter, gzipReader)
	return err
}
BulkProcessor流程图
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)