golang常见的几种并发模型框架

golang常见的几种并发模型框架,第1张

概述在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。 package mainimport ( "fmt" "math/rand" "os" "runtime" "sync" "sync/atomic" "time")type Scenario struct { Name string Description []string Examp

在golang中,经常使用协程做高并发,本文列举了几种常见并发模型。

package mainimport (	"fmt"	"math/rand"	"os"	"runtime"	"sync"	"sync/atomic"	"time")type Scenario struct {	@R_502_6889@        string	Description []string	Examples    []string	RunExample  func()}var s1 = &Scenario{	@R_502_6889@: "s1",Description: []string{		"简单并发执行任务",},Examples: []string{		"比如并发的请求后端某个接口",RunExample: RunScenario1,}var s2 = &Scenario{	@R_502_6889@: "s2",Description: []string{		"持续一定时间的高并发模型",Examples: []string{		"在规定时间内,持续的高并发请求后端服务, 防止服务死循环",RunExample: RunScenario2,}var s3 = &Scenario{	@R_502_6889@: "s3",Description: []string{		"基于大数据量的并发任务模型,goroutine worker pool",Examples: []string{		"比如技术支持要给某个客户删除几个TB/GB的文件",RunExample: RunScenario3,}var s4 = &Scenario{	@R_502_6889@: "s4",Description: []string{		"等待异步任务执行结果(goroutine+select+channel)",Examples: []string{		"",RunExample: RunScenario4,}var s5 = &Scenario{	@R_502_6889@: "s5",Description: []string{		"定时的反馈结果(Ticker)",Examples: []string{		"比如测试上传接口的性能,要实时给出指标: 吞吐率,IOPS,成功率等",RunExample: RunScenario5,}var Scenarios []*Scenariofunc init() {	Scenarios = append(Scenarios,s1)	Scenarios = append(Scenarios,s2)	Scenarios = append(Scenarios,s3)	Scenarios = append(Scenarios,s4)	Scenarios = append(Scenarios,s5)}// 常用的并发与同步场景func main() {	if len(os.Args) == 1 {		fmt.Println("请选择使用场景 ==> ")		for _,sc := range Scenarios {			fmt.Printf("场景: %s,",sc.@R_502_6889@)			printDescription(sc.Description)		}		return	}	for _,arg := range os.Args[1:] {		sc := matchScenario(arg)		if sc != nil {			printDescription(sc.Description)			printExamples(sc.Examples)			sc.RunExample()		}	}}func printDescription(str []string) {	fmt.Printf("场景描述: %s \n",str)}func printExamples(str []string) {	fmt.Printf("场景举例: %s \n",str)}func matchScenario(@R_502_6889@ string) *Scenario {	for _,sc := range Scenarios {		if sc.@R_502_6889@ == @R_502_6889@ {			return sc		}	}	return nil}var doSomething = func(i int) string {	time.Sleep(time.Millisecond * time.Duration(10))	fmt.Printf("Goroutine %d do things .... \n",i)	return fmt.Sprintf("Goroutine %d",i)}var takeSomthing = func(res string) string {	time.Sleep(time.Millisecond * time.Duration(10))	tmp := fmt.Sprintf("Take result from %s.... \n",res)	fmt.Println(tmp)	return tmp}// 场景1: 简单并发任务func RunScenario1() {	count := 10	var wg sync.WaitGroup	for i := 0; i < count; i++ {		wg.Add(1)		go func(index int) {			defer wg.Done()			doSomething(index)		}(i)	}	wg.Wait()}// 场景2: 按时间来持续并发func RunScenario2() {	timeout := time.Now().Add(time.Second * time.Duration(10))	n := runtime.Numcpu()	waitForAll := make(chan struct{})	done := make(chan struct{})	concurrentCount := make(chan struct{},n)	for i := 0; i < n; i++ {		concurrentCount <- struct{}{}	}	go func() {		for time.Now().Before(timeout) {			<-done			concurrentCount <- struct{}{}		}		waitForAll <- struct{}{}	}()	go func() {		for {			<-concurrentCount			go func() {				doSomething(rand.Intn(n))				done <- struct{}{}			}()		}	}()	<-waitForAll}// 场景3:以 worker pool 方式 并发做事/发送请求func RunScenario3() {	numOfConcurrency := runtime.Numcpu()	taskTool := 10	jobs := make(chan int,taskTool)	results := make(chan int,taskTool)	var wg sync.WaitGroup	// workExample	workExampleFunc := func(ID int,jobs <-chan int,results chan<- int,wg *sync.WaitGroup) {		defer wg.Done()		for job := range jobs {			res := job * 2			fmt.Printf("Worker %d do things,produce result %d \n",ID,res)			time.Sleep(time.Millisecond * time.Duration(100))			results <- res		}	}	for i := 0; i < numOfConcurrency; i++ {		wg.Add(1)		go workExampleFunc(i,jobs,results,&wg)	}	totalTasks := 100	wg.Add(1)	go func() {		defer wg.Done()		for i := 0; i < totalTasks; i++ {			n := <-results			fmt.Printf("Got results %d \n",n)		}		close(results)	}()	for i := 0; i < totalTasks; i++ {		jobs <- i	}	close(jobs)	wg.Wait()}// 场景4: 等待异步任务执行结果(goroutine+select+channel)func RunScenario4() {	sth := make(chan string)	result := make(chan string)	go func() {		ID := rand.Intn(100)		for {			sth <- doSomething(ID)		}	}()	go func() {		for {			result <- takeSomthing(<-sth)		}	}()	select {	case c := <-result:		fmt.Printf("Got result %s ",c)	case <-time.After(time.Duration(30 * time.Second)):		fmt.Errorf("指定时间内都没有得到结果")	}}var doUploadMock = func() bool {	time.Sleep(time.Millisecond * time.Duration(100))	n := rand.Intn(100)	if n > 50 {		return true	} else {		return false	}}// 场景5: 定时的反馈结果(Ticker)// 测试上传接口的性能,要实时给出指标: 吞吐率,成功率等func RunScenario5() {	totalSize := int64(0)	totalCount := int64(0)	totalErr := int64(0)	concurrencyCount := runtime.Numcpu()	stop := make(chan struct{})	fileSizeExample := int64(10)	timeout := 10 // seconds to stop	go func() {		for i := 0; i < concurrencyCount; i++ {			go func(index int) {				for {					select {					case <-stop:						return					default:						break					}					res := doUploadMock()					if res {						atomic.AddInt64(&totalCount,1)						atomic.AddInt64(&totalSize,fileSizeExample)					} else {						atomic.AddInt64(&totalErr,1)					}				}			}(i)		}	}()	t := time.NewTicker(time.Second)	index := 0	for {		select {		case <-t.C:			index++			tmpCount := atomic.LoadInt64(&totalCount)			tmpSize := atomic.LoadInt64(&totalSize)			tmpErr := atomic.LoadInt64(&totalErr)			fmt.Printf("吞吐率: %d,成功率: %d \n",tmpSize/int64(index),tmpCount*100/(tmpCount+tmpErr))			if index > timeout {				t.Stop()				close(stop)				return			}		}	}}
总结

以上是内存溢出为你收集整理的golang常见的几种并发模型框架全部内容,希望文章能够帮你解决golang常见的几种并发模型框架所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1275535.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-09
下一篇 2022-06-09

发表评论

登录后才能评论

评论列表(0条)

保存