~~ 在前面几章,介绍了influxdb的基本概念,经常的用法,以及怎么编译源码。
influxdb概念详解1influxdb安装和使用influxdb概念详解2influxdb源码编译本章就开始源码的分析。分析之前,还是回顾一下influxdb的基本结构,从存储的角度来看,可以这样分:
rp代表retentionpolicy。 服务启动
在cmd/influxd/main.go中,main函数:
func main() {
rand.Seed(time.Now().UnixNano())
m := NewMain()
if err := m.Run(os.Args[1:]...); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
这里的逻辑相对简单.Run函数里面,构建了一下Command结构,调用了Command的Run函数:
if err := cmd.Run(args...); err != nil {
return fmt.Errorf("run: %s", err)
}
Command主要逻辑在第142-152这10行,构建Server结构,并且启动
buildInfo := &BuildInfo{
Version: cmd.Version,
Commit: cmd.Commit,
Branch: cmd.Branch,
Time: cmd.BuildTime,
}
s, err := NewServer(config, buildInfo)
if err != nil {
return fmt.Errorf("create server: %s", err)
}
s.Logger = cmd.Logger
s.CPUProfile = options.CPUProfile
s.MemProfile = options.MemProfile
if err := s.Open(); err != nil {
return fmt.Errorf("open server: %s", err)
}
influxdb的代码规范还是很好的,服务启动一般都是Open函数这个后面可以看到很多类似的例子。
Server介绍这里有一个重要的结构:Server它是服务端的抽象。定义在cmd/influxd/run/server.go中。一个Server主要有以下这些结构
MetaClient:meta 信息的访问抽象。meta信息主要包括RetentionPolicy,ContinuousQueryInfo,UserInfo,ShardInfo,ShardGroupInfo等。metaClient提供了对这些信息增删改查的接口。
TSDBStore:时序存储的抽象。提供了对所有的database,所有的shard的读写功能。可以理解为存储的核心
QueryExecutor:查询执行器。用来执行用户的查询请求,这些查询都被看作为statement。
PointWrite: point 写入抽象。时序数据都看做一个个数据点,所以叫做point。那么这里为什么是被单独作为一个模块出现,而不是集成到TSDBStore里面呢?原因是influxdb不仅仅支持自己独有的行协议(line protocol),还支持其他协议,例如openTSDB等,这里可以看做是一个统一的点写入接口。可以拓展。
[]Service,这里是一个数组。表示多种service。influxdb把一些不是存储查询相关的功能,大部分都定义为Service。类似插件一样集成到系统内部。例如支持http写入的HTTDService,支持
SnapshotterService 支持snapshot的服务。
除了这些关键的成员,还有一些其他的,例如绑定的端口信息,profile信息等。详细的结构如下:
type Server struct {
buildInfo BuildInfo
err chan error
closing chan struct{}
BindAddress string
Listener net.Listener
Logger *zap.Logger
MetaClient *meta.Client
TSDBStore *tsdb.Store
QueryExecutor *query.Executor
PointsWriter *coordinator.PointsWriter
Subscriber *subscriber.Service
Services []Service
// These references are required for the tcp muxer.
SnapshotterService *snapshotter.Service
Monitor *monitor.Monitor
// Server reporting and registration
reportingDisabled bool
// Profiling
CPUProfile string
CPUProfileWriteCloser io.WriteCloser
MemProfile string
MemProfileWriteCloser io.WriteCloser
// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
httpAPIAddr string
// httpUseTLS specifies if we should use a TLS connection to the http servers
httpUseTLS bool
// tcpAddr is the host:port combination for the TCP listener that services mux onto
tcpAddr string
config *Config
}
Server构建
知道了Server的主要逻辑,在influxdb/cmd/run/server.go中,NewServer主要就是对这些成员赋值:
s := &Server{
buildInfo: *buildInfo,
err: make(chan error),
closing: make(chan struct{}),
BindAddress: bind,
Logger: logger.New(os.Stderr),
MetaClient: meta.NewClient(c.Meta),
reportingDisabled: c.ReportingDisabled,
httpAPIAddr: c.HTTPD.BindAddress,
httpUseTLS: c.HTTPD.HTTPSEnabled,
tcpAddr: bind,
config: c,
}
s.Monitor = monitor.New(s, c.Monitor)
s.config.registerDiagnostics(s.Monitor)
if err := s.MetaClient.Open(); err != nil {
return nil, err
}
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
s.TSDBStore.EngineOptions.Config = c.Data
// Copy TSDB configuration.
s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index
// Create the Subscriber service
s.Subscriber = subscriber.NewService(c.Subscriber)
// Initialize points writer.
s.PointsWriter = coordinator.NewPointsWriter()
s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
s.PointsWriter.TSDBStore = s.TSDBStore
Server 启动
server构建完成之后,就是启动Server,这里调用的是Open函数。在Open函数里,会开始装配[]Service数组,来确定开启哪些服务:
s.appendMonitorService()
s.appendPrecreatorService(s.config.Precreator)
s.appendSnapshotterService()
s.appendContinuousQueryService(s.config.ContinuousQuery)
s.appendHTTPDService(s.config.HTTPD)
s.appendRetentionPolicyService(s.config.Retention)
for _, i := range s.config.GraphiteInputs {
if err := s.appendGraphiteService(i); err != nil {
return err
}
}
for _, i := range s.config.CollectdInputs {
s.appendCollectdService(i)
}
for _, i := range s.config.OpenTSDBInputs {
if err := s.appendOpenTSDBService(i); err != nil {
return err
}
}
for _, i := range s.config.UDPInputs {
s.appendUDPService(i)
}
以及一些依赖的赋值:
s.Subscriber.MetaClient = s.MetaClient
s.PointsWriter.MetaClient = s.MetaClient
s.Monitor.MetaClient = s.MetaClient
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
这里比如pointWriter也是依赖了metaClient的,因为pointWriter写入每个点的时候,需要知道写到哪个shard上,所以需要知道shard相关的信息,而shard相关的信息是metaClient提供的,所以这里会对内部的一些依赖赋值。
~~~
完成赋值之后,会开始启动内部的组件。
if err := s.TSDBStore.Open(); err != nil {
return fmt.Errorf("open tsdb store: %s", err)
}
// Open the subscriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}
// Open the points writer service
if err := s.PointsWriter.Open(); err != nil {
return fmt.Errorf("open points writer: %s", err)
}
s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())
for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
}
}
比如启动TSDBStore,启动各个Service等。这里又能看到了influxdb代码设计的优雅之处,所有的服务都是被抽象为Service,接口都是统一的,十分的方便管理和拓展。
总结到这里influxdb就完成了启动,总结下来核心逻辑在于装配Server这个结构体,以及装配Server携带的各个Service。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)