Log collection with the traditional ELK architecture:
Problems: Logstash is resource-hungry, consuming a lot of CPU and memory while running. There is also no message queue to buffer data, so there is a risk of data loss. This setup is suitable for small clusters.
The second architecture:
The Log Agent on each node first sends its data/logs to Kafka, and a Log Transfer process then takes the messages out of the queue and forwards them to Elasticsearch for storage. Finally, Kibana presents the logs and data to users. Because Kafka is introduced, the data is persisted first, so even if the Logstash/Log Transfer server stops because of a failure, no data is lost. This architecture is suitable for larger clusters.
Component overview:
LogAgent: the log-collection client, used to gather logs on each server.
Kafka: a high-throughput distributed message queue (developed at LinkedIn, now a top-level Apache open-source project), used here for message queuing and log buffering.
Elasticsearch: an open-source search engine that provides an HTTP RESTful web interface.
Kibana: an open-source analysis and visualization tool for Elasticsearch data.
Hadoop: a distributed computing framework, a platform for distributed processing of large data sets.
Storm: a free and open-source distributed real-time computation system.
ZooKeeper: a distributed coordination service for managing large fleets of hosts. Coordinating and managing services in a distributed environment is a complicated task; ZooKeeper solves it with a simple architecture and API.
In the architecture diagram, ZooKeeper plays the part highlighted in red.
Elasticsearch: a distributed, scalable, real-time search and analytics engine built on top of the full-text search library Apache Lucene™. Elasticsearch is more than just Lucene, though: besides full-text search it also offers the following (a quick connectivity check is sketched right after this list):
Distributed real-time document storage, with every field indexed and searchable.
A distributed search engine for real-time analytics.
The ability to scale out to hundreds of servers and handle petabytes of structured or unstructured data.
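Because that interface is plain HTTP, the quickest way to confirm a node is reachable is to request its root endpoint. Below is a minimal sketch, assuming a local single-node instance listening on 127.0.0.1:9200 (the same address used in the configuration files later in this post); the root endpoint returns the cluster name, node name, and version as JSON.

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // GET / on an Elasticsearch node returns basic cluster/node info as JSON
    resp, err := http.Get("http://127.0.0.1:9200/")
    if err != nil {
        fmt.Println("elasticsearch is not reachable:", err)
        return
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    fmt.Println(string(body))
}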
Kibana is an open-source analytics and visualization platform designed to work with Elasticsearch. You can use Kibana to search and view the data stored in Elasticsearch. Kibana interacts with Elasticsearch through various charts, tables, maps, and so on, presenting the data intuitively to support advanced analysis and visualization.
Elasticsearch, Logstash, and Kibana together are what we usually call the ELK stack, and the combination is a rather elegant design in the big-data space. It follows a fairly typical MVC idea: a model (persistence) layer, a view layer, and a controller layer. Logstash plays the controller, collecting and filtering data; Elasticsearch plays the persistence layer, storing the data; and Kibana, the subject of this chapter, plays the view layer, offering queries and analysis across many dimensions and displaying the data stored in Elasticsearch through a graphical interface.
etcd is a distributed key-value store developed by CoreOS. Internally it uses the Raft protocol as its consensus algorithm, which lets it store critical data reliably and quickly and make it available for access. Through distributed locks, leader election, and write barriers it enables reliable distributed coordination. An etcd cluster is built for highly available, persistent data storage and retrieval.
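As a small illustration of the distributed-lock part of that description, here is a minimal sketch built on the clientv3/concurrency package (the same clientv3 client library used throughout this post); the endpoint 127.0.0.1:2379 and the lock prefix /my-lock/ are placeholder values for the example, not part of the project below.

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/clientv3/concurrency"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect to etcd failed:", err)
        return
    }
    defer cli.Close()
    // a session keeps a lease alive; the lock is released automatically if its holder dies
    session, err := concurrency.NewSession(cli)
    if err != nil {
        fmt.Println("create session failed:", err)
        return
    }
    defer session.Close()
    mutex := concurrency.NewMutex(session, "/my-lock/")
    if err := mutex.Lock(context.Background()); err != nil {
        fmt.Println("acquire lock failed:", err)
        return
    }
    fmt.Println("lock acquired, doing critical work...")
    if err := mutex.Unlock(context.Background()); err != nil {
        fmt.Println("release lock failed:", err)
    }
}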
etcd architecture diagram:
Source code:
The logagent package
config.ini
[kafka]
address=127.0.0.1:9092
chan_max_size=100000
[etcd]
address=127.0.0.1:2379
timeout=5
collect_log_key=/log/%s/collect_config
[taillog]
filename=./my.log
timeout=5
config.go
package conf
type AppConf struct {
KafkaConf `ini:"kafka"`
EtcdConf `ini:"etcd"`
}
type KafkaConf struct {
Address string `ini:"address"`
ChanMaxSize int `ini:"chan_max_size"`
}
type EtcdConf struct {
Address string `ini:"address"`
Key string `ini:"collect_log_key"`
Timeout int `ini:"timeout"`
}
type TaillogConf struct {
Filename string `ini:"filename"`
}
etcd.go
package etcd
import (
"context"
"encoding/json"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
var (
cli *clientv3.Client
)
//LogEntry describes one log file to collect
type LogEntry struct {
Path string `json:"path"`
Topic string `json:"topic"`
}
//Init initializes the etcd client
func Init(addr string, timeout time.Duration) (err error) {
cli, err = clientv3.New(clientv3.Config{
Endpoints: []string{addr},
DialTimeout: timeout,
})
if err != nil {
fmt.Println("connect to etcd failed,err:", err)
return
}
fmt.Println("connect to etcd success")
return
}
//GetConf fetches the log-collection configuration stored under key in etcd
func GetConf(key string) (LogEntryConf []*LogEntry, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, key)
cancel()
if err != nil {
fmt.Println("get from etcd failed,err:", err)
return
}
for _, ev := range resp.Kvs {
err = json.Unmarshal(ev.Value, &LogEntryConf)
if err != nil {
fmt.Println("unmarshal etcd value failed,err:", err)
return
}
fmt.Printf("value:%s\n", ev.Value)
}
return
}
//WatchConf watches the configuration key in etcd and pushes new configurations into newConfCh
func WatchConf(key string, newConfCh chan<- []*LogEntry) {
ch := cli.Watch(context.Background(), key)
//read change events from the watch channel
for wresp := range ch {
for _, evt := range wresp.Events {
fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
//notify taillog.tskMgr
//1. check the type of the operation first
var newConf []*LogEntry
if evt.Type != clientv3.EventTypeDelete {
//for a PUT event, parse the new configuration from the value
err := json.Unmarshal(evt.Kv.Value, &newConf)
if err != nil {
fmt.Println("unmarshal failed,err:", err)
continue
}
}
//for a DELETE event newConf stays empty, which tells the task manager to stop all tasks
fmt.Println("get new conf:", newConf)
newConfCh <- newConf
}
}
}
etcd_put.go
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"}, //endpoints
DialTimeout: 5 * time.Second, //fail if we cannot connect within 5 seconds
})
if err != nil {
fmt.Println("connect to etcd failed:", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value := `[{"path":"d:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/mysql.log","topic":"mysql_log"}]`
_, err = cli.Put(ctx, "/log/192.168.1.7/collect_config", value)
cancel()
if err != nil {
fmt.Println("put to etcd failed,err:", err)
return
}
}
kafka.go
package kafka
//kafka: the producer side used by the log agent
import (
"fmt"
"time"
"github.com/Shopify/sarama"
)
type logData struct {
topic string
data string
}
var (
client sarama.SyncProducer //a global producer client connected to kafka
logDataChan chan *logData
)
//Init creates the producer client
func Init(addrs []string, maxSize int) (err error) {
config := sarama.NewConfig()
//producer configuration
config.Producer.RequiredAcks = sarama.WaitForAll //wait for the leader and all followers to acknowledge
config.Producer.Partitioner = sarama.NewRandomPartitioner //pick a random partition for each message
config.Producer.Return.Successes = true //successfully delivered messages are returned on the success channel
//connect to kafka
client, err = sarama.NewSyncProducer(addrs, config)
if err != nil {
fmt.Println("producer closed,err:", err)
return
}
fmt.Println("connect to kafka success")
//initialize logDataChan
logDataChan = make(chan *logData, maxSize)
//start a background goroutine that takes data off the channel and sends it to kafka
go SendToKafka()
return
}
//SendToChan is exposed to callers; it only pushes a log line into the internal channel
func SendToChan(topic, data string) {
msg := &logData{
topic: topic,
data: data,
}
logDataChan <- msg
}
//SendToKafka is the function that actually sends log data to kafka
func SendToKafka() {
for {
select {
case ld := <-logDataChan:
//build a message
msg := &sarama.ProducerMessage{}
msg.Topic = ld.topic
msg.Value = sarama.StringEncoder(ld.data)
//send it to kafka
pid, offset, err := client.SendMessage(msg) //offset is the position the message was written at
if err != nil {
fmt.Println("send msg failed,err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
default:
time.Sleep(time.Millisecond * 50)
}
}
}
tail.go
package taillog
import (
"context"
"fmt"
"test/log/kafka"
"github.com/hpcloud/tail"
)
var (
tailObj *tail.Tail
LogChan chan string
)
//TailTask: one log-collection task
type TailTask struct {
path string
topic string
instance *tail.Tail
//ctx and cancelFunc make it possible to stop t.run()
ctx context.Context
cancelFunc context.CancelFunc
}
func NewTailTask(path, topic string) (tailObj *TailTask) {
ctx, cancel := context.WithCancel(context.Background())
tailObj = &TailTask{
path: path,
topic: topic,
ctx: ctx,
cancelFunc: cancel,
}
tailObj.init() //open the log file at the given path
return
}
func (t *TailTask) init() {
config := tail.Config{
ReOpen: true, //reopen the file when it is recreated (log rotation)
Follow: true, //keep following the file
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, //where to start reading (2 = from the end of the file)
MustExist: false, //do not error if the file does not exist yet
Poll: true,
}
var err error
t.instance, err = tail.TailFile(t.path, config)
if err != nil {
fmt.Println("tail file failed,err:", err)
}
go t.run() //collect log lines and send them to kafka
}
func (t *TailTask) run() {
for {
select {
case <-t.ctx.Done():
fmt.Printf("tail task:%v_%s finish...\n", t.path, t.topic)
return
case line := <-t.instance.Lines: //read log lines one by one from the tail instance's channel
// kafka.SendToKafka(t.topic, line.Text) //one function calling another
//first push the log line into a channel
kafka.SendToChan(t.topic, line.Text)
//a separate goroutine in the kafka package pulls from that channel and sends to kafka
}
}
}
tail_mgr.go
package taillog
import (
"fmt"
"test/log/etcd"
"time"
)
var tskMgr *taillogMgr
//taillogMgr manages all tailTasks
type taillogMgr struct {
logEntry []*etcd.LogEntry
tskMap map[string]*TailTask
newConfChan chan []*etcd.LogEntry
}
func Init(logEntryConf []*etcd.LogEntry) {
tskMgr = &taillogMgr{
logEntry: logEntryConf, //keep the current log-collection configuration
tskMap: make(map[string]*TailTask, 16),
newConfChan: make(chan []*etcd.LogEntry), //unbuffered channel
}
for _, logEntry := range logEntryConf {
//logEntry.Path: path of the log file to collect
//record every tailTask started during initialization so it can be looked up later
tailObj := NewTailTask(logEntry.Path, logEntry.Topic)
mk := fmt.Sprintf("%s_%s", logEntry.Path, logEntry.Topic)
tskMgr.tskMap[mk] = tailObj
}
go tskMgr.run()
}
//run listens on newConfChan and reacts whenever a new configuration arrives
func (t *taillogMgr) run() {
for {
select {
case newConf := <-t.newConfChan:
fmt.Println("new configuration received!", newConf)
//1. added entries: start a tailTask for every entry not seen before
for _, conf := range newConf {
mk := fmt.Sprintf("%s_%s", conf.Path, conf.Topic)
_, ok := t.tskMap[mk]
if ok {
//already running, nothing to do
continue
} else {
//newly added entry
tailObj := NewTailTask(conf.Path, conf.Topic)
t.tskMap[mk] = tailObj
}
}
//2. removed entries: find entries present in the old configuration but not in newConf and stop them
for _, c1 := range t.logEntry { //take each entry from the old configuration
isDelete := true
for _, c2 := range newConf { //compare it against every entry in the new configuration
if c2.Path == c1.Path && c2.Topic == c1.Topic {
isDelete = false
continue
}
}
if isDelete {
//stop the tailObj that corresponds to c1
mk := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
t.tskMap[mk].cancelFunc()
delete(t.tskMap, mk)
}
}
//remember the new configuration so the next diff is computed against it
t.logEntry = newConf
//handled cases: 1. added entries 2. removed entries 3. changed entries (remove + add)
default:
time.Sleep(time.Second)
}
}
}
//NewConfChan exposes tskMgr's newConfChan to callers outside the package
func NewConfChan() chan<- []*etcd.LogEntry {
return tskMgr.newConfChan
}
ip.go
package utils
import (
"net"
"strings"
)
//GetOutboundIP returns the local IP address used for outbound traffic
func GetOutboundIP() (ip string, err error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return
}
defer conn.Close()
localAddr := conn.LocalAddr().(*net.UDPAddr)
ip = strings.Split(localAddr.IP.String(), ":")[0]
return
}
main.go
package main
import (
"fmt"
"sync"
"test/log/conf"
"test/log/etcd"
"test/log/kafka"
"test/log/taillog"
"test/log/utils"
"time"
"gopkg.in/ini.v1"
)
var (
cfg = new(conf.AppConf)
)
func main() {
//0. load the configuration file
err := ini.MapTo(cfg, "./conf/config.ini")
if err != nil {
fmt.Println("load ini failed,err:", err)
return
}
//1. initialize the kafka connection
err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.ChanMaxSize)
if err != nil {
fmt.Println("init kafka failed,err:", err)
return
}
fmt.Println("init kafka success.")
//2. initialize etcd
err = etcd.Init(cfg.EtcdConf.Address, time.Duration(cfg.EtcdConf.Timeout)*time.Second)
if err != nil {
fmt.Println("init etcd failed,err:", err)
return
}
//each logagent pulls its own configuration, so the key is distinguished by the agent's IP address
ipStr, err := utils.GetOutboundIP()
if err != nil {
panic(err)
}
etcdConfKey := fmt.Sprintf(cfg.EtcdConf.Key, ipStr)
fmt.Printf("etcdConfKey:%s\n", etcdConfKey)
//2.1 fetch the log-collection configuration from etcd
logEntryConf, err := etcd.GetConf(etcdConfKey)
if err != nil {
fmt.Println("etcd.GetConf failed,err:", err)
return
}
fmt.Println("get conf from etcd success:", logEntryConf)
//2.2 dispatch a watcher that monitors the log-collection configuration (and notifies the logAgent to reload it whenever it changes)
for index, value := range logEntryConf {
fmt.Printf("index:%v value:%v\n", index, value)
}
fmt.Println("init etcd success.")
//3. collect logs and send them to Kafka
//3.1 create a tail task for every log-collection entry
taillog.Init(logEntryConf)
//NewConfChan reads tskMgr's newConfChan, which is initialized inside taillog.Init(logEntryConf)
newConfChan := taillog.NewConfChan() //get the channel exposed by the taillog package
var wg sync.WaitGroup
wg.Add(1)
go etcd.WatchConf(etcdConfKey, newConfChan) //the watcher pushes any new configuration into the channel above
wg.Wait()
//3.2 send to Kafka
//4. open the log files and collect the logs
}
The log_transfer package
cfg.ini
[kafka]
address=127.0.0.1:9092
topic=web_log
[es]
address=127.0.0.1:9200
size=100000
cfg.go
package conf
//Logtransfer holds the global configuration for log_transfer
type Logtransfer struct {
KafkaCfg `ini:"kafka"`
ESCfg `ini:"es"`
}
//Kafka...
type KafkaCfg struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}
//ESCfg
type ESCfg struct {
Address string `ini:"address"`
ChanSize int `ini:"size"`
}
es.go
package es
import (
"context"
"fmt"
"strings"
"time"
"github.com/olivere/elastic/v7"
)
//LogData is the shape of the data received from the kafka side and indexed into ES
type LogData struct {
Topic string `json:"topic"`
Data string `json:"data"`
}
var (
client *elastic.Client
ch chan *LogData
)
//init...
func Init(address string, chanSize int) (err error) {
if !strings.HasPrefix(address, "http://") {
address = "http://" + address
}
client, err = elastic.NewClient(elastic.SetURL(address))
if err != nil {
return
}
fmt.Println("connect to es success")
ch = make(chan *LogData, chanSize)
go SendToES()
return
}
// func SendToESChan(d *LogData) (err error) {
// msg := &LogData{}
// msg.Topic = d.Topic
// msg.Data = string(d.Data)
// _, err = client.Index().
// Index(d.Topic).
// BodyJson(msg).
// Do(context.Background())
// if err != nil {
// panic(err)
// }
// return
// }
func SendToESChan(msg *LogData) {
ch <- msg
}
//SendToES writes data into ES
func SendToES() {
//chained method calls
for {
select {
case msg := <-ch:
put1, err := client.Index().
Index(msg.Topic). //Index is roughly analogous to a database
Type("xxx"). //document type (deprecated in ES 7.x)
BodyJson(msg). //serialize the Go object to JSON
Do(context.Background())
if err != nil {
fmt.Println(err)
}
fmt.Printf("Indexed %s to index %s,type %s\n", put1.Id, put1.Index, put1.Type)
default:
time.Sleep(time.Second)
}
}
}
kafka.go
package kafka
import (
"fmt"
"test/log_transfer/es"
"github.com/Shopify/sarama"
)
//LogData...
type LogData struct {
Data string `json:"data"`
}
//Init creates a kafka consumer and starts one goroutine per partition that forwards messages to ES
func Init(addr []string, topic string) (err error) {
consumer, err := sarama.NewConsumer(addr, nil)
if err != nil {
fmt.Printf("fail to start consumer,err:%v\n", err)
return
}
partitionList, err := consumer.Partitions(topic) //get all partitions of the topic
if err != nil {
fmt.Println("fail to get list of partition:", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList {
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v", partition, err)
return err
}
//consume messages from each partition asynchronously
go func(pc sarama.PartitionConsumer) {
defer pc.AsyncClose()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
//hand the message to the es package's channel
var ld = es.LogData{
Topic: topic,
Data: string(msg.Value),
}
es.SendToESChan(&ld)
}
}(pc)
}
return
}
main.go
package main
import (
"fmt"
"test/log_transfer/conf"
"test/log_transfer/es"
"test/log_transfer/kafka"
"gopkg.in/ini.v1"
)
//log transfer
//pulls log data out of kafka and sends it to ES
func main() {
//0. load the configuration file
var cfg conf.Logtransfer
err := ini.MapTo(&cfg, "./conf/cfg.ini")
if err != nil {
fmt.Println("init config err:", err)
return
}
fmt.Printf("cfg:%v\n", cfg)
//1. initialize ES
//1.1 create an ES client connection
//1.2 expose a function that writes data into ES
err = es.Init(cfg.ESCfg.Address, cfg.ESCfg.ChanSize)
if err != nil {
fmt.Println("init ES consumer failed,err:", err)
return
}
fmt.Println("init es success.")
//2. initialize Kafka
//2.1 connect to kafka and create per-partition consumers
//2.2 every partition consumer pulls data and sends it to ES via SendToES()
err = kafka.Init([]string{cfg.KafkaCfg.Address}, cfg.KafkaCfg.Topic)
if err != nil {
fmt.Println("init kafka consumer failed,err:", err)
return
}
//3. pull data from kafka
//4. send it to ES
select {}
}
Self-written version:
The config package:
config.ini
[kafka]
address=127.0.0.1:9092
chan_max_size=100000
[etcd]
address=127.0.0.1:2379
timeout=5
log_key=/log/collect_config
[es]
address=127.0.0.1:9200
size=100000
config.go
package config
import (
"context"
"github.com/hpcloud/tail"
)
type AppConf struct {
KafkaConf `ini:"kafka"`
EtcdConf `ini:"etcd"`
ESConf `ini:"es"`
}
type KafkaConf struct {
Address string `ini:"address"`
Max_size int `ini:"chan_max_size"`
}
type EtcdConf struct {
Address string `ini:"address"`
Timeout int `ini:"timeout"`
Log_key string `ini:"log_key"`
}
type ESConf struct {
Address string `ini:"address"`
Max_size int `ini:"size"`
}
type LogConf struct {
Path string `ini:"path"`
Topic string `ini:"topic"`
}
type LogEntryConf []*LogConf
type TailTask struct {
Path string
Topic string
Instance *tail.Tail
Ctx context.Context
CancelF context.CancelFunc
}
type LogData struct {
Topic string
Data string
}
The es package
es.go
package es
import (
"context"
"fmt"
"strings"
"test/mylog/config"
"time"
"github.com/olivere/elastic/v7"
)
var (
client *elastic.Client
ESchan chan *config.LogData
)
func Init(address string, size int) (err error) {
//the address in config.ini has no scheme, so prepend one for the elastic client
if !strings.HasPrefix(address, "http://") {
address = "http://" + address
}
client, err = elastic.NewClient(elastic.SetURL(address))
if err != nil {
fmt.Println("Init ES failed,err:", err)
return
}
ESchan = make(chan *config.LogData, size) //buffer size comes from the configuration
go SendToES()
return
}
func SendToESChan(msg *config.LogData) {
ESchan <- msg
}
func SendToES() {
for {
select {
case msg := <-ESchan:
put1, err := client.Index().Index(msg.Topic).BodyJson(msg).Do(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Index user:%s to index %s,type:%s\n", put1.Id, put1.Index, put1.Type)
default:
time.Sleep(time.Second)
}
}
}
The etcd package
etcd.go
package etcd
import (
"context"
"encoding/json"
"fmt"
"time"
"test/mylog/config"
"test/mylog/tail"
"go.etcd.io/etcd/clientv3"
)
var (
client *clientv3.Client
logdata config.LogEntryConf
)
func Init(address []string, timeout int) (err error) {
client, err = clientv3.New(clientv3.Config{
Endpoints: address,
DialTimeout: time.Duration(timeout) * time.Second,
})
if err != nil {
fmt.Println("connect to etcd failed,err:\n", err)
return
}
fmt.Println("connect to etcd success!")
return
}
func GetConf(key string) (logconf config.LogEntryConf, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
var resp *clientv3.GetResponse
resp, err = client.Get(ctx, key)
cancel()
if err != nil {
fmt.Println("get from etcd failed,err:", err)
return
}
for _, ev := range resp.Kvs {
err = json.Unmarshal(ev.Value, &logconf)
if err != nil {
fmt.Println("Unmarshal json failed:", err)
return
}
}
return
}
func WatchConf(topic string) {
rch := client.Watch(context.Background(), topic)
channel := tail.Get_chan()
for wresp := range rch {
for _, ev := range wresp.Events {
err := json.Unmarshal(ev.Kv.Value, &logdata)
if err != nil {
fmt.Println("Update conf failed,err:", err)
return
}
fmt.Println("update config success:", ev.Kv.Value)
channel <- logdata
}
}
}
The kafka package
kafka.go
package kafka
import (
"fmt"
"test/mylog/config"
"test/mylog/es"
"github.com/Shopify/sarama"
)
var (
client sarama.SyncProducer
logDataChan chan *config.LogData
consumer sarama.Consumer
pc sarama.PartitionConsumer
)
func Init(address []string, max_size int) (err error) {
cfg := sarama.NewConfig()
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Producer.Partitioner = sarama.NewRandomPartitioner
cfg.Producer.Return.Successes = true
client, err = sarama.NewSyncProducer(address, cfg)
if err != nil {
fmt.Println("Produce error:", err)
return
}
logDataChan = make(chan *config.LogData, max_size)
consumer, err = sarama.NewConsumer(address, nil)
if err != nil {
fmt.Println("Init consumer failed,err:", err)
return
}
go SendMessage()
return
}
func SendToChan(topic, data string) {
var t = &config.LogData{
Topic: topic,
Data: data,
}
logDataChan <- t
}
func SendMessage() {
for {
select {
case ld := <-logDataChan:
msg := sarama.ProducerMessage{}
msg.Topic = ld.Topic
msg.Value = sarama.StringEncoder(ld.Data)
pid, offset, err := client.SendMessage(&msg)
if err != nil {
fmt.Println("Send Message error:", err)
}
fmt.Printf("pid:%v offset:%v Topic:%v Value:%v\n", pid, offset, ld.Topic, ld.Data)
//no default branch: block on the channel instead of busy-looping
}
}
}
func Consumer(topic string) {
partitionList, err := consumer.Partitions(topic)
if err != nil {
fmt.Println("Get partitions failed,err:", err)
return
}
for partition := range partitionList {
pc, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Println("failed to start consumer for partition,err:", err)
return
}
//consume each partition in its own goroutine so Consumer can return
go func(pc sarama.PartitionConsumer) {
defer pc.AsyncClose()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
t := &config.LogData{
Topic: topic,
Data: string(msg.Value),
}
es.SendToESChan(t)
}
}(pc)
}
}
// func Consumer(topic string) (err error) {
// partitionList, err := consumer.Partitions(topic) //get all partitions of the topic
// if err != nil {
// fmt.Println("fail to get list of partition:", err)
// return
// }
// var pc sarama.PartitionConsumer
// fmt.Println(partitionList)
// for partition := range partitionList {
// pc, err = consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
// if err != nil {
// fmt.Printf("failed to start consumer for partition %d,err:%v", partition, err)
// return
// }
// defer pc.AsyncClose()
// //consume messages from each partition asynchronously
// go func(sarama.PartitionConsumer) {
// for msg := range pc.Messages() {
// fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
// //send straight to ES
// var ld = config.LogData{
// Topic: topic,
// Data: string(msg.Value),
// }
// es.SendToESChan(&ld) //one function calling another
// //could be improved by pushing into a channel instead
// }
// }(pc)
// select {}
// }
// return
// }
The tail package
tail.go
package tail
import (
"context"
"fmt"
"test/mylog/config"
"test/mylog/kafka"
"github.com/hpcloud/tail"
)
type Tasks config.TailTask
var (
tails *tail.Tail
tasks_map map[string]*config.TailTask
tasks_chan chan config.LogEntryConf
)
func run(T *config.TailTask) {
for {
select {
case <-T.Ctx.Done():
return
case line := <-T.Instance.Lines:
kafka.SendToChan(T.Topic, line.Text)
}
}
}
func Init(Tvalue config.LogEntryConf) error {
tasks_map = make(map[string]*config.TailTask, 100)
tasks_chan = make(chan config.LogEntryConf)
for _, value := range Tvalue {
base := config.LogConf{
Path: value.Path,
Topic: value.Topic,
}
Task, err := NewTask(base)
if err != nil {
fmt.Println("Init tail failed,err:", err)
return err
}
name := fmt.Sprintf("%s_%s", value.Path, value.Topic)
tasks_map[name] = Task
go run(Task)
}
go Update_Task()
return nil
}
func Update_Task() {
for {
select {
case new_tasks := <-tasks_chan:
//stop every running tail task before applying the new configuration
for name, old_task := range tasks_map {
old_task.CancelF()
delete(tasks_map, name)
}
//start a tail task for every entry in the new configuration
for _, new_task := range new_tasks {
name := fmt.Sprintf("%s_%s", new_task.Path, new_task.Topic)
Task, err := NewTask(*new_task)
if err != nil {
fmt.Println("init task err:", err)
return
}
tasks_map[name] = Task
go run(Task)
}
}
}
}
func Get_chan() chan config.LogEntryConf {
return tasks_chan
}
func NewTask(base config.LogConf) (tal *config.TailTask, err error) {
cfg := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}
ctx, cancel := context.WithCancel(context.Background())
tails, err = tail.TailFile(base.Path, cfg)
if err != nil {
fmt.Println("tail file failed,err:", err)
}
tal = &config.TailTask{
Path: base.Path,
Topic: base.Topic,
Instance: tails,
Ctx: ctx,
CancelF: cancel,
}
return
}
main.go
package main
import (
"fmt"
"sync"
"test/mylog/config"
"test/mylog/es"
"test/mylog/etcd"
"test/mylog/tail"
"test/mylog/kafka"
"gopkg.in/ini.v1"
)
var wg sync.WaitGroup
func main() {
var cfg config.AppConf
err := ini.MapTo(&cfg, "./config/config.ini")
if err != nil {
fmt.Println("Decode Map failed!", err)
return
}
err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Max_size)
if err != nil {
fmt.Println("init kafka failed", err)
return
}
fmt.Println("init kafka success!")
err = etcd.Init([]string{cfg.EtcdConf.Address}, cfg.EtcdConf.Timeout)
if err != nil {
fmt.Println("init etcd failed", err)
return
}
var path config.LogEntryConf
path, err = etcd.GetConf(cfg.EtcdConf.Log_key)
if err != nil {
fmt.Println("get conf from etcd failed", err)
return
}
tail.Init(path)
es.Init(cfg.ESConf.Address, cfg.ESConf.Max_size)
for index, value := range path {
fmt.Printf("index:%v value:%v topic:%v\n", index, value, value.Topic)
kafka.Consumer(value.Topic)
}
wg.Add(1)
go etcd.WatchConf(cfg.EtcdConf.Log_key)
wg.Wait()
}