Go语言使用定时器实现任务队列

Go语言使用定时器实现任务队列,第1张

概述Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何用Go语言实现一个定时器任务队列实例,非常具有实用价值。 Go语言中定时器 一 Go语言中提供了两种定时器 timer 和 ticker,分别是一次性定时器和重复任务定时器。本节咱们主要介绍如何使用Go语言的定时器实现一个任务队列,非常具有实用价值。
Go语言中定时器一般用法:
package mainimport(    "fmt"    "time")func main() {    input := make(chan interface{})    //producer - produce the messages    go func() {        for i := 0; i < 5; i++ {            input <- i        }        input <- "hello,world"    }()    t1 := time.NewTimer(time.Second * 5)    t2 := time.NewTimer(time.Second * 10)    for {        select {            //consumer - consume the messages            case msg := <-input:                fmt.Println(msg)            case <-t1.C:                println("5s timer")                t1.reset(time.Second * 5)            case <-t2.C:                println("10s timer")                t2.reset(time.Second * 10)        }    }}
上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:

type Timer struct {
    C <-chan Time
    r runtimeTimer
}

原来是一个 channel,其实有 GO 基础的都知道,GO 的运算符当出现的 -> 或者 <- 的时候,必然是有一端是指 channel。按照上面的例子来看,就是阻塞在一个 for 循环内,等待到了定时器的 C 从 channel 出来,当获取到值的时候,进行想要的 *** 作。
设计我们的定时任务队列当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。

具体的流程如下图所示:


定义结构
type OnceCron struct {    tasks []*Task   //任务的列队    add chan *Task  //当遭遇到新任务的时候    remove chan string  //当遭遇到删除任务的时候    stop chan struct{}  //当遇到停止信号的时候    Logger *log.Logger  //日志}type Job interface {    Run()     //执行接口}type Task struct {    Job  Job   //要执行的任务    UuID string   //任务标识,删除时用    RunTime int64   //执行时间    Spacing int64   //间隔时间    EndTime int64   //结束时间    Number int    //总共要次数}
队列实现首先,我们要获得一个队列任务

func NewCron() *OnceCron常规 *** 作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。

然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。
func (one *OnceCron) Start() {    //初始化的时候加入一个一年的长定时器,间隔1小时执行一次    task := getTaskWithFuncSpacing(3600,time.Now().Add(time.Hour*24*365).Unix(),func() {    log.Println("It's a Hour timer!")    }) //为了代码格式 markdown 里面有个括号改成全角了    one.tasks = append(one.tasks,task)    go one.run() //协成执行 防止主进程被阻塞}
执行部分应该是重点的,分成三部:
首先获得一个最先执行的任务然后产生一个定时器,用于执行任务进行阻塞判断,获取我们要进行的 *** 作
package mainimport (    "time"    "log"    "github.com/Google/uuID"    "os")//compatible old nametype OnceCron struct {    *TaskScheduler}//only exec cron timer crontype TaskScheduler struct {    tasks  []TaskInterface    swap   []TaskInterface    add    chan TaskInterface    remove chan string    stop   chan struct{}    Logger TaskLogInterface    lock    bool}type Lock interface {    Lock()    UnLock()}//return old name with OnceCronfunc NewCron() *OnceCron {    return &OnceCron{        TaskScheduler:NewScheduler(),}}//return a Controller Schedulerfunc NewScheduler() *TaskScheduler {    return &TaskScheduler{        tasks:  make([]TaskInterface,0),swap:   make([]TaskInterface,add:    make(chan TaskInterface),stop:   make(chan struct{}),remove: make(chan string),Logger: log.New(os.Stdout,"[Control]: ",log.Ldate|log.Ltime|log.Lshortfile),lock:   false,}}//add spacing time job to List with numberfunc (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64,number int,f func()) {    task := getTaskWithFuncSpacingNumber(spaceTime,number,f)    scheduler.addTask(task)}//add spacing time job to List with endTimefunc (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64,endTime int64,f func()) {    task := getTaskWithFuncSpacing(spaceTime,endTime,f)    scheduler.addTask(task)}//add func to Listfunc (scheduler *TaskScheduler) AddFunc(unixTime int64,f func()) {    task := getTaskWithFunc(unixTime,f)    scheduler.addTask(task)}func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) {    scheduler.addTask(task)}//add a task to Listfunc (scheduler *TaskScheduler) AddTask(task *Task) string {    if task.RunTime != 0 {        if task.RunTime < 100000000000 {            task.RunTime = task.RunTime * int64(time.Second)        }        if task.RunTime < time.Now().UnixNano() {            //延遲1秒            task.RunTime = time.Now().UnixNano() + int64(time.Second)        }    } else {        if task.Spacing > 0 {            task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second)        }else{            scheduler.Logger.Println("error too add task! Runtime error")            return ""        }    }    if task.UuID == "" {        task.UuID = uuID.New().String()    }    return scheduler.addTask(task)}//if lock add to swapfunc (scheduler *TaskScheduler) addTask(task TaskInterface) string  {    if scheduler.lock {        scheduler.swap = append(scheduler.swap,task)        scheduler.add <- task    } else{        scheduler.tasks = append(scheduler.tasks,task)        scheduler.add <- task    }    return task.GetUuID()}//new exportfunc (scheduler *TaskScheduler) ExportInterface() []TaskInterface {    return scheduler.tasks}//compatible old export tasksfunc (scheduler *TaskScheduler) Export() []*Task {    task := make([]*Task,0)    for _,v := range scheduler.tasks {        task = append(task,v.(*Task))    }    return task}//stop task with uuIDfunc (scheduler *TaskScheduler) StopOnce(uuIDStr string) {    scheduler.remove <- uuIDStr}//run Cronfunc (scheduler *TaskScheduler) Start() {    //初始化的时候加入一个一年的长定时器,time.Now().Add(time.Hour * 24 * 365).UnixNano(),func() {        log.Println("It's a Hour timer!")    })    scheduler.tasks = append(scheduler.tasks,task)    go scheduler.run()}//stop allfunc (scheduler *TaskScheduler) Stop() {    scheduler.stop <- struct{}{}}//run task List//if is empty,run a year timer taskfunc (scheduler *TaskScheduler) run() {    for {        Now := time.Now()        task,key := scheduler.GetTask()        runTime := task.GetRunTime()        i64 := runTime - Now.UnixNano()        var d time.Duration        if i64 < 0 {            scheduler.tasks[key].SetRuntime(Now.UnixNano())            if task != nil {                go task.RunJob()            }            scheduler.doAndreset(key)            continue        } else {            sec := runTime / int64(time.Second)            nsec := runTime % int64(time.Second)            d = time.Unix(sec,nsec).Sub(Now)        }        timer := time.NewTimer(d)        //catch a chan and do something        for {            select {            //if time has expired do task and shift key if is task List            case <-timer.C:                scheduler.doAndreset(key)                if task != nil {                    //fmt.Println(scheduler.tasks[key])                    go task.RunJob()                    timer.Stop()                }                //if add task            case <-scheduler.add:                timer.Stop()                // remove task with remove uuID            case uuIDstr := <-scheduler.remove:                scheduler.removeTask(uuIDstr)                timer.Stop()                //if get a stop single exit            case <-scheduler.stop:                timer.Stop()                return            }            break        }    }}//return a task and key In task Listfunc (scheduler *TaskScheduler) GetTask() (task TaskGetInterface,tempKey int) {    scheduler.Lock()    defer scheduler.UnLock()    min := scheduler.tasks[0].GetRunTime()    tempKey = 0    for key,task := range scheduler.tasks {        tTime := task.GetRunTime()        if min <= tTime {            continue        }        if min > tTime {            tempKey = key            min = tTime            continue        }    }    task = scheduler.tasks[tempKey]    return task,tempKey}//if add a new task and runtime < Now task runtime// stop Now timer and againfunc (scheduler *TaskScheduler) doAndreset(key int) {    scheduler.Lock()    defer scheduler.UnLock()    //null pointer    if key < len(scheduler.tasks) {        NowTask := scheduler.tasks[key]        scheduler.tasks = append(scheduler.tasks[:key],scheduler.tasks[key+1:]...)        if NowTask.GetSpacing() > 0 {            tTime := NowTask.GetRunTime()            NowTask.SetRuntime(NowTask.GetSpacing() * int64(time.Second) + tTime)            number := NowTask.GetRunNumber()            if number > 1 {                NowTask.SetRunNumber(number - 1)                scheduler.tasks = append(scheduler.tasks,NowTask)            } else if NowTask.GetEndTime() >= tTime {                scheduler.tasks = append(scheduler.tasks,NowTask)            }        }    }}//remove task by uuIDfunc (scheduler *TaskScheduler) removeTask(uuIDStr string) {    scheduler.Lock()    defer scheduler.UnLock()    for key,task := range scheduler.tasks {        if task.GetUuID() == uuIDStr {            scheduler.tasks = append(scheduler.tasks[:key],scheduler.tasks[key+1:]...)            break        }    }}//lock task []func (scheduler *TaskScheduler) Lock() {    scheduler.lock = true}//unlock task []func (scheduler *TaskScheduler) UnLock() {    scheduler.lock = false    if len(scheduler.swap) > 0 {        for _,task := range scheduler.swap {            scheduler.tasks = append(scheduler.tasks,task)        }        scheduler.swap = make([]TaskInterface,0)    }}
总结

以上是内存溢出为你收集整理的Go语言使用定时器实现任务队列全部内容,希望文章能够帮你解决Go语言使用定时器实现任务队列所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1271158.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-08
下一篇 2022-06-08

发表评论

登录后才能评论

评论列表(0条)

保存