Golang中实战小案例(持续更新中)

Golang中实战小案例(持续更新中),第1张

Golang中实战小案例(持续更新中)

文章目录 Golang中实战小案例(持续更新中)自定义Log模块Golang完成对Redis的增删改查kafka服务端Kafka客户端使用配置文件反射到结构体中简单的生产者消费者模型MysqlDump备份数据库
python的在这里 --> Python中实战小案例

注意:

1、ctblog/log为自定义的包,前面是文件夹名字,使用需要修改为自己的文件夹名字
2、本文中所有代码均在我Gitee上

自定义Log模块

需要修改文件目录为自己的目录,实现向终端与文件输出日志

package log

import (
	"io"
	"log"
	"os"
)

var (
	WarningLogger *log.Logger
	InfoLogger    *log.Logger
	ErrorLogger   *log.Logger
)

func init() {
	path, _ := os.Getwd()
	f, err := os.OpenFile(path+"/log/blog.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
	if err != nil {
		ErrorLogger.Println("日志文件打开失败,请检查", err)
		return
	}
	mw := io.MultiWriter(os.Stdout, f)
	log.SetOutput(mw)
	InfoLogger = log.New(mw, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
	WarningLogger = log.New(mw, "Warning: ", log.Ldate|log.Ltime|log.Lshortfile)
	ErrorLogger = log.New(mw, "Error: ", log.Ldate|log.Ltime|log.Lshortfile)
}
Golang完成对Redis的增删改查 使用NewRedisServer创建对象使用Start连接redis使用WriteToRedis向redis中写入数据,取消注释支持设置过期时间,但是相同的过期时间会间接导致redis发生雪崩使用DeleteFromRedis从redis删除数据使用GetFromRedis从redis查询数据是否存在
package redis

import (
	"errors"

	"RedisWithKafak/log"

	"github.com/garyburd/redigo/redis"
)

type RedisServer struct {
	Host     string
	Password string
	Conn     redis.Conn
}

func NewRedisServer(host, password string) *RedisServer {
	redisserver := &RedisServer{
		Host:     host,
		Password: password,
	}
	return redisserver
}

func (r *RedisServer) Start() {
	var err error
	r.Conn, err = redis.Dial("tcp", r.Host)
	if err != nil {
		log.ErrorLogger.Println("连接失败", err)
		return
	}
	log.InfoLogger.Println("redis 连接成功")
	if _, err := r.Conn.Do("AUTH", r.Password); err != nil {
		log.ErrorLogger.Println("认证失败", err)
		return
	}
	if _, err := r.Conn.Do("SELECT", 1); err != nil {
		log.ErrorLogger.Println("切库失败", err)
		return
	}
}

func (r *RedisServer) WriteToRedis(key, value string) {
	_, err := r.Conn.Do("Set", key, value)
	if err != nil {
		log.ErrorLogger.Println(err)
		return
	}
	// if _, err := r.Conn.Do("expire", key, 10); err != nil {
	// 	log.InfoLogger.Println("expire err", err)
	// }
	// log.InfoLogger.Printf("Write and Expire %v To Redis success", key)
}

func (r *RedisServer) GetFromRedis(key string) (string, error) {
	value, err := redis.String(r.Conn.Do("Get", key))
	if err != nil {
		err := errors.New("查询失败")
		return "", err
	}
	return value, nil
}

func (r *RedisServer) DeleteFromRedis(key string) error {
	_, err := redis.Int64(r.Conn.Do("DEL", key))
	if err != nil {
		return err
	}
	log.InfoLogger.Printf("Delete %v From Redis success!\n", key)
	return nil
}

kafka服务端 使用NewKafka创建对象使用Start连接kafka使用SendToKafka向kafka中发送消息

注:使用kafka官方推荐的sarama包 *** 作

package kafka

import (
	"RedisWithKafak/log"

	"github.com/Shopify/sarama"
)

type Kafka struct {
	Addr      string
	Topic     string
	Partition int32
	client    sarama.SyncProducer
	config    *sarama.Config
}

func NewKafka(addr string, topic string, partition int32) *Kafka {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	return &Kafka{
		Addr:      addr,
		Topic:     topic,
		Partition: partition,
		config:    config,
	}
}

func (k *Kafka) Start() {
	client, err := sarama.NewSyncProducer([]string{k.Addr}, k.config)
	if err != nil {
		panic(err)
	}
	k.client = client
	log.InfoLogger.Println("kafka连接成功")
}

func (k *Kafka) SendToKafka(data string) {
	msg := &sarama.ProducerMessage{}
	msg.Topic = k.Topic
	msg.Value = sarama.StringEncoder(data)
	pid, offset, err := k.client.SendMessage(msg)
	if err != nil {
		log.ErrorLogger.Println("发送失败", err)
		return
	}
	log.InfoLogger.Println("kafka消息发送成功")
	log.InfoLogger.Printf("pid : %v, offset : %v", pid, offset)
}

Kafka客户端

启动方式:.\kafka-consumer.exe -t your_topic_name -h xxx.xxx.xxx.xxx:9092

package main

import (
	"flag"
	"github.com/Shopify/sarama"
	"fmt"
	"sync"
)


var topic string
var host string
func init(){
	flag.StringVar(&topic,"t","logagent","Set topic,default:logagent")
	flag.StringVar(&host,"h","127.0.0.1:9092","Set host,default:127.0.0.1:9092")
}

func main(){
	flag.Parse()
	consumer , err := sarama.NewConsumer([]string{host},nil)
	if err != nil {
		fmt.Println("kafka连接失败")
		return
	}
	partitionlist,err:= consumer.Partitions(topic) // 根据topic取到所有的分区
	if err != nil {
		fmt.Println("分区查询失败")
		return
	}
	for partition := range partitionlist {
		pr ,err := consumer.ConsumePartition(topic,int32(partition),sarama.OffsetNewest) //为每个分区创建一个消费者
		if err != nil  {
			fmt.Println("分区内主题失败",partition)
			return
		}
		defer  pr.AsyncClose()
		var wg sync.WaitGroup
		wg.Add(1)
		go func(sarama.PartitionConsumer){
			defer wg.Done()
			for msg:= range pr.Messages(){
				fmt.Printf("分区: %v ,偏移位: %v , key: %v , value: %v \n",msg.Partition,msg.Offset,msg.Key,string(msg.Value))
			}
		}(pr)
		wg.Wait()
	}
}

使用配置文件反射到结构体中

配置文件如下:

[kafka]
address = "129.198.32.147:9092"
topic = "logagent"

[collect]
logfile_path = "/home/logagent.log"
package main

import (
	"fmt"

	"github.com/go-ini/ini"
)

type Kafka struct {
	Addr  string `ini:"address"`
	Topic string `ini:"topic"`
}

func main() {
	var k = new(Kafka)
	cfg, err := ini.Load("/root/gopath/src/loghandler/config/logagent.ini")
	if err != nil {
		panic(err)
	}
	err = cfg.Section("kafka").MapTo(k)
	if err != nil {
		panic(err)
	}
	fmt.Printf("k.Topic = %v", k.Topic)
}


//k.Topic = logagent

简单的生产者消费者模型
package main

import (
        "fmt"
        "sync"
        "time"
)

type Queue struct {
        queue []string
        cond  *sync.Cond
}

func main() {
        s := &Queue{
                queue: []string{},
                cond:  sync.NewCond(&sync.Mutex{}),
        }
        go func() {
                for {
                        s.InQueue("chenteng")
                        time.Sleep(1 * time.Second)
                }
        }()
        for {
                res := s.OutQueue()
                fmt.Println("result = ", res)
                time.Sleep(1 * time.Second)
        }
}

func (q *Queue) InQueue(item string) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        q.queue = append(q.queue, item)
        fmt.Println(item, "入队")
        q.cond.Broadcast()
}

func (q *Queue) OutQueue() string {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        if len(q.queue) == 0 {
                fmt.Println("no data available,wait")
                q.cond.Wait()
        }
        result := q.queue[0]
        q.queue = q.queue[1:]
        return result
}


//no data available,wait
//chenteng 入队
//result =  chenteng
//no data available,wait
//chenteng 入队
//result =  chenteng
//no data available,wait
//chenteng 入队
//result =  chenteng
MysqlDump备份数据库

注意:宿主机需要mysql命令,已放弃,后面我会用xorm是实现

加载配置文件,也可以写死

package config

import (
	"ctblog/log"
	"github.com/BurntSushi/toml"
	"os"
	"sync"
	"ctblog/config"
	"time"
)
var Cfg *TomlConfig

type MysqlInfo struct {
	Host     string
	Port     string
	User     string
	Passwd   string
	Database string
}

type DBBak struct {
	MysqlInfo config.MysqlInfo
	BakTime   time.Time
	isBakOk   bool
}

type TomlConfig struct {
	DBInfo MysqlInfo
}

func init() {
	once.Do(func() {
		Cfg = new(TomlConfig)
		var err error
		Cfg.System.CurrentDir, err = os.Getwd()
		if err != nil {
			panic(err)
		}
		Cfg.System.AppName = "mszlu-go-blog"
		Cfg.System.Version = 1.0
		_, err = toml.DecodeFile("config/config.toml", &Cfg)
		if err != nil {
			log.ErrorLogger.Println("读取配置文件失败", err)
			panic(err)
		}
	})
}

Mysql备份逻辑

package others

import (
	"ctblog/config"
	"ctblog/log"
	"io/ioutil"
	"os"
	"os/exec"
	"time"
)

func MysqlBakInit() *DBBak {
	return &DBBak{
		config.Cfg.DBInfo,
		time.Now(),
		false}
}

func (d *DBBak) StartMysqlBak() (error, string) {
	log.InfoLogger.Println("开始备份")
	//定义Cmd结构体对象指针
	var cmd *exec.Cmd
	cmd = exec.Command("mysqldump", "-h"+d.MysqlInfo.Host, "-P"+d.MysqlInfo.Port, "-u"+d.MysqlInfo.User, "-p"+d.MysqlInfo.Passwd, d.MysqlInfo.Database)
	//StdinPipe方法返回一个在命令Start后与命令标准输入关联的管道。
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.ErrorLogger.Println(err)
		return err, ""
	}
	if err := cmd.Start(); err != nil {
		log.ErrorLogger.Println(err)
		return err, ""
	}
	bytes, err := ioutil.ReadAll(stdout)
	if err != nil {
		log.ErrorLogger.Println(err)
		return err, ""
	}
	//获得一个当前的时间戳
	now := d.BakTime.Format("20060102150405")
	var backupPath string
	path, _ := os.Getwd()
	backupPath = path + "/bakfile/" + d.MysqlInfo.Database + "_" + now + ".sql"

	//写入文件并设置文件权限
	err = ioutil.WriteFile(backupPath, bytes, 0644)

	if err != nil {
		log.ErrorLogger.Println(err)
		return err, ""
	}
	//备份成功
	d.isBakOk = true
	return nil, backupPath
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/995066.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存