Asynq 是一个 Go 库,用于对任务进行排队并与工作人员异步处理它们。它由Redis提供支持,旨在可扩展且易于上手。开发自谷歌员工。
Asynq 工作原理的高级概述:
客户端将任务放入队列服务器从队列中拉出任务并为每个任务启动一个工作 goroutine多个工作人员同时处理任务任务队列用作跨多台机器分配工作的机制。一个系统可以由多个工作服务器和代理组成,让位于高可用性和水平扩展。
稳定性和兼容性状态:该库目前正在进行大量开发,API 更改频繁且中断。
快速开始☝️ 重要提示:当前主要版本为零 (
v0.x.x
),以适应快速开发和快速迭代,同时获得用户的早期反馈(感谢 API 的反馈!)。公共 API 可能会在发布前没有主要版本更新的情况下发生变化v1.0.0
。
首先,确保你在本地运行Redis服务器。
$ redis-server
安装Asynq库
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon
创建项目
mkdir ziji && cd ziji
go mod init ziji
mkdir tasks
touch tasks/beta.go tasks/worker.go tasks/task.go
Redis 连接选项
Asynq 使用 Redis 作为消息代理, beta.go
和worker.go
都需要连接到 Redis 进行写入和读取。
我们将使用 RedisClientOpt
指定如何连接到本地 Redis 实例。
beta.go
package tasks
import (
"github.com/hibiken/asynq"
"log"
"pigs/common"
"pigs/models/cmdb"
)
func TaskBeta() {
c := common.CONFIG.Redis
// 周期性任务
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{
Addr: c.Host,
Username: c.UserName,
Password: c.PassWord,
DB: c.DB,
}, nil)
var account cmdb.CloudPlatform
common.DB.Table("cloud_platform").Where("enable != ? and type = ?", 0, "aliyun").Find(&account)
syncResource := NewAliCloudTask(&account)
// 每隔5分钟同步一次
entryID, err := scheduler.Register("*/5 * * * *", syncResource)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}
NewScheduler
将运行调度程序,用于定期处理任务。调度器定期对任务排队,然后由集群中可用的工作服务器执行。
时区默认情况下,定期任务计划使用UTC时区,更改默认时区可以使用SchedulerOpts参数
scheduler.Register` 接受三个参数,cron时间、任务、队列名`asynq.Queue("cloud")
// Example of using America/Los_Angeles timezone instead of the default UTC timezone.
loc, err := time.LoadLocation("America/Los_Angeles")
if err != nil {
panic(err)
}
scheduler := asynq.NewScheduler(
redisConnOpt,
&asynq.SchedulerOpts{
Location: loc,
},
)
task.go
package tasks
import (
"context"
"encoding/json"
"github.com/hibiken/asynq"
"log"
"pigs/inner/cloud/cloudsync"
"pigs/inner/cloud/cloudvendor"
"pigs/models/cmdb"
)
const (
SyncAliYunCloud = "cmdb:aliyun"
SyncTencentCloud = "cmdb:tencent"
)
// NewAliCloudTask 同步阿里云资产同步任务
func NewAliCloudTask(conf *cmdb.CloudPlatform) *asynq.Task {
payload, err := json.Marshal(conf)
if err != nil {
panic(err)
}
return asynq.NewTask(SyncAliYunCloud, payload)
}
func HandleAliCloudTask(ctx context.Context, t *asynq.Task) error {
var a cmdb.CloudPlatform
if err := json.Unmarshal(t.Payload(), &a); err != nil {
return err
}
_, err := cloudvendor.GetVendorClient(&a)
if err != nil {
log.Fatalf("AccountVerify GetVendorClient failed,%v", err)
return err
}
cloudsync.SyncAliYunHost(&a)
log.Printf("Aliyun Cloud assets are successfully synchronized...")
return nil
}
Tasks 任务
在 asynq
中,工作单元被封装为 Task
类型。
其中有两个字段:“类型” 和 “有效载荷”。
// Task represents a task to be performed.
type Task struct {
// Type indicates the type of a task to be performed.
Type string
// Payload holds data needed to perform the task.
Payload Payload
}
Type
是一个简单的字符串值,指示给定任务的类型。
Payload
保存执行任务所需的数据,您可以将其视为 map[string]interface{}
。需要注意的重要一件事是有效负载值必须是可序列化的。
worker.go
package tasks
import (
"context"
"pigs/common"
"time"
"log"
"os"
"os/signal"
"github.com/hibiken/asynq"
"golang.org/x/sys/unix"
)
// loggingMiddleware 记录任务日志中间件
func loggingMiddleware(h asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
start := time.Now()
log.Printf("Start processing %q", t.Type())
err := h.ProcessTask(ctx, t)
if err != nil {
return err
}
log.Printf("Finished processing %q: Elapsed Time = %v", t.Type(), time.Since(start))
return nil
})
}
func TaskWorker() {
c := common.CONFIG.Redis
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: c.Host,
Username: c.UserName,
Password: c.PassWord,
DB: c.DB,
},
asynq.Config{Concurrency: 20},
)
mux := asynq.NewServeMux()
mux.Use(loggingMiddleware)
// 任务执行时的handle
mux.HandleFunc(SyncAliYunCloud, HandleAliCloudTask)
// start server
if err := srv.Start(mux); err != nil {
log.Fatalf("could not start server: %v", err)
}
// Wait for termination signal.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for {
s := <-sigs
if s == unix.SIGTSTP {
srv.Shutdown()
continue
}
break
}
// Stop worker server.
srv.Stop()
}
示例
建立任务使用NewTask
方法,并为任务传递类型和有效负载。 可以通过Client.Schedule
传入任务和需要处理的时间来计划任务
func main() {
client := asynq.NewClient(redis)
// 创建任务,声明任务类型,有效负载
t1 := asynq.NewTask("send_register_email", map[string]interface{}{"userName": "zhangsan"})
t2 := asynq.NewTask("send_forget_email", map[string]interface{}{"userName": "zhangsan"})
// 立即处理任务
err := client.Enqueue(t1, time.Now())
if err != nil {
log.Fatal(err)
}
// 2小时后处理任务, 延迟任务
err := client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))
if err != nil {
log.Fatal(err)
}
}
asynq.Client 支持三种调度任务的方法:Enqueue,EnqueueIn 和 EnqueueAt。
使用 client.Enqueue 将任务立即加入队列。
使用 client.EnqueueIn 或 client.EnqueueAt 来安排将来要处理的任务。 EnqueueAt支持 2021-11-11 15:10:00 时间格式定时执行任务
// 定时任务
package tasks
import (
"fmt"
"github.com/hibiken/asynq"
"log"
"pigs/common"
"pigs/models/cmdb"
"time"
)
type EmailTaskPayloadTest struct {
UserID int64
Msg string
}
func TaskBeta() {
client := asynq.NewClient(
asynq.RedisClientOpt{
Addr: ":6379",
Password: "",
})
payload, err := json.Marshal(EmailTaskPayloadTest{
UserID: 100,
Msg: "test",
})
if err != nil {
log.Fatal(err)
}
t1 := asynq.NewTask("task:oneTask", payload)
// 定时执行任务时间
setDate := "2021-11-11 15:10:00"
dateFormats := "2006-01-02 15:04:05"
// 获取时区
loc, _ := time.LoadLocation("Local")
// 指定日期 转 当地 日期对象 类型为 time.Time
timeObj, err := time.ParseInLocation(dateFormats, setDate, loc)
if err != nil {
fmt.Println("parse time failed err :", err)
return
}
info, err := client.Enqueue(t1, asynq.ProcessAt(timeObj), asynq.Queue("test"))
if err != nil {
log.Fatal(err)
}
log.Printf(" [*] Successfully enqueued task: %+v", info)
启动任务
go run beta.go
go run worker.go
asynq: pid=1274467 2021/11/17 04:27:34.960443 INFO: Starting processing
2021/11/17 12:27:34 /home/risk/code/pigs/tasks/beat.go:25
2021/11/17 12:27:34 registered an entry: “fe209263-1561-4b94-8b72-98b29bab6efe”
asynq: pid=1274467 2021/11/17 04:27:34.962365 INFO: Scheduler starting
asynq: pid=1274467 2021/11/17 04:27:34.962371 INFO: Scheduler timezone is set to UTC
asynq: pid=1274467 2021/11/17 04:27:34.962379 INFO: Send signal TERM or INT to stop the scheduler
2021/11/17 12:45:13 Aliyun Cloud assets are successfully synchronized…
2021/11/17 12:45:13 Finished processing “cmdb:aliyun”: Elapsed Time = 12.471785932s
Asynqmon是一个基于web的工具,用于监视和管理Asynq队列和任务。有关详细信息,请参阅工具的README。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)