Golang同步和异步执行

Golang同步和异步执行,第1张

同步适合多个连续执行的,每一步的执行依赖于上一步 *** 作,异步执行则和任务执行顺序无关(如从10个站点抓取数据)

同步执行类RunnerAsync 支持返回超时检测,系统中断检测

错误常量定义,task/err.go

package task
 
import "errors"
 
 
//超时错误
var ErrTimeout = errors.New("received timeout")
 
// *** 作系统系统中断错误
var ErrInterrupt = errors.New("received interrupt")

实现代码如下,task/runner_async.go

package task
 
import (
    "os"
    "os/signal"
    "time"
)
 
//同步执行任务
type RunnerAsync struct {
    // *** 作系统的信号检测
    interrupt chan os.Signal
 
    //记录执行完成的状态
    complete chan error
 
    //超时检测
    timeout <-chan time.Time
 
    //保存所有要执行的任务,顺序执行
    tasks []func(id int)
}
 
//new一个RunnerAsync对象
func NewRunnerAsync(d time.Duration) *RunnerAsync {
    return &RunnerAsync{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
    }
}
 
//添加一个任务
func (this *RunnerAsync) Add(tasks ...func(id int)) {
    this.tasks = append(this.tasks, tasks...)
}
 
//启动RunnerAsync,监听错误信息
func (this *RunnerAsync) Start() error {
 
    //接收操作系统信号
    signal.Notify(this.interrupt, os.Interrupt)
 
    //执行任务
    go func() {
        this.complete <- this.Run()
    }()
 
    select {
    //返回执行结果
    case err := <-this.complete:
        return err
 
        //超时返回
    case <-this.timeout:
        return ErrTimeout
    }
}
 
//顺序执行所有的任务
func (this *RunnerAsync) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }
        //执行任务
        task(id)
    }
    return nil
}
 
//判断是否接收到操作系统中断信号
func (this *RunnerAsync) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //停止接收别的信号
        signal.Stop(this.interrupt)
        return true
        //正常执行
    default:
        return false
    }
}

使用方法    

Add添加一个任务,任务为接收int类型的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C *** 作)

测试代码

task/runner_async_test.go

package task
 
import (
    "fmt"
    "os"
    "runtime"
    "testing"
    "time"
)
 
func TestRunnerAsync_Start(t *testing.T) {
 
    //开启多核
    runtime.GOMAXPROCS(runtime.NumCPU())
 
    //创建runner对象,设置超时时间
    runner := NewRunnerAsync(8 * time.Second)
    //添加运行的任务
    runner.Add(
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
    )
 
    fmt.Println("同步执行任务")
 
    //开始执行任务
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("执行超时")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任务被中断")
            os.Exit(2)
        }
    }
 
    t.Log("执行结束")
 
}
 
//创建要执行的任务
func createTaskAsync() func(id int) {
    return func(id int) {
        fmt.Printf("正在执行%v个任务\n", id)
        //模拟任务执行,sleep两秒
        //time.Sleep(1 * time.Second)
    }
}

执行结果

同步执行任务
正在执行0个任务
正在执行1个任务
正在执行2个任务
正在执行3个任务
正在执行4个任务
正在执行5个任务
正在执行6个任务
正在执行7个任务
正在执行8个任务
正在执行9个任务
正在执行10个任务
正在执行11个任务
正在执行12个任务
异步执行类Runner 支持返回超时检测,系统中断检测

实现代码如下,task/runner.go

package task
 
import (
    "os"
    "time"
    "os/signal"
    "sync"
)
 
//异步执行任务
type Runner struct {
    // *** 作系统的信号检测
    interrupt chan os.Signal
 
    //记录执行完成的状态
    complete chan error
 
    //超时检测
    timeout <-chan time.Time
 
    //保存所有要执行的任务,顺序执行
    tasks []func(id int) error
 
    waitGroup sync.WaitGroup
 
    lock sync.Mutex
 
    errs []error
}
 
//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
        waitGroup: sync.WaitGroup{},
        lock:      sync.Mutex{},
    }
}
 
//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
    this.tasks = append(this.tasks, tasks...)
}
 
//启动Runner,监听错误信息
func (this *Runner) Start() error {
 
    //接收操作系统信号
    signal.Notify(this.interrupt, os.Interrupt)
 
    //并发执行任务
    go func() {
        this.complete <- this.Run()
    }()
 
    select {
    //返回执行结果
    case err := <-this.complete:
        return err
        //超时返回
    case <-this.timeout:
        return ErrTimeout
    }
}
 
//异步执行所有的任务
func (this *Runner) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }
 
        this.waitGroup.Add(1)
        go func(id int) {
            this.lock.Lock()
 
            //执行任务
            err := task(id)
            //加锁保存到结果集中
            this.errs = append(this.errs, err)
 
            this.lock.Unlock()
            this.waitGroup.Done()
        }(id)
    }
    this.waitGroup.Wait()
 
    return nil
}
 
//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //停止接收别的信号
        signal.Stop(this.interrupt)
        return true
        //正常执行
    default:
        return false
    }
}
 
//获取执行完的error
func (this *Runner) GetErrs() []error {
    return this.errs
}

使用方法    

Add添加一个任务,任务为接收int类型,返回类型error的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C *** 作)

getErrs获取所有的任务执行结果

测试示例代码

task/runner_test.go

package task
 
import (
    "testing"
    "time"
    "fmt"
    "os"
    "runtime"
)
 
func TestRunner_Start(t *testing.T) {
    //开启多核心
    runtime.GOMAXPROCS(runtime.NumCPU())
 
    //创建runner对象,设置超时时间
    runner := NewRunner(18 * time.Second)
    //添加运行的任务
    runner.Add(
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
    )
 
    fmt.Println("异步执行任务")
 
    //开始执行任务
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("执行超时")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任务被中断")
            os.Exit(2)
        }
    }
 
    t.Log("执行结束")
 
    t.Log(runner.GetErrs())
 
}
 
//创建要执行的任务
func createTask() func(id int) error {
    return func(id int) error {
        fmt.Printf("正在执行%v个任务\n", id)
        //模拟任务执行,sleep
        //time.Sleep(1 * time.Second)
        return nil
    }
}

执行结果

异步执行任务
正在执行2个任务
正在执行1个任务
正在执行4个任务
正在执行3个任务
正在执行6个任务
正在执行5个任务
正在执行9个任务
正在执行7个任务
正在执行10个任务
正在执行13个任务
正在执行8个任务
正在执行11个任务
正在执行12个任务
正在执行0个任务 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/995941.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存