python的在这里 --> Python中实战小案例
自定义Log模块注意:
1、ctblog/log为自定义的包,前面是文件夹名字,使用需要修改为自己的文件夹名字
2、本文中所有代码均在我Gitee上
需要修改文件目录为自己的目录,实现向终端与文件输出日志
package log
import (
"io"
"log"
"os"
)
var (
WarningLogger *log.Logger
InfoLogger *log.Logger
ErrorLogger *log.Logger
)
func init() {
path, _ := os.Getwd()
f, err := os.OpenFile(path+"/log/blog.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
ErrorLogger.Println("日志文件打开失败,请检查", err)
return
}
mw := io.MultiWriter(os.Stdout, f)
log.SetOutput(mw)
InfoLogger = log.New(mw, "INFO: ", log.Ldate|log.Ltime|log.Lshortfile)
WarningLogger = log.New(mw, "Warning: ", log.Ldate|log.Ltime|log.Lshortfile)
ErrorLogger = log.New(mw, "Error: ", log.Ldate|log.Ltime|log.Lshortfile)
}
Golang完成对Redis的增删改查
使用NewRedisServer创建对象使用Start连接redis使用WriteToRedis向redis中写入数据,取消注释支持设置过期时间,但是相同的过期时间会间接导致redis发生雪崩使用DeleteFromRedis从redis删除数据使用GetFromRedis从redis查询数据是否存在
package redis
import (
"errors"
"RedisWithKafak/log"
"github.com/garyburd/redigo/redis"
)
type RedisServer struct {
Host string
Password string
Conn redis.Conn
}
func NewRedisServer(host, password string) *RedisServer {
redisserver := &RedisServer{
Host: host,
Password: password,
}
return redisserver
}
func (r *RedisServer) Start() {
var err error
r.Conn, err = redis.Dial("tcp", r.Host)
if err != nil {
log.ErrorLogger.Println("连接失败", err)
return
}
log.InfoLogger.Println("redis 连接成功")
if _, err := r.Conn.Do("AUTH", r.Password); err != nil {
log.ErrorLogger.Println("认证失败", err)
return
}
if _, err := r.Conn.Do("SELECT", 1); err != nil {
log.ErrorLogger.Println("切库失败", err)
return
}
}
func (r *RedisServer) WriteToRedis(key, value string) {
_, err := r.Conn.Do("Set", key, value)
if err != nil {
log.ErrorLogger.Println(err)
return
}
// if _, err := r.Conn.Do("expire", key, 10); err != nil {
// log.InfoLogger.Println("expire err", err)
// }
// log.InfoLogger.Printf("Write and Expire %v To Redis success", key)
}
func (r *RedisServer) GetFromRedis(key string) (string, error) {
value, err := redis.String(r.Conn.Do("Get", key))
if err != nil {
err := errors.New("查询失败")
return "", err
}
return value, nil
}
func (r *RedisServer) DeleteFromRedis(key string) error {
_, err := redis.Int64(r.Conn.Do("DEL", key))
if err != nil {
return err
}
log.InfoLogger.Printf("Delete %v From Redis success!\n", key)
return nil
}
kafka服务端
使用NewKafka创建对象使用Start连接kafka使用SendToKafka向kafka中发送消息
注:使用kafka官方推荐的sarama包 *** 作
package kafka
import (
"RedisWithKafak/log"
"github.com/Shopify/sarama"
)
type Kafka struct {
Addr string
Topic string
Partition int32
client sarama.SyncProducer
config *sarama.Config
}
func NewKafka(addr string, topic string, partition int32) *Kafka {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
return &Kafka{
Addr: addr,
Topic: topic,
Partition: partition,
config: config,
}
}
func (k *Kafka) Start() {
client, err := sarama.NewSyncProducer([]string{k.Addr}, k.config)
if err != nil {
panic(err)
}
k.client = client
log.InfoLogger.Println("kafka连接成功")
}
func (k *Kafka) SendToKafka(data string) {
msg := &sarama.ProducerMessage{}
msg.Topic = k.Topic
msg.Value = sarama.StringEncoder(data)
pid, offset, err := k.client.SendMessage(msg)
if err != nil {
log.ErrorLogger.Println("发送失败", err)
return
}
log.InfoLogger.Println("kafka消息发送成功")
log.InfoLogger.Printf("pid : %v, offset : %v", pid, offset)
}
Kafka客户端
启动方式:.\kafka-consumer.exe -t your_topic_name -h xxx.xxx.xxx.xxx:9092
package main
import (
"flag"
"github.com/Shopify/sarama"
"fmt"
"sync"
)
var topic string
var host string
func init(){
flag.StringVar(&topic,"t","logagent","Set topic,default:logagent")
flag.StringVar(&host,"h","127.0.0.1:9092","Set host,default:127.0.0.1:9092")
}
func main(){
flag.Parse()
consumer , err := sarama.NewConsumer([]string{host},nil)
if err != nil {
fmt.Println("kafka连接失败")
return
}
partitionlist,err:= consumer.Partitions(topic) // 根据topic取到所有的分区
if err != nil {
fmt.Println("分区查询失败")
return
}
for partition := range partitionlist {
pr ,err := consumer.ConsumePartition(topic,int32(partition),sarama.OffsetNewest) //为每个分区创建一个消费者
if err != nil {
fmt.Println("分区内主题失败",partition)
return
}
defer pr.AsyncClose()
var wg sync.WaitGroup
wg.Add(1)
go func(sarama.PartitionConsumer){
defer wg.Done()
for msg:= range pr.Messages(){
fmt.Printf("分区: %v ,偏移位: %v , key: %v , value: %v \n",msg.Partition,msg.Offset,msg.Key,string(msg.Value))
}
}(pr)
wg.Wait()
}
}
使用配置文件反射到结构体中
配置文件如下:
[kafka]
address = "129.198.32.147:9092"
topic = "logagent"
[collect]
logfile_path = "/home/logagent.log"
package main
import (
"fmt"
"github.com/go-ini/ini"
)
type Kafka struct {
Addr string `ini:"address"`
Topic string `ini:"topic"`
}
func main() {
var k = new(Kafka)
cfg, err := ini.Load("/root/gopath/src/loghandler/config/logagent.ini")
if err != nil {
panic(err)
}
err = cfg.Section("kafka").MapTo(k)
if err != nil {
panic(err)
}
fmt.Printf("k.Topic = %v", k.Topic)
}
//k.Topic = logagent
简单的生产者消费者模型
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
queue []string
cond *sync.Cond
}
func main() {
s := &Queue{
queue: []string{},
cond: sync.NewCond(&sync.Mutex{}),
}
go func() {
for {
s.InQueue("chenteng")
time.Sleep(1 * time.Second)
}
}()
for {
res := s.OutQueue()
fmt.Println("result = ", res)
time.Sleep(1 * time.Second)
}
}
func (q *Queue) InQueue(item string) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.queue = append(q.queue, item)
fmt.Println(item, "入队")
q.cond.Broadcast()
}
func (q *Queue) OutQueue() string {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if len(q.queue) == 0 {
fmt.Println("no data available,wait")
q.cond.Wait()
}
result := q.queue[0]
q.queue = q.queue[1:]
return result
}
//no data available,wait
//chenteng 入队
//result = chenteng
//no data available,wait
//chenteng 入队
//result = chenteng
//no data available,wait
//chenteng 入队
//result = chenteng
MysqlDump备份数据库
注意:宿主机需要mysql命令,已放弃,后面我会用xorm是实现
加载配置文件,也可以写死
package config
import (
"ctblog/log"
"github.com/BurntSushi/toml"
"os"
"sync"
"ctblog/config"
"time"
)
var Cfg *TomlConfig
type MysqlInfo struct {
Host string
Port string
User string
Passwd string
Database string
}
type DBBak struct {
MysqlInfo config.MysqlInfo
BakTime time.Time
isBakOk bool
}
type TomlConfig struct {
DBInfo MysqlInfo
}
func init() {
once.Do(func() {
Cfg = new(TomlConfig)
var err error
Cfg.System.CurrentDir, err = os.Getwd()
if err != nil {
panic(err)
}
Cfg.System.AppName = "mszlu-go-blog"
Cfg.System.Version = 1.0
_, err = toml.DecodeFile("config/config.toml", &Cfg)
if err != nil {
log.ErrorLogger.Println("读取配置文件失败", err)
panic(err)
}
})
}
Mysql备份逻辑
package others
import (
"ctblog/config"
"ctblog/log"
"io/ioutil"
"os"
"os/exec"
"time"
)
func MysqlBakInit() *DBBak {
return &DBBak{
config.Cfg.DBInfo,
time.Now(),
false}
}
func (d *DBBak) StartMysqlBak() (error, string) {
log.InfoLogger.Println("开始备份")
//定义Cmd结构体对象指针
var cmd *exec.Cmd
cmd = exec.Command("mysqldump", "-h"+d.MysqlInfo.Host, "-P"+d.MysqlInfo.Port, "-u"+d.MysqlInfo.User, "-p"+d.MysqlInfo.Passwd, d.MysqlInfo.Database)
//StdinPipe方法返回一个在命令Start后与命令标准输入关联的管道。
stdout, err := cmd.StdoutPipe()
if err != nil {
log.ErrorLogger.Println(err)
return err, ""
}
if err := cmd.Start(); err != nil {
log.ErrorLogger.Println(err)
return err, ""
}
bytes, err := ioutil.ReadAll(stdout)
if err != nil {
log.ErrorLogger.Println(err)
return err, ""
}
//获得一个当前的时间戳
now := d.BakTime.Format("20060102150405")
var backupPath string
path, _ := os.Getwd()
backupPath = path + "/bakfile/" + d.MysqlInfo.Database + "_" + now + ".sql"
//写入文件并设置文件权限
err = ioutil.WriteFile(backupPath, bytes, 0644)
if err != nil {
log.ErrorLogger.Println(err)
return err, ""
}
//备份成功
d.isBakOk = true
return nil, backupPath
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)