注意在造结构体反射配置文件时应该应该注意“tag”否则无法映射
type Config struct {
KafkaConfig `ini:"kafka"`
CollectConfig `ini:"collect"`
}
type KafkaConfig struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}
type CollectConfig struct {
LogFilePath string `ini:"logfile_path"`
}
读配置文件
var configObj =new(Config)
//cfg,err:= ini.Load("./conf/config.ini")
//if err!=nil{
// logrus.Error("load config faild :%v",err)
// return
//}
//kafkaaddr:=cfg.Section("kafka").Key("address").String()
//fmt.Println(kafkaaddr)
err:=ini.MapTo(configObj,"./conf/config.ini")
if err!=nil{
logrus.Error("load config faild :%v",err)
return
}
2、初始化
// 初始化生产者
//生产者配置
config:=sarama.NewConfig()
//配置kafka的ack回传级别
config.Producer.RequiredAcks=sarama.WaitForAll
//发送到那个分区
config.Producer.Partitioner=sarama.NewRandomPartitioner
// 成功交付信息
config.Producer.Return.Successes=true
// 连接kafka
Client,err=sarama.NewSyncProducer(address,config)
if err!=nil {
logrus.Error("kafka:producer closer err:",err)
return
}
3、使用tail读日志
var TailObj *tail.Tail
func Init(fileName string)(err error){
config:=tail.Config{
ReOpen:true,
Follow:true,
Location:&tail.SeekInfo{Offset:0,Whence:2},
MustExist:false,
Poll:true,
}
//打开文件
TailObj,err=tail.TailFile(fileName,config)
4、使用sarama发送到kafka
//循环读数据
for {
line,ok:=<-tailfile.TailObj.Lines
if !ok {
logrus.Warn("tail file close reopen,filename:%s\n",tailfile.TailObj.Filename)
time.Sleep(time.Second*1)
continue
}
// 利用通道 改为异步并发
//读出来line改为msg信息
msg:=&sarama.ProducerMessage{}
msg.Topic="web_log"
msg.Value=sarama.StringEncoder(line.Text)
kafka.MsgChan<-msg
}
}
使用通道 为了实现并行
etcd版本配置logagent 初始化etcd 启动连接etcderr=etcd.Init([]string{configObj.EtcConfig.Address})
var cli *clientv3.Client
func Init(addr []string)(err error){
cli, err = clientv3.New(clientv3.Config{
Endpoints: addr,
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
return
}
拉取日志收集配置项
allConf,err:=etcd.GetConf(configObj.EtcConfig.CollectKey)
etcd键值对中值采样json格式,包含多个结构体,每个结构体对应一个日志收集任务,
[
{
"path":"d:\logs\test.log",
"topic":"s4_log"
},
{
"path":"e:\logs\web.log",
"topic":"web_log"
}
]
//拉取日志收集配置项的函数
func GetConf(key string)(collectEntryList []common.CollectEntry,err error){
ctx,cancel:=context.WithTimeout(context.Background(),time.Second*2)
defer cancel()
resp,err:= cli.Get(ctx,key)
if err!=nil{
logrus.Errorf("get conf from etcd error by %s failed err :%s",key,err)
return
}
if len(resp.Kvs)==0{
logrus.Warningf("get len=0 conf from etcd error by %s",key)
return
}
ret :=resp.Kvs[0]
err=json.Unmarshal(ret.Value,&collectEntryList)
if err!=nil{
logrus.Errorf("json unmarshal failed err :%s",err)
return
}
return
}
返回的是要收集的日志列表
创建tailFile去收集日志发送到kfaka//3.使用tail读日志
err = tailfile.Init(allConf)
func Init(allConf []common.CollectEntry)(err error){
//allConf 存了好多个日志收集项
//每一个收集项 创建一个tailobj
for _,conf:=range allConf{
tt:=newTailTask(conf.Path,conf.Topic)//创建一个日志收集任务
err :=tt.Init()//初始化一个日志收集任务
if err!=nil{
logrus.Errorf("create tailobj for path :%s failed err:%v",conf.Path,err)
continue
}
logrus.Infof("create a tail task for path: %s success",conf.Path)
//去干活 收集日志
go tt.run()
}
return
}
run函数将从自己的tail任务中读取日志信息,到消息管道中,kafka再从管道中取信息(这里的管道是为了减少函数调用,让处理效率增加)
实现当etcd加载的配置更新时,有新的日志收集任务去处理可以使项目不停止的情况下 更新要收集的日志地址
拉取配置后就应该派人去监视etcd的配置信息是否有更新
1.使用etcd的cli启动watch关注key对应的值的变化/派小弟去监视etcd中configObj.EtcConfig.CollectKey的变化
go etcd.WatchConf(configObj.EtcConfig.CollectKey)
// 监视etcd中日志收集项的配置变化
func WatchConf(key string){
wCh:=cli.Watch(context.Background(),key)
var newConf []common.CollectEntry
for wresp :=range wCh{
logrus.Info("new conf from etc ")
for _,evt:=range wresp.Events{
fmt.Printf("type :%s ,key: %s , value :%s",evt.Type,evt.Kv.Key,evt.Kv.Value)
err:=json.Unmarshal(evt.Kv.Value,&newConf)
if err!=nil{
logrus.Errorf("json newconf failed err: %v",err)
continue
}
//告诉tailfile 模块启用新的配置!
tailfile.SendNewConf(newConf)//没有人接收 阻塞住了
}
}
}
将conf的变化放到新的切片newConf中,调用tailFile的方法,将新的切片放到管道里,
//初始化新配置管道
confChan=make(chan []common.CollectEntry)//做一个阻塞的配置 没有新配置我就在这等着
func SendNewConf(newConf []common.CollectEntry){
confChan<-newConf
}
tailefile派个小弟实时检测接收这个管道,因为管道是阻塞的当有新配置时,
//派小弟监视新配置来没来
newConf:=<- confChan//取到值说明新配置来了
处理新的配置管理之前的配置
1.原来有的别动
2.原来没有的加上
3.原来有新配置没有的,删除原来的配置
上边的etcd.watchConf()出现一个问题,新配置来一次,函数就执行完毕退出了,所以应修改为
// 监视etcd中日志收集项的配置变化
func WatchConf(key string){
for{
wCh:=cli.Watch(context.Background(),key)
var newConf []common.CollectEntry
for wresp :=range wCh{
logrus.Info("new conf from etc ")
for _,evt:=range wresp.Events{
fmt.Printf("type :%s ,key: %s , value :%s",evt.Type,evt.Kv.Key,evt.Kv.Value)
err:=json.Unmarshal(evt.Kv.Value,&newConf)
if err!=nil{
logrus.Errorf("json newconf failed err: %v",err)
continue
}
//告诉tailfile 模块启用新的配置!
tailfile.SendNewConf(newConf)//没有人接收 阻塞住了
}
}
}
}
使用for死循环不停监视键值对中值的变化
其次要创建一个管理新的TailFiletask的结构体
// 用来管理 tailfile
type tailTaskMgr struct {
tailTaskMap map[string]*tailTask//所有task任务
CollectEntryList []common.CollectEntry//所有配置项
confChan chan []common.CollectEntry//等待新配置的通道
}
使用tail读日志的Init函数也要修改,有小弟去watch()等待新的配置
func Init(allConf []common.CollectEntry)(err error){
//allConf 存了好多个日志收集项
//每一个收集项 创建一个tailobj
ttMgr = &tailTaskMgr{
tailTaskMap:make(map[string]*tailTask,20),
CollectEntryList:allConf,
confChan:make(chan []common.CollectEntry),
}
for _,conf:=range allConf{
tt:=newTailTask(conf.Path,conf.Topic)//创建一个日志收集任务
err :=tt.Init()//初始化一个日志收集任务
if err!=nil{
logrus.Errorf("create tailobj for path :%s failed err:%v",conf.Path,err)
continue
}
logrus.Infof("create a tail task for path: %s success",conf.Path)
ttMgr.tailTaskMap[tt.path]=tt//把创建的tailTask保存起来,方便后续管理
//去干活 收集日志
go tt.run()
}
go ttMgr.watch()//等新配置
return
}
这里的ttMgr.watch() 要死循环一直有人接收管道信息,因为新配置放在管道了,如果不死循环会只能更改一次配置
func (t *tailTaskMgr)watch(){
for {
//派一个小弟监视新配置来没来
newConf:=<- t.confChan//取到值说明新配置来了
//来了新配置管理一下我之前的tailTask
logrus.Infof("get new conf from etcd conf:%v,start manage tailTsak...",newConf)
for _,conf:=range newConf{
//1、原来已经存在的就别动
if t.isExist(conf) {
continue
}
//2、原来没有的 新创建一个
tt:=newTailTask(conf.Path,conf.Topic)//创建一个日志收集任务
err :=tt.Init()//初始化一个日志收集任务
if err!=nil{
logrus.Errorf("create tailobj for path :%s failed err:%v",conf.Path,err)
continue
}
t.tailTaskMap[tt.path]=tt
//去干活 收集日志
go tt.run()
//3、原来有现在没有的 停止
}
}
}
处理第三种情景 原来存在 但是新的配置中没有的情况
首先更改Task任务结构体,我们的管理任务的结构体中有map存放任务,新的任务结构体更改为:
type tailTask struct {
path string
topic string
//收集日志的实例
tobj *tail.Tail
// 创建一个cancel函数 用于停止goroutine
ctx context.Context
cancel context.CancelFunc
}
新增ctx和cancel字段,因为可以在想结束goroutine时调用cancel()结束函数
//3、原来有现在没有的 停止
// 找出tailTaskMap中存在,但是新的newConf中不存在的
for key, task := range t.tailTaskMap {
var found bool
for _, conf := range newConf {
if key == conf.Path {
found = true
break
}
}
if !found {
// 没找到 这个任务应该停下来
// 这里使用的是context上下文管理的停止,在日志收集结构体中加上ctx
//和cancel()
logrus.Infof("the task collect path :%s need to stop ", task.path)
//别忘了在管理的map中删除
delete(t.tailTaskMap, key)
task.cancel()
}
}
当遍历新的和旧的任务集合时,旧的需要删除,就将调用旧任务的cancel()函数,并**且别忘了删除map中的具体任务**,
我们日志收集任务的func (t *tailTask) run() 将做修改
func (t *tailTask) run() {
// 读取日志发送到kafka
logrus.Infof("create a tail task for path: %s success", t.path)
//循环读数据
for {
select {
case <-t.ctx.Done(): //调用cancel()就会触发
logrus.Infof("path : %s is stopping... ", t.path)
return
case line, ok := <-t.tobj.Lines:
if !ok {
logrus.Warn("tail file close reopen,path:%s\n", t.path)
time.Sleep(time.Second * 1)
continue
}
//如果是空行直接略过 但是在windows是有换行符
if len(strings.Trim(line.Text, "\r")) == 0 {
continue
}
// 利用通道 改为异步并发
//读出来line改为msg信息
msg := &sarama.ProducerMessage{}
msg.Topic = t.topic //自己的topic
msg.Value = sarama.StringEncoder(line.Text)
kafka.ToMsgChan(msg)
}
}
}
死循环读取 日志文件信息发往kafka中,使用select分支观察管理任务的函数是否调用cancel(),使用的是ctx.Done(),调用就终止函数
为验证etcd动态加载配置文件,博客链接(https://blog.csdn.net/m0_51490886/article/details/123977140)中演示更新etcd,配置文件键值对。 至此还有问题:现在出现的问题是 etcd拉取的配置全都一样,当我的logagent部署到不同的服务器上时应该有不同的配置相对应,后续将更新通过服务器ip对应自己的配置文件欢迎分享,转载请注明来源:内存溢出
评论列表(0条)