我们公司的hadoop集群都是基于原生安装的,没有使用cdh这些第三方整合好的,还提供各种监控。为了保证各个组件的正常,需要自己来开发弥补监控上的缺失。hadoop生态相当庞大,组件也超级多,其中一两个组件挂掉了,不经常巡检可能发现不了,经常巡检也会消耗掉大量精力
设计思路针对前面提到的痛点,我要动手解决掉下面的几个问题
定时检测端口是否正常 怎么模块化配置,支持各个地方不同的场景 当异常发生时候,怎么通知到我明确了要解决的问题之后,开始着手开发。
定时检测端口是否正常在上一篇文章中,已经实现了定时任务功能,所以可以直接参考那个做一个tcp检测的功能,设计一个TCPTask 结构体来结合Crond实现定时检测。
type TCPTask struct {
// 检测端口
Port int
// 检测地址
Addr string
// 角色,这里后面篇幅讲
Role string
// 返回结果,默认是0
Result int `default:"0"`
// 监控结构体,后面篇幅讲
Monitor *Monitor
}
func (t *TCPTask) Run() {
// 正常状态就是0,等下面产生异常时候会修改成1
t.Result = 0
// 这里开始监控tcp的状态,设置一个超时时间
conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))
if err != nil {
log.Println("err = ", err)
t.Result = 1
} else {
// 连接成功就就可以关掉原来的连接了
conn.Close()
}
// 通过chan传递执行状态
t.Monitor.Message <- t
}
怎么模块化配置,支持各个地方不同的场景?
hadoop的组件超级多,每个组件启动的端口也很多,我整理了一份表格,简称就是前面篇幅提到的Role(角色),全称就是hadoop组件的名称。
简称 | 全称 |
---|---|
NN | NameNode |
DN | DataNode |
JN | JournalNode |
FC | HDFS Failover Controller |
HG | Hive Gateway |
HS2 | Hive Server 2 |
SG | Spark2 Gateway |
SHS | Spart 2 History Server |
NM | Yarn NodeManager |
RM | Yarn ResourceManager |
ZKS | ZooKeeper Server |
M | Hbase Master |
RS | HBase RegionServer |
FA | Flume Agent |
HMS | Hive Metastore Server |
JHS | JobHistory Server |
想了好久才确定下来一个十分灵活的配置文件结构
hadoop:
tcp_timeout: 10
config:
NN:
port:
- 8020
- 50070
crond: "01 */2 * * * *"
describe: hadoop NameNode
name: hadoop_go_NameNode
DN:
port:
- 50020
- 50010
- 50075
crond: "30 */2 * * * *"
describe: hadoop DataNode
name: hadoop_go_DataNode
对应的go struct,下面入口程序代码有用到。
type GlobalConfig struct {
Hadoop HadoopGlobalConfig
Listen string
MaxTaskNumber int `yaml:"max_task_number" default:"1000"`
Log string
}
type HadoopGlobalConfig struct {
TCP_TIMEOUT int `yaml:"tcp_timeout" default:"30"`
Config map[string]HadoopRoleConfig
}
type HadoopRoleConfig struct {
Port []int
Crond string
Describe string
Name string
}
hadoop.config.tcp_timeout 超时时间hadoop.config.ROLE_NAME
键 | 类型 | 描述 |
---|---|---|
port | list | 角色启动的端口列表 |
crond | crond表达式 | 秒 分 时 日 月 周 |
describe | string | 描述这角色是干嘛的 |
name | string | 监控项名称 |
这样做的好处就是为了灵活,启动时候只加载需要的角色,在程序启动时候去初始化监控指标(metrics),附上最新的入口代码
package main
import (
"fmt"
"hadoop-go/hadoop"
"io/ioutil"
"log"
"net/http"
"os"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/robfig/cron"
"github.com/urfave/cli"
"gopkg.in/yaml.v2"
)
var (
hadoopConfig map[string]hadoop.HadoopConfig
config_path string
global_config_path string
GlobalConfig hadoop.GlobalConfig
enable_task bool
ding hadoop.Ding
monitor hadoop.Monitor
)
func load_config() {
// 读取全局配置文件
log.Print("读取全局配置文件:", global_config_path)
f, err := os.OpenFile(global_config_path, os.O_RDONLY, 0444)
if err != nil {
log.Panic("读取全局配置文件失败 ", err)
}
data, _ := ioutil.ReadAll(f)
err = yaml.Unmarshal(data, &GlobalConfig)
if err != nil {
log.Panic("格式化全局配置文件失败", err)
}
f.Close()
// 设置日志
logFile, err := os.OpenFile(GlobalConfig.Log, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)
if err != nil {
panic(err)
}
log.SetOutput(logFile) // 将文件设置为log输出的文件
// log.SetPrefix("[reboot]")
log.SetFlags(log.LstdFlags | log.Lshortfile | log.LUTC)
// 读取配置文件
log.Print("读取配置文件:", config_path)
f, err1 := os.OpenFile(config_path, os.O_RDONLY, 0444)
if err1 != nil {
log.Panic("读取配置文件失败 ", err)
}
data1, _ := ioutil.ReadAll(f)
err = yaml.Unmarshal(data1, &hadoopConfig)
if err != nil {
log.Panic("格式化配置文件失败", err)
}
f.Close()
// 上面都没问题了,就去初始化监控指标
monitor.Metrcis = make(map[string]*prometheus.GaugeVec)
// 初始化一个channel,用来接收tcp检测的结果,
monitor.Message = make(chan *hadoop.TCPTask, GlobalConfig.MaxTaskNumber)
monitor.Config = &GlobalConfig.Hadoop
for n, d := range GlobalConfig.Hadoop.Config {
monitor.Metrcis[n] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: d.Name,
Help: d.Describe,
},
[]string{"addr", "port"},
)
prometheus.MustRegister(monitor.Metrcis[n])
}
}
func action(c *cli.Context) error {
// 载入配置
load_config()
// fix: error caused when modifying the configuration path
if len(c.Args()) == 0 {
println("请选择一个配置")
i := 0
for k := range hadoopConfig {
i++
println(i, k)
}
return nil
}
config, err := hadoopConfig[c.Args()[0]]
if !err {
log.Panic("没有", c.Args()[0], "的配置")
}
// 初始化dingding监听
if config.Dingding != "" {
log.Println("启用钉钉,token:", config.Dingding)
ding = hadoop.Ding{Token: config.Dingding, Msg: make(chan string, 999)}
go ding.Send()
}
// hadoop功能监控整合Prometheus
monitor.Load(config)
// 启用task
crond := cron.New()
if enable_task {
log.Println("启用定时任务")
// 循环遍历任务加到定时任务里面
for n, t := range config.Tasks {
t.Ding = &ding
switch t.Module {
case "hdfs":
if t.Crond != "" {
log.Println("添加定时任务:", n, "时间:", t.Crond)
crond.AddJob(t.Crond, hadoop.Hdfs{Name: n, Task: t})
}
default:
log.Println("没有对应的模块")
}
}
crond.Start()
}
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(GlobalConfig.Listen, nil))
defer monitor.Crond.Stop()
defer crond.Stop()
return nil
}
func main() {
app := cli.NewApp()
app.Name = "hadoop-go"
app.Version = "1.0.0"
app.Usage = "hadoop监控"
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "c",
Usage: "配置文件路径。default: ./config/hadoop.yml",
Value: "./config/hadoop.yml",
Destination: &config_path,
},
cli.StringFlag{
Name: "g",
Usage: "全局配置文件路径。default: ./config/config.yml",
Value: "./config/config.yml",
Destination: &global_config_path,
},
cli.BoolFlag{
Name: "t",
Usage: "启动定时任务",
Destination: &enable_task,
},
}
app.Action = action
err := app.Run(os.Args)
if err != nil {
fmt.Println(err)
}
}
比较重要的就是Monitor,附上完整的代码
package hadoop
import (
"log"
"net"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/robfig/cron"
)
type Monitor struct {
Metrcis map[string]*prometheus.GaugeVec
Crond *cron.Cron
Config *HadoopGlobalConfig
Message chan *TCPTask
}
type TCPTask struct {
Port int
Addr string
Role string
Result int `default:"0"`
Monitor *Monitor
}
func (t *TCPTask) Run() {
// 正常状态就是0,等下面产生异常时候会修改成1
t.Result = 0
// 这里开始监控tcp的状态,设置一个超时时间
conn, err := net.DialTimeout("tcp", t.Addr+":"+strconv.Itoa(t.Port), time.Duration(t.Monitor.Config.TCP_TIMEOUT*int(time.Second)))
if err != nil {
log.Println("err = ", err)
t.Result = 1
} else {
// 连接成功就就可以关掉原来的连接了
conn.Close()
}
// 通过chan传递执行状态
t.Monitor.Message <- t
}
func (m *Monitor) Load(config HadoopConfig) {
// 启动一个采集定时器
m.Crond = cron.New()
// 启动一个协程去更新metric信息
go m.Update()
// 将需要的转换成定时任务
for server, c := range config.Hadoop {
log.Println("服务器:", server)
for _, role := range c.Role {
for _, port := range m.Config.Config[role].Port {
// 在这里初始化一下。默认值都是0,代表正常的
m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)
// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测
m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})
}
}
}
m.Crond.Start()
}
func (m *Monitor) Update() {
// 用来更新metric
for msg := range m.Message {
log.Printf("update metric: server=>%s role=>%s port=>%d result=>%d", msg.Addr, msg.Role, msg.Port, msg.Result)
m.Metrcis[msg.Role].WithLabelValues(msg.Addr, strconv.Itoa(msg.Port)).Set(float64(msg.Result))
}
}
上面的代码就实现了怎么模块化的注册Prometheus指标
for n, d := range GlobalConfig.Hadoop.Config {
monitor.Metrcis[n] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: d.Name,
Help: d.Describe,
},
[]string{"addr", "port"},
)
prometheus.MustRegister(monitor.Metrcis[n])
}
模块化的设置监控主机,主要就是xxx.hadoop里面的配置,以下面的配置文件作为参考
chengdu:
dingding: x
tasks:
v_report:
module: hdfs
type: CompareSize
args: /user/hive/warehouse/zcsy.db/v_report/ptdate={{.Yesterday}} gt 40282905455
crond: 00 09 11 * * *
desc: 成都{{.Yesterday}} v_report状态
hadoop:
172.16.4.29:
role:
- DN
- NN
172.16.4.29这台主机既是DataNode,又是namenode,我们需要监控其8020,50070,50020,50010,50075端口。代码部分
// 将需要的转换成定时任务
for server, c := range config.Hadoop {
log.Println("服务器:", server)
for _, role := range c.Role {
for _, port := range m.Config.Config[role].Port {
// 在这里初始化一下。默认值都是0,代表正常的
m.Metrcis[role].WithLabelValues(server, strconv.Itoa(port)).Set(0)
// 添加到定时任务里面,根据配置文件设置的时间规则去定时执行检测
m.Crond.AddJob(m.Config.Config[role].Crond, &TCPTask{Port: port, Addr: server, Role: role, Monitor: m})
}
}
}
日志打印:
2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0
2021/06/21 04:40:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50075 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50020 result=>0
2021/06/21 04:40:30 monitor.go:75: update metric: server=>172.16.4.29 role=>DN port=>50010 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>8020 result=>0
2021/06/21 04:42:01 monitor.go:75: update metric: server=>172.16.4.29 role=>NN port=>50070 result=>0
Promethues Web:
# HELP hadoop_go_DataNode hadoop DataNode
# TYPE hadoop_go_DataNode gauge
hadoop_go_DataNode{addr="172.16.4.29",port="50010"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50020"} 0
hadoop_go_DataNode{addr="172.16.4.29",port="50075"} 0
# HELP hadoop_go_NameNode hadoop NameNode
# TYPE hadoop_go_NameNode gauge
hadoop_go_NameNode{addr="172.16.4.29",port="50070"} 0
hadoop_go_NameNode{addr="172.16.4.29",port="8020"} 0
当异常发生时候,怎么通知到我
我们通过配置prometheus alert规则就能实现。
- alert: "hadoop_go_datanode"
# 表达式,不等于0时候
expr: hadoop_go_DataNode!=0
# 持续1分钟
for: 1m
labels:
# 这里是我们自定义的,receiver是一个接受者,我们自己开发的告警平台
receiver: yunwei
severity: error
annotations:
summary: "成都datanode异常"
description: "{{$labels.addr}}的{{$labels.port}}状态异常"
value: "{{ $value }}"
本系列到此就告一段落了,大家有什么想法可以留言交流。每做一个项目我都会有很大的收获。后面如果要更新的话,可能是讲一下监控hadoop的其他信息。
往期回顾1.使用golang开发并监控hadoop篇(1)hdfs文件夹大小监控
2.使用golang开发并监控hadoop篇(2)hdfs使用情况和定时任务功能
3.使用golang开发并监控hadoop篇(3)监控hadoop的端口并整合到Prometheus
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)