golang基于kafka消息队列的日志收集项目(2)

golang基于kafka消息队列的日志收集项目(2),第1张

配置文件的logagent 1、读配置文件使用ini加载配置文件的包

注意在造结构体反射配置文件时应该应该注意“tag”否则无法映射

type Config struct {
   KafkaConfig `ini:"kafka"`
   CollectConfig `ini:"collect"`
}
type KafkaConfig struct {
   Address string `ini:"address"`
   Topic string `ini:"topic"`
}
type CollectConfig struct {
   LogFilePath string `ini:"logfile_path"`
}

读配置文件


var configObj =new(Config)
//cfg,err:= ini.Load("./conf/config.ini")
//if err!=nil{
// logrus.Error("load config faild :%v",err)
// return
//}
//kafkaaddr:=cfg.Section("kafka").Key("address").String()
//fmt.Println(kafkaaddr)
err:=ini.MapTo(configObj,"./conf/config.ini")
if err!=nil{
   logrus.Error("load config faild :%v",err)
   return
}
2、初始化
//  初始化生产者
//生产者配置
config:=sarama.NewConfig()
//配置kafka的ack回传级别
config.Producer.RequiredAcks=sarama.WaitForAll
//发送到那个分区
config.Producer.Partitioner=sarama.NewRandomPartitioner
// 成功交付信息
config.Producer.Return.Successes=true



// 连接kafka
Client,err=sarama.NewSyncProducer(address,config)
if err!=nil {
   logrus.Error("kafka:producer closer err:",err)
   return
}
3、使用tail读日志
var TailObj *tail.Tail
func Init(fileName string)(err error){

   config:=tail.Config{
      ReOpen:true,
      Follow:true,
      Location:&tail.SeekInfo{Offset:0,Whence:2},
      MustExist:false,
      Poll:true,
   }
   //打开文件
   TailObj,err=tail.TailFile(fileName,config)
 4、使用sarama发送到kafka
	//循环读数据
	for {
		line,ok:=<-tailfile.TailObj.Lines
		if !ok {
			logrus.Warn("tail file close reopen,filename:%s\n",tailfile.TailObj.Filename)
			time.Sleep(time.Second*1)
			continue
		}
		//	利用通道  改为异步并发
		//读出来line改为msg信息
		msg:=&sarama.ProducerMessage{}
		msg.Topic="web_log"
		msg.Value=sarama.StringEncoder(line.Text)
		kafka.MsgChan<-msg
	}
}

使用通道 为了实现并行  

etcd版本配置logagent 初始化etcd 启动连接etcd
err=etcd.Init([]string{configObj.EtcConfig.Address})
var cli *clientv3.Client
func Init(addr []string)(err error){
   cli, err = clientv3.New(clientv3.Config{
      Endpoints:   addr,
      DialTimeout: 5 * time.Second,
   })
   if err != nil {
      // handle error!
      fmt.Printf("connect to etcd failed, err:%v\n", err)
      return
   }
   return
}
拉取日志收集配置项
allConf,err:=etcd.GetConf(configObj.EtcConfig.CollectKey)

etcd键值对中值采样json格式,包含多个结构体,每个结构体对应一个日志收集任务,

[
    {
        "path":"d:\logs\test.log",
        "topic":"s4_log"
    },
    {
        "path":"e:\logs\web.log",
        "topic":"web_log"
    }
]
//拉取日志收集配置项的函数
func GetConf(key string)(collectEntryList []common.CollectEntry,err error){
   ctx,cancel:=context.WithTimeout(context.Background(),time.Second*2)
   defer cancel()
   resp,err:= cli.Get(ctx,key)
   if err!=nil{
      logrus.Errorf("get conf from etcd error by %s failed err :%s",key,err)
      return
   }
   if len(resp.Kvs)==0{
      logrus.Warningf("get len=0 conf from etcd error by %s",key)
      return
   }
   ret :=resp.Kvs[0]

   err=json.Unmarshal(ret.Value,&collectEntryList)
   if err!=nil{
      logrus.Errorf("json unmarshal failed err :%s",err)
      return
   }
   return
}

返回的是要收集的日志列表

创建tailFile去收集日志发送到kfaka
//3.使用tail读日志
err = tailfile.Init(allConf)
func Init(allConf []common.CollectEntry)(err error){
   //allConf 存了好多个日志收集项
   //每一个收集项 创建一个tailobj



   for _,conf:=range allConf{
      tt:=newTailTask(conf.Path,conf.Topic)//创建一个日志收集任务
      err :=tt.Init()//初始化一个日志收集任务
      if err!=nil{
         logrus.Errorf("create tailobj for path :%s failed err:%v",conf.Path,err)
         continue
      }
      logrus.Infof("create a tail task for path: %s success",conf.Path)
      //去干活 收集日志
      go tt.run()
   }

   return
}

run函数将从自己的tail任务中读取日志信息,到消息管道中,kafka再从管道中取信息(这里的管道是为了减少函数调用,让处理效率增加)

 实现当etcd加载的配置更新时,有新的日志收集任务去处理

可以使项目不停止的情况下  更新要收集的日志地址

拉取配置后就应该派人去监视etcd的配置信息是否有更新

1.使用etcd的cli启动watch关注key对应的值的变化
/派小弟去监视etcd中configObj.EtcConfig.CollectKey的变化
go etcd.WatchConf(configObj.EtcConfig.CollectKey)
// 监视etcd中日志收集项的配置变化
func WatchConf(key string){
   wCh:=cli.Watch(context.Background(),key)
   var newConf []common.CollectEntry
   for wresp :=range wCh{
      logrus.Info("new conf from etc ")
      for _,evt:=range wresp.Events{
         fmt.Printf("type :%s ,key: %s , value :%s",evt.Type,evt.Kv.Key,evt.Kv.Value)
         err:=json.Unmarshal(evt.Kv.Value,&newConf)
         if err!=nil{
            logrus.Errorf("json newconf failed err: %v",err)
            continue
         }
         //告诉tailfile 模块启用新的配置!
         tailfile.SendNewConf(newConf)//没有人接收 阻塞住了
      }
   }
}

将conf的变化放到新的切片newConf中,调用tailFile的方法,将新的切片放到管道里,

//初始化新配置管道
confChan=make(chan []common.CollectEntry)//做一个阻塞的配置  没有新配置我就在这等着
func SendNewConf(newConf []common.CollectEntry){
   confChan<-newConf
}

tailefile派个小弟实时检测接收这个管道,因为管道是阻塞的当有新配置时,

//派小弟监视新配置来没来
newConf:=<- confChan//取到值说明新配置来了
处理新的配置管理之前的配置

1.原来有的别动

2.原来没有的加上

3.原来有新配置没有的,删除原来的配置

上边的etcd.watchConf()出现一个问题,新配置来一次,函数就执行完毕退出了,所以应修改为

// 监视etcd中日志收集项的配置变化
func WatchConf(key string){
   for{
      wCh:=cli.Watch(context.Background(),key)
      var newConf []common.CollectEntry
      for wresp :=range wCh{
         logrus.Info("new conf from etc ")
         for _,evt:=range wresp.Events{
            fmt.Printf("type :%s ,key: %s , value :%s",evt.Type,evt.Kv.Key,evt.Kv.Value)
            err:=json.Unmarshal(evt.Kv.Value,&newConf)
            if err!=nil{
               logrus.Errorf("json newconf failed err: %v",err)
               continue
            }
            //告诉tailfile 模块启用新的配置!
            tailfile.SendNewConf(newConf)//没有人接收 阻塞住了
         }
      }
   }
}

使用for死循环不停监视键值对中值的变化

其次要创建一个管理新的TailFiletask的结构体

// 用来管理 tailfile
type tailTaskMgr struct {
   tailTaskMap map[string]*tailTask//所有task任务
   CollectEntryList []common.CollectEntry//所有配置项
   confChan chan []common.CollectEntry//等待新配置的通道
}

使用tail读日志的Init函数也要修改,有小弟去watch()等待新的配置

func Init(allConf []common.CollectEntry)(err error){
   //allConf 存了好多个日志收集项
   //每一个收集项 创建一个tailobj
   ttMgr = &tailTaskMgr{
      tailTaskMap:make(map[string]*tailTask,20),
      CollectEntryList:allConf,
      confChan:make(chan []common.CollectEntry),
   }
   for _,conf:=range allConf{
      tt:=newTailTask(conf.Path,conf.Topic)//创建一个日志收集任务
      err :=tt.Init()//初始化一个日志收集任务
      if err!=nil{
         logrus.Errorf("create tailobj for path :%s failed err:%v",conf.Path,err)
         continue
      }
      logrus.Infof("create a tail task for path: %s success",conf.Path)
      ttMgr.tailTaskMap[tt.path]=tt//把创建的tailTask保存起来,方便后续管理
      //去干活 收集日志
      go tt.run()
   }
    go ttMgr.watch()//等新配置

   return
}

这里的ttMgr.watch() 要死循环一直有人接收管道信息,因为新配置放在管道了,如果不死循环会只能更改一次配置

func (t *tailTaskMgr)watch(){
   for {
      //派一个小弟监视新配置来没来
      newConf:=<- t.confChan//取到值说明新配置来了
      //来了新配置管理一下我之前的tailTask
      logrus.Infof("get new conf from etcd conf:%v,start manage tailTsak...",newConf)
      for _,conf:=range newConf{
         //1、原来已经存在的就别动
         if t.isExist(conf) {
            continue
         }
         //2、原来没有的  新创建一个
         tt:=newTailTask(conf.Path,conf.Topic)//创建一个日志收集任务
         err :=tt.Init()//初始化一个日志收集任务
         if err!=nil{
            logrus.Errorf("create tailobj for path :%s failed err:%v",conf.Path,err)
            continue
         }
         t.tailTaskMap[tt.path]=tt
         //去干活 收集日志
         go tt.run()
         //3、原来有现在没有的  停止
      }
   }
}

处理第三种情景  原来存在  但是新的配置中没有的情况

首先更改Task任务结构体,我们的管理任务的结构体中有map存放任务,新的任务结构体更改为:

type tailTask struct {
   path  string
   topic string
   //收集日志的实例
   tobj *tail.Tail
   // 创建一个cancel函数  用于停止goroutine
   ctx    context.Context
   cancel context.CancelFunc
}

新增ctx和cancel字段,因为可以在想结束goroutine时调用cancel()结束函数

//3、原来有现在没有的  停止
// 找出tailTaskMap中存在,但是新的newConf中不存在的
for key, task := range t.tailTaskMap {
   var found bool
   for _, conf := range newConf {
      if key == conf.Path {
         found = true
         break
      }
   }
   if !found {
      // 没找到 这个任务应该停下来
      // 这里使用的是context上下文管理的停止,在日志收集结构体中加上ctx
      //和cancel()
      logrus.Infof("the task collect path :%s need to stop ", task.path)
      //别忘了在管理的map中删除
      delete(t.tailTaskMap, key)
      task.cancel()
   }
}

当遍历新的和旧的任务集合时,旧的需要删除,就将调用旧任务的cancel()函数,并**且别忘了删除map中的具体任务**,

我们日志收集任务的func (t *tailTask) run() 将做修改

func (t *tailTask) run() {
   // 读取日志发送到kafka
   logrus.Infof("create a tail task for path: %s success", t.path)

   //循环读数据
   for {
      select {
      case <-t.ctx.Done(): //调用cancel()就会触发
         logrus.Infof("path : %s is stopping... ", t.path)
         return
      case line, ok := <-t.tobj.Lines:

         if !ok {
            logrus.Warn("tail file close reopen,path:%s\n", t.path)
            time.Sleep(time.Second * 1)
            continue
         }
         //如果是空行直接略过 但是在windows是有换行符
         if len(strings.Trim(line.Text, "\r")) == 0 {
            continue
         }
         // 利用通道  改为异步并发
         //读出来line改为msg信息
         msg := &sarama.ProducerMessage{}
         msg.Topic = t.topic //自己的topic
         msg.Value = sarama.StringEncoder(line.Text)
         kafka.ToMsgChan(msg)

      }
   }
}

死循环读取 日志文件信息发往kafka中,使用select分支观察管理任务的函数是否调用cancel(),使用的是ctx.Done(),调用就终止函数

为验证etcd动态加载配置文件,博客链接(https://blog.csdn.net/m0_51490886/article/details/123977140)中演示更新etcd,配置文件键值对。 至此还有问题:现在出现的问题是 etcd拉取的配置全都一样,当我的logagent部署到不同的服务器上时应该有不同的配置相对应,后续将更新通过服务器ip对应自己的配置文件

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/990478.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存