Go语言中定时器一般用法:
package mainimport( "fmt" "time")func main() { input := make(chan interface{}) //producer - produce the messages go func() { for i := 0; i < 5; i++ { input <- i } input <- "hello,world" }() t1 := time.NewTimer(time.Second * 5) t2 := time.NewTimer(time.Second * 10) for { select { //consumer - consume the messages case msg := <-input: fmt.Println(msg) case <-t1.C: println("5s timer") t1.reset(time.Second * 5) case <-t2.C: println("10s timer") t2.reset(time.Second * 10) } }}上面代码中的这个 C 是啥呢,我们去源码看看,以 timer 为例:
type Timer struct {
C <-chan Time
r runtimeTimer
}
设计我们的定时任务队列当时的需求是这样,需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,还要能停止掉。
具体的流程如下图所示:
定义结构
type OnceCron struct { tasks []*Task //任务的列队 add chan *Task //当遭遇到新任务的时候 remove chan string //当遭遇到删除任务的时候 stop chan struct{} //当遇到停止信号的时候 Logger *log.Logger //日志}type Job interface { Run() //执行接口}type Task struct { Job Job //要执行的任务 UuID string //任务标识,删除时用 RunTime int64 //执行时间 Spacing int64 //间隔时间 EndTime int64 //结束时间 Number int //总共要次数}队列实现首先,我们要获得一个队列任务
func NewCron() *OnceCron
常规 *** 作,为了节省篇幅,就不写出来,具体可以看源码,贴在了底部。然后,开始定时器队列的运行,一般,都会命名为 Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是 for{select{}} 在等待个毛毛球?所以,我们需要在 Start 的时候添加一个默认的任务,防止队列退出。
func (one *OnceCron) Start() { //初始化的时候加入一个一年的长定时器,间隔1小时执行一次 task := getTaskWithFuncSpacing(3600,time.Now().Add(time.Hour*24*365).Unix(),func() { log.Println("It's a Hour timer!") }) //为了代码格式 markdown 里面有个括号改成全角了 one.tasks = append(one.tasks,task) go one.run() //协成执行 防止主进程被阻塞}执行部分应该是重点的,分成三部:
首先获得一个最先执行的任务然后产生一个定时器,用于执行任务进行阻塞判断,获取我们要进行的 *** 作
package mainimport ( "time" "log" "github.com/Google/uuID" "os")//compatible old nametype OnceCron struct { *TaskScheduler}//only exec cron timer crontype TaskScheduler struct { tasks []TaskInterface swap []TaskInterface add chan TaskInterface remove chan string stop chan struct{} Logger TaskLogInterface lock bool}type Lock interface { Lock() UnLock()}//return old name with OnceCronfunc NewCron() *OnceCron { return &OnceCron{ TaskScheduler:NewScheduler(),}}//return a Controller Schedulerfunc NewScheduler() *TaskScheduler { return &TaskScheduler{ tasks: make([]TaskInterface,0),swap: make([]TaskInterface,add: make(chan TaskInterface),stop: make(chan struct{}),remove: make(chan string),Logger: log.New(os.Stdout,"[Control]: ",log.Ldate|log.Ltime|log.Lshortfile),lock: false,}}//add spacing time job to List with numberfunc (scheduler *TaskScheduler) AddFuncSpaceNumber(spaceTime int64,number int,f func()) { task := getTaskWithFuncSpacingNumber(spaceTime,number,f) scheduler.addTask(task)}//add spacing time job to List with endTimefunc (scheduler *TaskScheduler) AddFuncSpace(spaceTime int64,endTime int64,f func()) { task := getTaskWithFuncSpacing(spaceTime,endTime,f) scheduler.addTask(task)}//add func to Listfunc (scheduler *TaskScheduler) AddFunc(unixTime int64,f func()) { task := getTaskWithFunc(unixTime,f) scheduler.addTask(task)}func (scheduler *TaskScheduler) AddTaskInterface(task TaskInterface) { scheduler.addTask(task)}//add a task to Listfunc (scheduler *TaskScheduler) AddTask(task *Task) string { if task.RunTime != 0 { if task.RunTime < 100000000000 { task.RunTime = task.RunTime * int64(time.Second) } if task.RunTime < time.Now().UnixNano() { //延遲1秒 task.RunTime = time.Now().UnixNano() + int64(time.Second) } } else { if task.Spacing > 0 { task.RunTime = time.Now().UnixNano() + task.Spacing * int64(time.Second) }else{ scheduler.Logger.Println("error too add task! Runtime error") return "" } } if task.UuID == "" { task.UuID = uuID.New().String() } return scheduler.addTask(task)}//if lock add to swapfunc (scheduler *TaskScheduler) addTask(task TaskInterface) string { if scheduler.lock { scheduler.swap = append(scheduler.swap,task) scheduler.add <- task } else{ scheduler.tasks = append(scheduler.tasks,task) scheduler.add <- task } return task.GetUuID()}//new exportfunc (scheduler *TaskScheduler) ExportInterface() []TaskInterface { return scheduler.tasks}//compatible old export tasksfunc (scheduler *TaskScheduler) Export() []*Task { task := make([]*Task,0) for _,v := range scheduler.tasks { task = append(task,v.(*Task)) } return task}//stop task with uuIDfunc (scheduler *TaskScheduler) StopOnce(uuIDStr string) { scheduler.remove <- uuIDStr}//run Cronfunc (scheduler *TaskScheduler) Start() { //初始化的时候加入一个一年的长定时器,time.Now().Add(time.Hour * 24 * 365).UnixNano(),func() { log.Println("It's a Hour timer!") }) scheduler.tasks = append(scheduler.tasks,task) go scheduler.run()}//stop allfunc (scheduler *TaskScheduler) Stop() { scheduler.stop <- struct{}{}}//run task List//if is empty,run a year timer taskfunc (scheduler *TaskScheduler) run() { for { Now := time.Now() task,key := scheduler.GetTask() runTime := task.GetRunTime() i64 := runTime - Now.UnixNano() var d time.Duration if i64 < 0 { scheduler.tasks[key].SetRuntime(Now.UnixNano()) if task != nil { go task.RunJob() } scheduler.doAndreset(key) continue } else { sec := runTime / int64(time.Second) nsec := runTime % int64(time.Second) d = time.Unix(sec,nsec).Sub(Now) } timer := time.NewTimer(d) //catch a chan and do something for { select { //if time has expired do task and shift key if is task List case <-timer.C: scheduler.doAndreset(key) if task != nil { //fmt.Println(scheduler.tasks[key]) go task.RunJob() timer.Stop() } //if add task case <-scheduler.add: timer.Stop() // remove task with remove uuID case uuIDstr := <-scheduler.remove: scheduler.removeTask(uuIDstr) timer.Stop() //if get a stop single exit case <-scheduler.stop: timer.Stop() return } break } }}//return a task and key In task Listfunc (scheduler *TaskScheduler) GetTask() (task TaskGetInterface,tempKey int) { scheduler.Lock() defer scheduler.UnLock() min := scheduler.tasks[0].GetRunTime() tempKey = 0 for key,task := range scheduler.tasks { tTime := task.GetRunTime() if min <= tTime { continue } if min > tTime { tempKey = key min = tTime continue } } task = scheduler.tasks[tempKey] return task,tempKey}//if add a new task and runtime < Now task runtime// stop Now timer and againfunc (scheduler *TaskScheduler) doAndreset(key int) { scheduler.Lock() defer scheduler.UnLock() //null pointer if key < len(scheduler.tasks) { NowTask := scheduler.tasks[key] scheduler.tasks = append(scheduler.tasks[:key],scheduler.tasks[key+1:]...) if NowTask.GetSpacing() > 0 { tTime := NowTask.GetRunTime() NowTask.SetRuntime(NowTask.GetSpacing() * int64(time.Second) + tTime) number := NowTask.GetRunNumber() if number > 1 { NowTask.SetRunNumber(number - 1) scheduler.tasks = append(scheduler.tasks,NowTask) } else if NowTask.GetEndTime() >= tTime { scheduler.tasks = append(scheduler.tasks,NowTask) } } }}//remove task by uuIDfunc (scheduler *TaskScheduler) removeTask(uuIDStr string) { scheduler.Lock() defer scheduler.UnLock() for key,task := range scheduler.tasks { if task.GetUuID() == uuIDStr { scheduler.tasks = append(scheduler.tasks[:key],scheduler.tasks[key+1:]...) break } }}//lock task []func (scheduler *TaskScheduler) Lock() { scheduler.lock = true}//unlock task []func (scheduler *TaskScheduler) UnLock() { scheduler.lock = false if len(scheduler.swap) > 0 { for _,task := range scheduler.swap { scheduler.tasks = append(scheduler.tasks,task) } scheduler.swap = make([]TaskInterface,0) }}总结
以上是内存溢出为你收集整理的Go语言使用定时器实现任务队列全部内容,希望文章能够帮你解决Go语言使用定时器实现任务队列所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)