在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。
package mainimport ( "fmt" "math/rand" "os" "runtime" "sync" "sync/atomic" "time")type Scenario struct { @R_502_6889@ string Description []string Examples []string RunExample func()}var s1 = &Scenario{ @R_502_6889@: "s1",Description: []string{ "简单并发执行任务",},Examples: []string{ "比如并发的请求后端某个接口",RunExample: RunScenario1,}var s2 = &Scenario{ @R_502_6889@: "s2",Description: []string{ "持续一定时间的高并发模型",Examples: []string{ "在规定时间内,持续的高并发请求后端服务, 防止服务死循环",RunExample: RunScenario2,}var s3 = &Scenario{ @R_502_6889@: "s3",Description: []string{ "基于大数据量的并发任务模型,goroutine worker pool",Examples: []string{ "比如技术支持要给某个客户删除几个TB/GB的文件",RunExample: RunScenario3,}var s4 = &Scenario{ @R_502_6889@: "s4",Description: []string{ "等待异步任务执行结果(goroutine+select+channel)",Examples: []string{ "",RunExample: RunScenario4,}var s5 = &Scenario{ @R_502_6889@: "s5",Description: []string{ "定时的反馈结果(Ticker)",Examples: []string{ "比如测试上传接口的性能,要实时给出指标: 吞吐率,IOPS,成功率等",RunExample: RunScenario5,}var Scenarios []*Scenariofunc init() { Scenarios = append(Scenarios,s1) Scenarios = append(Scenarios,s2) Scenarios = append(Scenarios,s3) Scenarios = append(Scenarios,s4) Scenarios = append(Scenarios,s5)}// 常用的并发与同步场景func main() { if len(os.Args) == 1 { fmt.Println("请选择使用场景 ==> ") for _,sc := range Scenarios { fmt.Printf("场景: %s,",sc.@R_502_6889@) printDescription(sc.Description) } return } for _,arg := range os.Args[1:] { sc := matchScenario(arg) if sc != nil { printDescription(sc.Description) printExamples(sc.Examples) sc.RunExample() } }}func printDescription(str []string) { fmt.Printf("场景描述: %s \n",str)}func printExamples(str []string) { fmt.Printf("场景举例: %s \n",str)}func matchScenario(@R_502_6889@ string) *Scenario { for _,sc := range Scenarios { if sc.@R_502_6889@ == @R_502_6889@ { return sc } } return nil}var doSomething = func(i int) string { time.Sleep(time.Millisecond * time.Duration(10)) fmt.Printf("Goroutine %d do things .... \n",i) return fmt.Sprintf("Goroutine %d",i)}var takeSomthing = func(res string) string { time.Sleep(time.Millisecond * time.Duration(10)) tmp := fmt.Sprintf("Take result from %s.... \n",res) fmt.Println(tmp) return tmp}// 场景1: 简单并发任务func RunScenario1() { count := 10 var wg sync.WaitGroup for i := 0; i < count; i++ { wg.Add(1) go func(index int) { defer wg.Done() doSomething(index) }(i) } wg.Wait()}// 场景2: 按时间来持续并发func RunScenario2() { timeout := time.Now().Add(time.Second * time.Duration(10)) n := runtime.Numcpu() waitForAll := make(chan struct{}) done := make(chan struct{}) concurrentCount := make(chan struct{},n) for i := 0; i < n; i++ { concurrentCount <- struct{}{} } go func() { for time.Now().Before(timeout) { <-done concurrentCount <- struct{}{} } waitForAll <- struct{}{} }() go func() { for { <-concurrentCount go func() { doSomething(rand.Intn(n)) done <- struct{}{} }() } }() <-waitForAll}// 场景3:以 worker pool 方式 并发做事/发送请求func RunScenario3() { numOfConcurrency := runtime.Numcpu() taskTool := 10 jobs := make(chan int,taskTool) results := make(chan int,taskTool) var wg sync.WaitGroup // workExample workExampleFunc := func(ID int,jobs <-chan int,results chan<- int,wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { res := job * 2 fmt.Printf("Worker %d do things,produce result %d \n",ID,res) time.Sleep(time.Millisecond * time.Duration(100)) results <- res } } for i := 0; i < numOfConcurrency; i++ { wg.Add(1) go workExampleFunc(i,jobs,results,&wg) } totalTasks := 100 wg.Add(1) go func() { defer wg.Done() for i := 0; i < totalTasks; i++ { n := <-results fmt.Printf("Got results %d \n",n) } close(results) }() for i := 0; i < totalTasks; i++ { jobs <- i } close(jobs) wg.Wait()}// 场景4: 等待异步任务执行结果(goroutine+select+channel)func RunScenario4() { sth := make(chan string) result := make(chan string) go func() { ID := rand.Intn(100) for { sth <- doSomething(ID) } }() go func() { for { result <- takeSomthing(<-sth) } }() select { case c := <-result: fmt.Printf("Got result %s ",c) case <-time.After(time.Duration(30 * time.Second)): fmt.Errorf("指定时间内都没有得到结果") }}var doUploadMock = func() bool { time.Sleep(time.Millisecond * time.Duration(100)) n := rand.Intn(100) if n > 50 { return true } else { return false }}// 场景5: 定时的反馈结果(Ticker)// 测试上传接口的性能,要实时给出指标: 吞吐率,成功率等func RunScenario5() { totalSize := int64(0) totalCount := int64(0) totalErr := int64(0) concurrencyCount := runtime.Numcpu() stop := make(chan struct{}) fileSizeExample := int64(10) timeout := 10 // seconds to stop go func() { for i := 0; i < concurrencyCount; i++ { go func(index int) { for { select { case <-stop: return default: break } res := doUploadMock() if res { atomic.AddInt64(&totalCount,1) atomic.AddInt64(&totalSize,fileSizeExample) } else { atomic.AddInt64(&totalErr,1) } } }(i) } }() t := time.NewTicker(time.Second) index := 0 for { select { case <-t.C: index++ tmpCount := atomic.LoadInt64(&totalCount) tmpSize := atomic.LoadInt64(&totalSize) tmpErr := atomic.LoadInt64(&totalErr) fmt.Printf("吞吐率: %d,成功率: %d \n",tmpSize/int64(index),tmpCount*100/(tmpCount+tmpErr)) if index > timeout { t.Stop() close(stop) return } } }}总结
以上是内存溢出为你收集整理的golang常见的几种并发模型框架全部内容,希望文章能够帮你解决golang常见的几种并发模型框架所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)