本节内容以go语言设计一个简易的日志收集系统,并且完成日志的客户端开发。
项目背景每个系统都有日志,当系统出现问题的时候需要通过日志解决问题。
当系统机器比较少时,登录服务器即可查看日志。但当机器规模较大时,登录机器看就不太现实,这个时候就需要日志收集系统。
解决方案- 把机器上的日志实时收集,统一存储到一个中心系统。
- 对日志建立索引,通过搜索即可找到对应的日志。
- 通过提供友好的web界面,完成日志的搜索查找。
- 实时日志量非常大,每天几十亿条
- 日志准时收集,延迟控制在分钟级别
- 能够水平扩展
ELK的方案架构如下,
ELK存在的问题:
- 运维成本高,每增加一个日志收集,都需要手动修改配置
- 监控缺失,无法准确获取logstash的状态
- 无法做定制化开发以及维护
各组件介绍如下
- Log Agent,日志收集客户端,用来收集服务器上的日志
- Kafka,高吞吐量的分布式队列,linkin开发,apache顶级开源项目。使用Kafka可以实现日志收集和日志处理的解耦。
- ES,elasticsearch,开源的搜索引擎,提供基于http restful的web接口
- Hadoop,分布式计算框架,能够对大量数据进行分布式处理的平台
- Storm,
Storm
是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop。 - 除了图中的基本部件外,Log Agent需要连接ETCD和WEB界面进行日志存储和日志配置。
1. 异步处理, 把非关键流程异步化,提高系统的响应时间和健壮性
2. 应用解耦,通过消息队列
3. 流量削峰
zookeeper应用场景1. 服务注册&服务发现
2. 配置中心
3. 分布式锁
- Zookeeper是强一致的
- 多个客户端同时在Zookeeper上创建相同znode,只有一个创建成功
我们的项目实践又如下几部分内容构成:
- linux上的kafka安装
- 生产者log agent开发
- etcd+contex+kafka消费者开发
- WEB日志管理平台开发
- 安装JDK,从oracle下载最新的SDK安装
- 安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/
- mv conf/zoo_sample.cfg conf/zoo.cfg
- 编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\
- 运行bin/zkServer.cmd
- 安装kafka
- 打开链接:http://kafka.apache.org/downloads.html下载kafka2.1.2
- 打开config目录下的server.properties, 修改log.dirs为D:\kafka_logs, 修改advertised.host.name=服务器ip
- 启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties
我们构建一个日志客户端log agent作为生产者。需要用到的go的包为:
- Import “github.com/Shopify/sarama":基于sarama第三方库开发的 kafka client,往kafka里面发送消息
- Import “github.com/hpcloud/tail”:HP团队出的tail库,常用于日志收集
- Import “github.com/astaxie/beego/config”
- Import “github.com/astaxie/beego/logs”
我们的go语言设计log agent用到的模块和模块数据流如下:
kafka示例代码Import “github.com/Shopify/sarama"
tailf介绍程序示例:
package main import ( "fmt" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true msg := &sarama.ProducerMessage{} msg.Topic = "nginx_log" msg.Value = sarama.StringEncoder("this is a good test, my message is good") client, err := sarama.NewSyncProducer([]string{"192.168.31.177:9092"}, config) if err != nil { fmt.Println("producer close, err:", err) return } defer client.Close() pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send message failed,", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
tailf组件使用
Import “github.com/hpcloud/tail”
配置文件库使用程序示例:
package main import ( "fmt" "github.com/hpcloud/tail" "time" ) func main() { filename := "./my.log" tails, err := tail.TailFile(filename, tail.Config{ ReOpen: true, Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, MustExist: false, Poll: true, }) if err != nil { fmt.Println("tail file err:", err) return } var msg *tail.Line var ok bool for true { msg, ok = <-tails.Lines if !ok { fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename) time.Sleep(100 * time.Millisecond) continue } fmt.Println("msg:", msg) } }
Import “github.com/astaxie/beego/config”
日志库的使用程序示例:
package main import ( "fmt" "github.com/astaxie/beego/config" ) func main() { conf, err := config.NewConfig("ini", "./logcollect.conf") if err != nil { fmt.Println("new config failed, err:", err) return } port, err := conf.Int("server::port") if err != nil { fmt.Println("read server:port failed, err:", err) return } fmt.Println("Port:", port) log_level, err := conf.Int("log::log_level") if err != nil { fmt.Println("read log_level failed, ", err) return } fmt.Println("log_level:", log_level) log_path := conf.String("log::log_path") fmt.Println("log_path:", log_path) }
Import “github.com/astaxie/beego/logs”
程序示例:
package main import ( "encoding/json" "fmt" "github.com/astaxie/beego/logs" ) func main() { config := make(map[string]interface{}) config["filename"] = "./logs/logcollect.log" config["level"] = logs.LevelDebug configStr, err := json.Marshal(config) if err != nil { fmt.Println("marshal failed, err:", err) return } logs.SetLogger(logs.AdapterFile, string(configStr)) logs.Debug("this is a test, my name is %s", "stu01") logs.Trace("this is a trace, my name is %s", "stu02") logs.Warn("this is a warn, my name is %s", "stu03") }
生产者具体程序运行过程参考:
Go语言学习之11 日志收集系统kafka库实战 - pointerC++ - 博客园本节主要内容: 1. 日志收集系统设计2. 日志客户端开发 1. 项目背景 a. 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 b. 当系统机器比较少时,登陆到服务器上查看即可满足 c.https://www.cnblogs.com/xuejiale/p/10657989.html
消费者etcd+contex+kafka etcd介绍etcd介绍与使用:
- 概念:高可用的分布式key-value存储,可以使用配置共享和服务发现
- 类似项目:zookeeper和consul
- 开发语言:Go
- 接口:提供restful的http接口,使用简单
- 实现算法:基于raft算法的强一致性、高可用的服务存储目录
etcd的应用场景:
- 服务发现和服务注册
- 配置中心
- 分布式存储
- master选举
contex主要作用如下:
- 如何控制goroutine
- 如何保存上下文数据
消费者具体程序运行过程参考:
Go语言学习之12 etcd、contex、kafka消费实例、logagent - pointerC++ - 博客园本节内容: 1. etcd介绍与使用 2. ElastcSearch介绍与使用 1. etcd介绍与使用 概念:高可用的分布式key-value存储,可以使用配置共享和服务发现 类似项目:zookeehttps://www.cnblogs.com/xuejiale/p/10660857.html
WEB日志管理平台我们会用到如下知识点:
- ElasticSearch介绍与使用
- kibana介绍与使用
1. ElasticSearch安装
详见上节内容
2. kibana安装
(1) 下载ES,下载地址:https://www.elastic.co/start
(2)解压缩
(3)启动kibana, ./bin/kibana.bat
(4)在浏览器中访问: http://localhost:5601
Username: elastic Passwd: changeme
3. nginx安装
(1)下载nginx,下载地:https://nginx.org
(2)解压缩
(3)启动nginx, ./nginx
(4)在浏览器中访问: http://localhost
4. mysql事务
(1) 原子性
(2)一致性
(3)隔离性
(4)持久性
5. mysql事务
(1)import (“github.com/jmoiron/sqlx")
(2)Db.Begin()开始事务
(3)Db.Submit()提交事务
(4)Db.Rollback() 回滚事务
6. Beego web开发
(1)规划好url
(2)添加路由
(3)开发controller,继承beego.Controller
(4)测试
Beego 模板渲染
1)把需要传给模板的变量赋值给beego.controller里面的Data字段
2)实现模板逻辑
详见博客:一、Beego介绍与项目创建及启动 - pointerC++ - 博客园一、beego 简介 beego 是一个快速开发 Go 应用的 HTTP 框架,他可以用来快速开发 API、Web 及后端服务等各种应用,是一个 RESTful 的框架,主要设计灵感来源于 tornahttps://www.cnblogs.com/xuejiale/p/10562074.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)