GO *** 作influxdb

GO *** 作influxdb,第1张

influxDB influxDB名词 database:数据库;measurement:数据库中的表;points:表里面的一行数据。

还有一个重要的名词: series

所有在数据库中的数据,都需要通过图表来表示,series表示这个表里面的所有的数据可以在图标上画成几条线(注:线条的个数由tags排列组合计算出来)

对influxdb的 *** 作:
#创建数据库
create database "db_name"

#显示所有的数据库
show databases

#删除数据库
drop database "db_name"

#使用数据库
use db_name

#显示该数据库中所有的表
show measurements

#创建表,直接在插入数据的时候指定表名
insert test,host=127.0.0.1,monitor_name=test count=1

#删除表
drop measurement "measurement_name"
point

influxDB中的point相当于传统数据库里的一行数据,由时间戳(time)、数据(field)、标签(tag)组成。

属性传统数据库概念
time每个数据记录时间,是数据库中的主索引
field各种记录值(没有索引的属性),例如温度、湿度
tags各种有索引的属性,例如地区、海拔
Go *** 作influxDB,1.x版本

安装

influxDB 1.x版本

go get github.com/influxdata/influxdb1-client/v2
package client_test

import (
   "fmt"
   "os"
   "time"
   client "github.com/influxdata/influxdb1-client/v2"
)

// 创建一个client
func ExampleClient()client.Client {
   // NOTE: this assumes you've setup a user and have setup shell env variables,
   // namely INFLUX_USER/INFLUX_PWD. If not just omit Username/Password below.
   cli, err := client.NewHTTPClient(client.HTTPConfig{
      Addr:     "http://localhost:8086",
      Username: os.Getenv("INFLUX_USER"),
      Password: os.Getenv("INFLUX_PWD"),
   })
   if err != nil {
      fmt.Println("Error creating InfluxDB Client: ", err.Error())
   }
   return cli
}

// 把数据写入influxdb
func ExampleClient_write(cli client.Client) {
   // Make client
   c, err := client.NewHTTPClient(client.HTTPConfig{
      Addr: "http://localhost:8086",
   })
   if err != nil {
      fmt.Println("Error creating InfluxDB Client: ", err.Error())
   }
   defer c.Close()

   // Create a new point batch
   bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
      Database:  "BumbleBeeTuna",//数据库名
      Precision: "s",//时间精度秒
   })
   // Create a point and add to batch
   tags := map[string]string{"cpu": "cpu-total"}//查询的索引
   fields := map[string]interface{}{
      "idle":   10.1,
      "system": 53.3,
      "user":   46.6,
   }//记录值
   pt, err := client.NewPoint("cpu_usage", tags, fields, time.Now())//将创建的表名为cpu_usage的表以及内容字段放入pt
   if err != nil {
      fmt.Println("Error: ", err.Error())
   }
   bp.AddPoint(pt)//把表放入创建的point中

   // Write the batch
   c.Write(bp)//写入创建的client中
}
// 查询
func ExampleClient_query(cli client.Client) {
   // Make client
   c, err := client.NewHTTPClient(client.HTTPConfig{
      Addr: "http://localhost:8086",
   })
   if err != nil {
      fmt.Println("Error creating InfluxDB Client: ", err.Error())
   }
   defer c.Close()

   q := client.NewQuery("SELECT count(value) FROM shapes", "square_holes", "ns")
   if response, err := c.Query(q); err == nil && response.Error() == nil {
      fmt.Println(response.Results)
   }
}
func main() {
   conn := ExampleClient()
   ExampleClient_write(conn)
   ExampleClient_query(conn)
}
Go *** 作influxDB,2.0版本

安装

influxDB 2.x版本

将客户端包添加到项目依赖项中。

go get github.com/influxdata/influxdb-client-go

在Go程序中,导入必要的包,并指定可执行程序的入口点。

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

为fluxdb定义变量

bucket := "example-bucket"
org := "example-org"
token := "example-token"
// Store the URL of your InfluxDB instance
url := "http://localhost:8086"

创建fluxdb Go客户端并传入url和token参数。

为了避免重复验证,使用令牌的方式,这里默认在初始化influxdb的时候把用户名和密码已经设置,得到了token。

client := influxdb2.NewClient(url, token)

使用WriteAPIBlocking方法创建一个写客户端,并传入org和bucket参数。

writeAPI := client.WriteAPIBlocking(org, bucket)

若要查询数据,创建influxdb数据库查询客户端并传入influxdb数据库的org。

queryAPI := client.QueryAPI(org)
使用Go将数据写入fluxdb

1.创建一个点,并使用API writer结构的WritePoint方法将其写入fluxdb。

2.关闭客户端以刷新所有挂起的写并完成。

p := influxdb2.NewPoint("stat",
  map[string]string{"unit": "temperature"},
  map[string]interface{}{"avg": 24.5, "max": 45},
  time.Now())
writeAPI.WritePoint(context.Background(), p)
client.Close()
写入的实例
func main() {
    bucket := "example-bucket"
    org := "example-org"
    token := "example-token"
    // Store the URL of your InfluxDB instance
    url := "http://localhost:8086"
    // Create new client with default option for server url authenticate by token
    client := influxdb2.NewClient(url, token)
    // User blocking write client for writes to desired bucket
    writeAPI := client.WriteAPIBlocking(org, bucket)
    // Create point using full params constructor
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // Write point immediately
    writeAPI.WritePoint(context.Background(), p)
    // Ensures background processes finishes
    client.Close()
}

使用Go从fluxdb查询数据

1.创建一个Flux查询,提供你的bucket参数

from(bucket:"")
 |> range(start: -1h)
 |> filter(fn: (r) => r._measurement == "stat")

查询客户端将Flux查询发送到fluxdb,并以具有表结构的FluxRecord对象的形式返回结果。

查询客户端的方法:

Query:向fluxdb发送Flux查询。

Next:遍历查询响应。

TableChanged:当组键更改时进行标识。

Record:返回最后解析的FluxRecord,并提供对值和行属性的访问。

Value:返回实际的field值。

result, err := queryAPI.Query(context.Background(), `from(bucket:"")
    |> range(start: -1h) 
    |> filter(fn: (r) => r._measurement == "stat")`)
if err == nil {
    for result.Next() {
        if result.TableChanged() {
            fmt.Printf("table: %s\n", result.TableMetadata().String())
        }
        fmt.Printf("value: %v\n", result.Record().Value())
    }
    if result.Err() != nil {
        fmt.Printf("query parsing error: %s\n", result.Err().Error())
    }
} else {
    panic(err)
}

FluxRecord对象包括以下访问数据的方法:

Table():返回该记录所属的表的索引。

Start(): 返回当前表中所有记录的下限。

Stop():返回当前表中所有记录的独占上限。

Time(): 返回记录的时间。

Value() : 返回实际的field值。

Field(): 返回field名。

Measurement(): 返回记录所属的表名称。

Values():返回列值的映射。

ValueByKey():从记录中返回给定列键的值。

读出的实例
 func main() {
    // Create client
    client := influxdb2.NewClient(url, token)
    // Get query client
    queryAPI := client.QueryAPI(org)
    // Get QueryTableResult
    result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Iterate over query response
        for result.Next() {
            // Notice when group key has changed
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // Access data
            fmt.Printf("value: %v\n", result.Record().Value())
        }
        // Check for an error
        if result.Err() != nil {
            fmt.Printf("query parsing error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }
    // Ensures background processes finishes
    client.Close()
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/995971.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存