Asynq 高性能分布式任务队列实践

Asynq 高性能分布式任务队列实践,第1张

Asynq 是一个 Go 库,用于对任务进行排队并与工作人员异步处理它们。它由Redis提供支持,旨在可扩展且易于上手。开发自谷歌员工。

Asynq 工作原理的高级概述:

客户端将任务放入队列服务器从队列中拉出任务并为每个任务启动一个工作 goroutine多个工作人员同时处理任务

任务队列用作跨多台机器分配工作的机制。一个系统可以由多个工作服务器和代理组成,让位于高可用性和水平扩展。

稳定性和兼容性

状态:该库目前正在进行大量开发,API 更改频繁且中断。

☝️ 重要提示:当前主要版本为零 ( v0.x.x),以适应快速开发和快速迭代,同时获得用户的早期反馈(感谢 API 的反馈!)。公共 API 可能会在发布前没有主要版本更新的情况下发生变化v1.0.0

快速开始

首先,确保你在本地运行Redis服务器。

$ redis-server
安装Asynq库
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon

创建项目

mkdir ziji && cd ziji
go mod init ziji
mkdir tasks
touch tasks/beta.go  tasks/worker.go  tasks/task.go

Redis 连接选项

Asynq 使用 Redis 作为消息代理, beta.goworker.go都需要连接到 Redis 进行写入和读取。
我们将使用 RedisClientOpt 指定如何连接到本地 Redis 实例。

beta.go

package tasks

import (
   "github.com/hibiken/asynq"
   "log"
   "pigs/common"
   "pigs/models/cmdb"
)

func TaskBeta() {
   c := common.CONFIG.Redis
   // 周期性任务
   scheduler := asynq.NewScheduler(
      asynq.RedisClientOpt{
         Addr:     c.Host,
         Username: c.UserName,
         Password: c.PassWord,
         DB:       c.DB,
      }, nil)

   var account cmdb.CloudPlatform
   common.DB.Table("cloud_platform").Where("enable != ? and type = ?", 0, "aliyun").Find(&account)
   syncResource := NewAliCloudTask(&account)
   // 每隔5分钟同步一次
   entryID, err := scheduler.Register("*/5 * * * *", syncResource)

   if err != nil {
      log.Fatal(err)
   }
   log.Printf("registered an entry: %q\n", entryID)

   if err := scheduler.Run(); err != nil {
      log.Fatal(err)
   }
}
NewScheduler

将运行调度程序,用于定期处理任务。调度器定期对任务排队,然后由集群中可用的工作服务器执行。

时区

默认情况下,定期任务计划使用UTC时区,更改默认时区可以使用SchedulerOpts参数

scheduler.Register` 接受三个参数,cron时间、任务、队列名`asynq.Queue("cloud")
// Example of using America/Los_Angeles timezone instead of the default UTC timezone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
    panic(err)
}
scheduler := asynq.NewScheduler(
    redisConnOpt, 
    &asynq.SchedulerOpts{
        Location: loc,
    },
)

task.go

package tasks

import (
   "context"
   "encoding/json"
   "github.com/hibiken/asynq"
   "log"
   "pigs/inner/cloud/cloudsync"
   "pigs/inner/cloud/cloudvendor"
   "pigs/models/cmdb"
)

const (
   SyncAliYunCloud  = "cmdb:aliyun"
   SyncTencentCloud = "cmdb:tencent"
)

// NewAliCloudTask 同步阿里云资产同步任务
func NewAliCloudTask(conf *cmdb.CloudPlatform) *asynq.Task {
   payload, err := json.Marshal(conf)
   if err != nil {
      panic(err)
   }
   return asynq.NewTask(SyncAliYunCloud, payload)
}

func HandleAliCloudTask(ctx context.Context, t *asynq.Task) error {

   var a cmdb.CloudPlatform
   if err := json.Unmarshal(t.Payload(), &a); err != nil {
      return err
   }

   _, err := cloudvendor.GetVendorClient(&a)
   if err != nil {
      log.Fatalf("AccountVerify GetVendorClient failed,%v", err)
      return err
   }

   cloudsync.SyncAliYunHost(&a)

   log.Printf("Aliyun Cloud assets are successfully synchronized...")
   return nil
}
Tasks 任务

asynq 中,工作单元被封装为 Task 类型。
其中有两个字段:“类型” 和 “有效载荷”。

// Task represents a task to be performed.
type Task struct {
    // Type indicates the type of a task to be performed.
    Type string

    // Payload holds data needed to perform the task.
    Payload Payload
}

Type 是一个简单的字符串值,指示给定任务的类型。
Payload 保存执行任务所需的数据,您可以将其视为 map[string]interface{}。需要注意的重要一件事是有效负载值必须是可序列化的。

worker.go

package tasks

import (
   "context"
   "pigs/common"
   "time"

   "log"
   "os"
   "os/signal"

   "github.com/hibiken/asynq"
   "golang.org/x/sys/unix"
)

// loggingMiddleware 记录任务日志中间件
func loggingMiddleware(h asynq.Handler) asynq.Handler {
   return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
      start := time.Now()
      log.Printf("Start processing %q", t.Type())
      err := h.ProcessTask(ctx, t)
      if err != nil {
         return err
      }
      log.Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(start))
      return nil
   })
}

func TaskWorker() {
   c := common.CONFIG.Redis
   srv := asynq.NewServer(
      asynq.RedisClientOpt{
         Addr:     c.Host,
         Username: c.UserName,
         Password: c.PassWord,
         DB:       c.DB,
      },
      asynq.Config{Concurrency: 20},
   )

   mux := asynq.NewServeMux()
   mux.Use(loggingMiddleware)
   // 任务执行时的handle
   mux.HandleFunc(SyncAliYunCloud, HandleAliCloudTask)

   // start server
   if err := srv.Start(mux); err != nil {
      log.Fatalf("could not start server: %v", err)
   }

   // Wait for termination signal.
   sigs := make(chan os.Signal, 1)
   signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
   for {
      s := <-sigs
      if s == unix.SIGTSTP {
         srv.Shutdown()
         continue
      }
      break
   }

   // Stop worker server.
   srv.Stop()
}
示例

建立任务使用NewTask方法,并为任务传递类型和有效负载。 可以通过Client.Schedule传入任务和需要处理的时间来计划任务

func main() {
  client := asynq.NewClient(redis)
  
  // 创建任务,声明任务类型,有效负载
  t1 := asynq.NewTask("send_register_email", map[string]interface{}{"userName": "zhangsan"})
  t2 := asynq.NewTask("send_forget_email", map[string]interface{}{"userName": "zhangsan"})

  // 立即处理任务
  err := client.Enqueue(t1, time.Now())
  if err != nil {
    log.Fatal(err)
  }

  // 2小时后处理任务, 延迟任务
  err := client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))
  if err != nil {
    log.Fatal(err)
  }
}

asynq.Client 支持三种调度任务的方法:Enqueue,EnqueueIn 和 EnqueueAt。
使用 client.Enqueue 将任务立即加入队列。
使用 client.EnqueueIn 或 client.EnqueueAt 来安排将来要处理的任务。 EnqueueAt支持 2021-11-11 15:10:00 时间格式定时执行任务

// 定时任务
package tasks

import (
   "fmt"
   "github.com/hibiken/asynq"
   "log"
   "pigs/common"
   "pigs/models/cmdb"
   "time"
)

type EmailTaskPayloadTest struct {
	UserID  int64
	Msg		string
}

func TaskBeta() {
   client := asynq.NewClient(
      asynq.RedisClientOpt{
         Addr:     ":6379",
         Password: "",
      })
   
   payload, err := json.Marshal(EmailTaskPayloadTest{
      UserID: 100,
      Msg:    "test",
   })
   if err != nil {
      log.Fatal(err)
   }
   t1 := asynq.NewTask("task:oneTask", payload)
   
   // 定时执行任务时间
   setDate := "2021-11-11 15:10:00"
   dateFormats := "2006-01-02 15:04:05"

   // 获取时区
   loc, _ := time.LoadLocation("Local")

   // 指定日期 转 当地 日期对象 类型为 time.Time
   timeObj, err := time.ParseInLocation(dateFormats, setDate, loc)
   if err != nil {
      fmt.Println("parse time failed err :", err)
      return
   }

   info, err := client.Enqueue(t1, asynq.ProcessAt(timeObj), asynq.Queue("test"))
   if err != nil {
      log.Fatal(err)
   }
   
   log.Printf(" [*] Successfully enqueued task: %+v", info)
启动任务
go run beta.go
go run worker.go

asynq: pid=1274467 2021/11/17 04:27:34.960443 INFO: Starting processing

2021/11/17 12:27:34 /home/risk/code/pigs/tasks/beat.go:25
2021/11/17 12:27:34 registered an entry: “fe209263-1561-4b94-8b72-98b29bab6efe”
asynq: pid=1274467 2021/11/17 04:27:34.962365 INFO: Scheduler starting
asynq: pid=1274467 2021/11/17 04:27:34.962371 INFO: Scheduler timezone is set to UTC
asynq: pid=1274467 2021/11/17 04:27:34.962379 INFO: Send signal TERM or INT to stop the scheduler

2021/11/17 12:45:13 Aliyun Cloud assets are successfully synchronized…
2021/11/17 12:45:13 Finished processing “cmdb:aliyun”: Elapsed Time = 12.471785932s

Web UI

Asynqmon是一个基于web的工具,用于监视和管理Asynq队列和任务。有关详细信息,请参阅工具的README。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/990136.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存