Go语言实战-nginx日志处理之RabbitMQ篇

Go语言实战-nginx日志处理之RabbitMQ篇,第1张

golang原生channel很好的解决多个go程之间通信和共享数据问题,但也局限于单个服务器之内,如果是多个服务器之间呢,消息队列无疑是个很好的选择。消息队列有不少,RabbitMQ、ReocketMQ、kafka等,实际使用中会根据不同的业务场景选择不同MQ。

下面简单介绍rabbitMQ的使用,还是基于之前的nginx分析处理项目,从nginx读取到的string就不存入chan中了,直接publish到RabbitMQ中,然后再起多个go程从RabbitMQ获取string进行解析处理。

// 起五个go程,消费rabbitMQ中的消息
go worker("worker1", db)
go worker("worker2", db)
go worker("worker3", db)
go worker("worker4", db)
go worker("worker5", db)

// 起一个go程,读取nginx日志,并publish到rabbitMQ
go task()

task关键代码

// 声明队列
q, err := ch.QueueDeclare(
	"task_queue", // name
	true,         // durable 是否持久化
	false,        // delete when unused
	false,        // exclusive
	false,        // no-wait
	nil,          // arguments
)
// 将读取到的string,publish到rabbitMQ
err = ch.Publish(
	"",     // exchange
	q.Name, // routing key
	false,  // mandatory
	false,
	amqp.Publishing{
		DeliveryMode: amqp.Transient, // 指定交付模式 Persistent:轮询,Transient: 谁快谁优先
		ContentType:  "text/plain",
		Body:         []byte(body),
    }
)

worker关键代码

// 声明队列
q, err := ch.QueueDeclare(
	"task_queue", // name
	true,         // durable
	false,        // delete when unused
	false,        // exclusive
	false,        // no-wait
	nil,          // arguments
)
// 消费消息
for d := range msgs {
	parse(string(d.Body), db) //
	d.Ack(false)
	count++
	log.Println(workerName, ": ", count)
}

完整代码

package main

import (
	"bufio"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/jinzhu/gorm"
	_ "github.com/jinzhu/gorm/dialects/mysql"
	"github.com/streadway/amqp"
)

type LogsInfo struct {
	Ip        string
	Time      int64
	Method    string
	Path      string
	Protocol  string
	Status    int
	Size      int
	Referer   string
	UserAgent string
}

type LogsInfoException struct {
	Ip    string
	Time  int64
	Other string
}

func main() {

	db, err := gorm.Open("mysql", "root:root@/test?charset=utf8&parseTime=True&loc=Local")
	defer db.Close()
	if err != nil {
		println("connection mysql fail")
	} else {
		println("connectiton mysql success")
	}
	// 设置全局表名禁用复数
	db.SingularTable(true)

	forever := make(chan bool)

	// 起五个go程,消费rabbitMQ中的消息
	go worker("worker1", db)
	go worker("worker2", db)
	go worker("worker3", db)
	go worker("worker4", db)
	go worker("worker5", db)

	// 起一个go程,读取nginx日志,并publish到rabbitMQ
	go task()

	<-forever

}

// read log file and publish
func task() error {
	conn, err := amqp.Dial("amqp://guest:guest@193.112.117.65:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // durable 是否持久化
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// read log file
	filePath := "log/access.log"
	f, err := os.Open(filePath)
	defer f.Close()
	if err != nil {
		log.Panic(err)
		return err
	}
	buf := bufio.NewReader(f)

	count := 0
	for {
		if count > 1000000 {
			return nil
		}
		line, _, err := buf.ReadLine()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}

		body := strings.TrimSpace(string(line))

		// 将读取到的string入队列
		err = ch.Publish(
			"",     // exchange
			q.Name, // routing key
			false,  // mandatory
			false,
			amqp.Publishing{
				DeliveryMode: amqp.Transient, // 指定交付模式 Persistent:轮询,Transient: 谁快谁优先
				ContentType:  "text/plain",
				Body:         []byte(body),
			})
		failOnError(err, "Failed to publish a message")
		log.Println("task: ", count)

		count += 1
	}
}

// consumer
func worker(workerName string, db *gorm.DB) {
	conn, err := amqp.Dial("amqp://guest:guest@193.112.117.65:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"task_queue", // name
		true,         // durable
		false,        // delete when unused
		false,        // exclusive
		false,        // no-wait
		nil,          // arguments
	)
	failOnError(err, "Failed to declare a queue")

	err = ch.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	failOnError(err, "Failed to set QoS")

	// consumer
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	count := 0
	go func() {
		for d := range msgs {
			parse(string(d.Body), db) //
			d.Ack(false)
			count++
			log.Println(workerName, ": ", count)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

// parse and write
func parse(str string, db *gorm.DB) {
	re := `^([\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}) - - \[(.*)\] "([^\s]+) ([^\s]+) ([^\s]+?)" ([\d]{3}) ([\d]{1,9}) "([^"]*?)" "([^"]*?)"`
	reg := regexp.MustCompile(re)

	parseInfo := reg.FindStringSubmatch(str)
	if len(parseInfo) == 0 {
		re1 := `^([\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}) - - \[(.*)\] (.*)`
		reg1 := regexp.MustCompile(re1)
		parseInfo1 := reg1.FindStringSubmatch(str)
		if len(parseInfo1) == 0 {
			return
		}

		t1, _ := time.Parse("02/Jan/2006:15:04:05 -0700", parseInfo1[2])
		infoException := LogsInfoException{
			Ip:    parseInfo1[1],
			Time:  t1.Unix(),
			Other: parseInfo1[3],
		}

		db.Create(&infoException)
		return
	}

	t, _ := time.Parse("02/Jan/2006:15:04:05 -0700", parseInfo[2])
	status, _ := strconv.Atoi(parseInfo[6])
	size, _ := strconv.Atoi(parseInfo[7])

	info := LogsInfo{
		Ip:        parseInfo[1],
		Time:      t.Unix(),
		Method:    parseInfo[3],
		Path:      parseInfo[4],
		Protocol:  parseInfo[5],
		Status:    status,
		Size:      size,
		Referer:   parseInfo[8],
		UserAgent: parseInfo[9],
	}

	db.Create(&info)

}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

demo已放码云,有兴趣的可以clone到本地跑

欢迎分享,转载请注明来源:内存溢出

原文地址:http://outofmemory.cn/langs/995298.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-05-21
下一篇2022-05-21

随机推荐

  • 2017年新加坡管理大学奖学金

    新加坡管理大学 (Singapore Management University),简称新大 (SMU),是新加坡5所公立大学之一,是亚洲顶级的财经类院校。新加坡公立大学现有5所,分别是新加坡国立大学

    2022-07-05
    300
  • 美国加州州立大学富尔顿分校优势

    加州州立大学富勒尔顿分校 是加州州立大学系统(California State University)二十三所分校之一,拥有加州大学系统傲人的学术名声。立思辰留学360介绍,加州州立大学富勒顿分校 C

    2022-07-05
    100
  • 加拿大约克大学语言要求如何

      加拿大约克大学位于加拿大第一大城市多伦多北郊,是全加拿大第三大的大学。据立思辰留学360介绍,学校建立于1959 年,校园宽阔,共占地635 公顷,面积之大可以用“一望无际”来形容。学校

    2022-07-05
    000
  • 美国东部大学申请

    立思辰留学360介绍,美国东部大学位于宾夕法尼亚州圣大卫区,是一所基督教大学,信仰、真理、平等是学校的箴言。学校1925年成立,命名为东方浸礼神学院,在 1932年开始分化,并在1952年成为四年制大

    2022-07-05
    300
  • 美国亚利桑那大学录取条件是什么

    立思辰留学360介绍,亚利桑那大学(The University of Arizona)坐落于美国亚利桑那州的图森市,是为了高等教育和研究而设立的赠地大学及太空辅助公立机构,是美国西南最富盛名的大学之

    2022-07-05
    300
  • 乔治布朗学院学习费用介绍

    乔治布朗学院立思辰留学360介绍,乔治布朗学院(George Brown College)是加拿大最大的社区学院之一。自一九六七年即成为多伦多市的正式学院,是一家由政府立案的教育机构。学校的声誉吸引著

    2022-07-05
    300
  • 诺丁汉特伦特大学周围交通怎么样

    据立思辰河北留学360张晶老师介绍,诺丁汉特伦特大学位于诺丁汉市,由三大校区组成,分别是位于市中心的City Campus, 离市中心30分钟车程的Clifton Campus,和40分钟车程的bra

    2022-07-05
    300
  • 2017年北安普顿大学与南安普顿索伦特大学哪个好

      北安普顿大学与南安普顿索伦特大学都有自己的优势专业,具体请咨询立思辰留学360专业顾问团队,咨询电话:4008-941-360北安普顿大学北安普顿大学是一所规模大型、设施一流、师资力量出众的高等学

    2022-07-05
    300
  • 英国萨塞克斯大学优势课程包括哪些

    萨塞克斯大学拥有优秀的师资力量,在科学院系里就有两位曾获得诺贝尔奖的教授,John教授和Harry Kroto教授。其中Harry教授在1996年发现了碳的另一种形态并命名为富勒烯,因此而成为问鼎诺贝

    2022-07-05
    600

发表评论

登录后才能评论

评论列表(0条)

    保存