- 多个Goroutine交替执行输出
- 两个协程交替打印1-100的奇偶数
- 使用channel作为信号传递实现
- 使用runtime让协程竞争CPU
- 使用sync的锁竞争实现
- N个线程交替打印0-100
- 没有严格的交替顺序的时候
- 有严格交替顺序的时候
- channel实现线程池
- 参考资料
Go语言的协程goroutine在高并发方面具有优势,最简单的可以写一个简单的生产者消费者模型。
func mode1() {
rand.Seed(time.Now().UnixNano())
wgreceivers := sync.WaitGroup{}
wgreceivers.Add(NumReceiver)
datach := make(chan int)
Max := 100000
NumReceiver := 10
// sender
go func() {
for {
if value := rand.Intn(Max); value == 0 {
close(datach)
return
} else {
datach <- value
}
}
}()
// receiver
for i := 0; i < NumReceiver; i++ {
go func() {
defer wgreceivers.Done()
for value := range datach {
fmt.Println(value)
}
}()
}
wgreceivers.Wait()
}
还有经典的两个线程交替输出的问题,例如:
- 两个线程交替打印 0~100 的奇偶数
- 用两个线程,一个输出字母,一个输出数字,交替输出 1A2B3C4D…26Z
- 通过 N 个线程顺序循环打印从 0 至 100
其实这类就是线程通信相关的问题。Go语言的多线程、并发相关的知识,给我的感觉就是就是用起来容易,控制起来很难,存在很多似是而非的过程。如果想要掌握Go的并发,除了多加练习之外,还需要查看一些优秀的实践(比如channel的优雅关闭、goroutine线程池的实现等等——文末会有相关参考的链接)。
下面会从两个线程交替打印奇偶数开始讲起,会使用channel实现,使用sync的Lock实现,还有使用runtime的Gosched控制CPU实现。进一步拓展到N个线程打印的。
刚学习goroutine的时候可能会这样写:
func withChannel(num int) {
ch := make(chan int)
wg := sync.WaitGroup{}
wg.Add(1)
// sender
go func() {
for i := 1; i <= num; {
fmt.Println("sender:", i)
i++
ch <- i
i++
//time.Sleep(2 * time.Millisecond)
//可以人为制造停顿,努力让结果适配 :)
}
close(ch)
}()
// receiver
go func() {
defer wg.Done()
for i := range ch {
fmt.Println("receiver:", i)
}
}()
wg.Wait()
fmt.Println("stop")
}
// output:
// sender: 1
// sender: 3
// receiver: 2
// receiver: 4
// ...
上面的函数利用channel承载值的传递(并没有作为信号传递)。修改方法可以将channel变为信号传递,控制协程输出的时机:
func withChannel2(num int) {
ch1, ch2 := make(chan struct{}), make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(2)
fmt.Println("start print:")
go func() {
defer wg.Done()
for i := 1; i <= num; i += 2 {
<-ch1
fmt.Println("one: ", i)
ch2 <- struct{}{}
}
<-ch1
// 如果这里没有这一步,会直接死锁
}()
go func() {
defer wg.Done()
// for i := 2; i < num; i += 2 { // 如果在这里没有= ,会直接死锁
for i := 2; i <= num; i += 2 {
<-ch2
fmt.Println("two: ", i)
ch1 <- struct{}{}
}
}()
ch1 <- struct{}{} // 启动的信号
wg.Wait()
fmt.Println("stop the work!")
}
使用runtime让协程竞争CPU
直接使用runtime的Gosched让出CPU,达到线程阻塞。
func withCPU() {
//设置可同时使用的CPU核数为1
var wg sync.WaitGroup
// 定义使用核心的数量的数量
runtime.GOMAXPROCS(1)
wg.Add(2)
go func() {
defer wg.Done()
for i := 1; i < 101; i++ {
//奇数
if i%2 == 1 {
fmt.Println("线程1打印:", i)
}
//让出cpu
runtime.Gosched()
}
}()
go func() {
defer wg.Done()
for i := 1; i < 101; i++ {
// 偶数
if i%2 == 0 {
fmt.Println("线程2打印:", i)
}
// 让出cpu
runtime.Gosched()
}
}()
wg.Wait()
}
使用sync的锁竞争实现
这个部分实现的比较冗杂,因为要显式让线程睡眠,实现交替的效果,使用了sync包的Cond控制条件,signal唤醒线程,这里就存在资源竞争。比较冗余。如果有更好的办法,欢迎和我交流 😃 .
func twoWithmu(num int) {
muc := sync.NewCond(&sync.Mutex{})
// 定义条件cond
// 启动两个工作线程
// 然后需要明确竞争的是一个控制条件control
wg := sync.WaitGroup{}
wg.Add(2)
var control int = 0 // 控制条件一开始没有接入,1会输出奇数,2会输出偶数
var i = 1
go func() {
defer wg.Done()
for i <= num {
muc.L.Lock()
for control != 1 { // 判断情况,选择线程等待,等待的过程中会解锁
muc.Wait()
}
control++
muc.L.Unlock()
fmt.Println("one :", i) // 为了尽快解锁,这个还是放在外面
}
}()
go func() {
defer wg.Done()
for i <= num {
muc.L.Lock()
for control != 2 {
muc.Wait()
}
control--
muc.L.Unlock()
fmt.Println("two :", i)
}
}()
muc.L.Lock()
control++
muc.L.Unlock()
// 控制输出值递进的
for i <= num {
i++
muc.Signal()
// 唤醒睡眠的线程
}
wg.Wait()
fmt.Println("stop")
}
以上的几种解法,channel的信号传递,sync的资源竞争,runtime的CPU竞争。
接下来拓展到N个线程交替执行的。
这个部分,想到一个点,直接用生产者消费者的模式做会比较具有适用性,单纯用channel实现也比较方便。
N个线程交替打印
不知道题目是不是要N个线程按照严格的顺序交替打印,还是只要借助N个线程的并发快速输出,又或者是将范围切分成N份,然后N个线程分别领任务执行。
首先,目前实现的思路就是单纯利用channel,然后使用生产者消费者模型,N个线程作为N个消费者,然后使用一个生产者控制变量循环——单个生产者对应多个消费者。
没有严格的交替顺序的时候// 不在乎调度顺序,只求快速并发到最终的结果--利用生产者消费者模型
// gn: N个线程 num: 0~num
func goWithChanSpawn(gn int, num int) {
datach := make(chan int)
stopch := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(gn)
stop := func() {
close(stopch)
}
// 生产者
go func() {
// wg.Add(1)
for i := 0; i < num; i++ {
datach <- i + 1
}
fmt.Println("sender done !")
stop() // 在发送端关闭chan
}()
// 消费者
for i := 0; i < gn; i++ {
go func(id int) {
defer wg.Done()
for {
select {
case <-stopch:
fmt.Println("id:", id, "stop!")
return
case v := <-datach:
fmt.Println("id:", id, "-", v)
default:
}
}
}(i)
}
wg.Wait()
close(datach)
}
有严格交替顺序的时候
并发场景下的调度比较麻烦,因为执行的不确定性需要多次判断。并且,会因为阻塞的问题,降低效率。这里的代码实现,因为goroutine执行的时候,会传入goroutine的id,通过这个id来控制调度N个线程的输出顺序。这份代码的效率比较低,有改进的建议的,欢迎和我交流 😃.
func gwc2(gn int, num int) {
datach := make(chan int)
closed, closing := make(chan struct{}), make(chan struct{})
signal := make(chan struct{}) // 这个控制是不是需要将输出值+1
wg := sync.WaitGroup{}
wg.Add(gn)
res := 1
// res为输出的变量
// 关闭信号--
stop := func() {
select {
case <-closing:
<-closed
case <-closed:
}
}
// 生产者
go func() {
defer func() {
close(closed)
close(datach)
}()
for res <= num {
select {
case <-closing:
return
case datach <- res:
}
}
fmt.Println("done")
}()
go func() {
for res <= num {
select {
case <-signal:
res++
default:
}
}
stop() // 停止
}()
// 消费者
for i := 0; i < gn; i++ {
go func(id int) {
defer wg.Done()
for v := range datach {
if v%gn == id {// 结合id判断
fmt.Println("id:", id, " -", v)
signal <- struct{}{} // 执行成功了,res就+1
}
}
}(i)
}
wg.Wait()
}
channel实现线程池
使用channel实现线程池,这部分代码来自TonyBai老师(链接见参考资料),代码如下:
package workerpool
import (
"errors"
"fmt"
"sync"
)
var (
ErrNoIdleWorkerInPool = errors.New("no idle worker in pool") // workerpool中任务已满,没有空闲goroutine用于处理新任务
ErrWorkerPoolFreed = errors.New("workerpool freed") // workerpool已终止运行
)
type Pool struct {
capacity int // workerpool大小
active chan struct{}
tasks chan Task
wg sync.WaitGroup
quit chan struct{}
}
type Task func()
const (
defaultCapacity = 100
maxCapacity = 10000
)
func New(capacity int) *Pool {
if capacity <= 0 {
capacity = defaultCapacity
}
if capacity > maxCapacity {
capacity = maxCapacity
}
p := &Pool{
capacity: capacity,
tasks: make(chan Task),
quit: make(chan struct{}),
active: make(chan struct{}, capacity),
}
fmt.Printf("workerpool start\n")
go p.run()
return p
}
func (p *Pool) newWorker(i int) {
p.wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
fmt.Printf("worker[%03d]: recover panic[%s] and exit\n", i, err)
<-p.active
}
p.wg.Done()
}()
fmt.Printf("worker[%03d]: start\n", i)
for {
select {
case <-p.quit:
fmt.Printf("worker[%03d]: exit\n", i)
<-p.active
return
case t := <-p.tasks:
fmt.Printf("worker[%03d]: receive a task\n", i)
t()
}
}
}()
}
func (p *Pool) run() {
idx := 0
for {
select {
case <-p.quit:
return
case p.active <- struct{}{}:
// create a new worker
idx++
p.newWorker(idx)
}
}
}
func (p *Pool) Schedule(t Task) error {
select {
case <-p.quit:
return ErrWorkerPoolFreed
case p.tasks <- t:
return nil
}
}
func (p *Pool) Free() {
close(p.quit) // make sure all worker and p.run exit and schedule return error
p.wg.Wait()
fmt.Printf("workerpool freed\n")
}
参考资料
两个Goroutine交替执行:https://blog.csdn.net/xingyu97/article/details/118116971
多个线程打印的问题:https://zhuanlan.zhihu.com/p/270171164
优雅关闭Go channel:https://juejin.cn/post/6947985246562287647#heading-2
channel实现线程池:https://github.com/bigwhite/publication/tree/master/column/timegeek/go-first-course/35
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)