注意
:
Go
语言中的goroutine
虽然相对于系统线程来说比较轻量级(初始栈大小仅2KB
),但是在高并发量下的goroutine
频繁创建和销毁对于性能损耗以及GC
来说压力也不小
package main
import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
)
var (
// ErrInvalidPoolCap return if pool size <= 0
ErrInvalidPoolCap = errors.New("invalid pool cap")
// ErrPoolAlreadyClosed put task but pool already closed
ErrPoolAlreadyClosed = errors.New("pool already closed")
)
const (
// RUNNING pool is running
RUNNING = 1
// STOPED pool is stoped
STOPED = 0
)
// Task task to-do
type Task struct {
Handler func(v ...interface{})
Params []interface{}
}
// Pool task pool
type Pool struct {
capacity uint64
runningWorkers uint64
state int64
taskC chan *Task
PanicHandler func(interface{})
sync.Mutex
}
// NewPool init pool
func NewPool(capacity uint64) (*Pool, error) {
if capacity <= 0 {
return nil, ErrInvalidPoolCap
}
return &Pool{
capacity: capacity,
state: RUNNING,
taskC: make(chan *Task, capacity),
}, nil
}
// GetCap get capacity
func (p *Pool) GetCap() uint64 {
return p.capacity
}
// GetRunningWorkers get running workers
func (p *Pool) GetRunningWorkers() uint64 {
return atomic.LoadUint64(&p.runningWorkers)
}
func (p *Pool) incRunning() {
atomic.AddUint64(&p.runningWorkers, 1)
}
func (p *Pool) decRunning() {
atomic.AddUint64(&p.runningWorkers, ^uint64(0))
}
// Put put a task to pool
func (p *Pool) Put(task *Task) error {
if p.getState() == STOPED {
return ErrPoolAlreadyClosed
}
// safe run worker
p.Lock()
if p.GetRunningWorkers() < p.GetCap() {
p.run()
}
p.Unlock()
// send task safe
p.Lock()
if p.state == RUNNING {
p.taskC <- task
}
p.Unlock()
return nil
}
func (p *Pool) run() {
p.incRunning()
go func() {
defer func() {
p.decRunning()
if r := recover(); r != nil {
if p.PanicHandler != nil {
p.PanicHandler(r)
} else {
log.Printf("Worker panic: %s\n", r)
}
}
}()
for {
select {
case task, ok := <-p.taskC:
if !ok {
return
}
task.Handler(task.Params...)
}
}
}()
}
func (p *Pool) getState() int64 {
p.Lock()
defer p.Unlock()
return p.state
}
func (p *Pool) setState(state int64) {
p.Lock()
defer p.Unlock()
p.state = state
}
// close safe
func (p *Pool) close() {
p.Lock()
defer p.Unlock()
close(p.taskC)
}
// Close close pool graceful
func (p *Pool) Close() {
if p.getState() == STOPED {
return
}
p.setState(STOPED) // stop put task
for len(p.taskC) > 0 { // wait all task be consumed
time.Sleep(1e6) // reduce CPU load
}
p.close()
}
//https://github.com/wazsmwazsm/mortar
func main() {
// 创建容量为 10 的任务池
pool, err := NewPool(10)
if err != nil {
panic(err)
}
wg := new(sync.WaitGroup)
for i := 0; i < 1000; i++ {
wg.Add(1)
// 创建任务
task := &Task{
Handler: func(v ...interface{}) {
wg.Done()
fmt.Println(v)
},
}
// 添加任务函数的参数
task.Params = []interface{}{i, i * 2, "hello"}
// 将任务放入任务池
pool.Put(task)
}
wg.Add(1)
// 再创建一个任务
pool.Put(&Task{
Handler: func(v ...interface{}) {
wg.Done()
fmt.Println(v)
},
Params: []interface{}{"hi!"}, // 也可以在创建任务时设置参数
})
wg.Wait()
// 安全关闭任务池(保证已加入池中的任务被消费完)
pool.Close()
// 如果任务池已经关闭, Put() 方法会返回 ErrPoolAlreadyClosed 错误
err = pool.Put(&Task{
Handler: func(v ...interface{}) {},
})
if err != nil {
fmt.Println(err) // print: pool already closed
}
}
package main
import (
"fmt"
"time"
)
// 任务的属性应该是一个业务函数
type Task struct {
f func() error // 函数名f, 无参,返回值为error
}
// 创建Task任务
func NewTask(arg_f func() error) *Task {
task := Task{
f: arg_f,
}
return &task
}
// Task绑定业务方法
func (task *Task) Execute() {
task.f() // 调用任务中已经绑定好的业务方法
}
// ------------------------------------------------
type Pool struct {
EntryChannel chan *Task // 对外的Task入口
JobsChannel chan *Task // 内部的Task队列
workerNum int // 协程池中最大的woker数量
}
// 创建Pool
func NewPool(cap int) *Pool {
pool := Pool{
EntryChannel: make(chan *Task),
JobsChannel: make(chan *Task),
workerNum: cap,
}
return &pool
}
// Pool绑定干活的方法
func (pool *Pool) worker(workID int) {
// worker工作 : 永久从JobsChannel取任务 然后执行任务
for task := range pool.JobsChannel {
task.Execute()
fmt.Println("work ID ", workID, " has executed")
}
}
// Pool绑定协程池工作方法
func (pool *Pool) run() {
// 定义worker数量
for i := 0; i < pool.workerNum; i++ {
go pool.worker(i)
}
// 从EntryChannel去任务,发送给JobsChannel
for task := range pool.EntryChannel {
pool.JobsChannel <- task // 添加task优先级排序逻辑
}
}
// ------------------------------------------------
func main() {
// 创建一些任务
task := NewTask(func() error { // 匿名函数
fmt.Println(time.Now())
return nil
})
// 创建协程池
pool := NewPool(4)
// 创建多任务,抛给协程池
go func() { // 开启新的协程,防止阻塞
for {
pool.EntryChannel <- task
}
}()
// 启动协程池
pool.run()
}
package main
import (
"io/ioutil"
"net/http"
"runtime"
"github.com/Jeffail/tunny"
)
func main() {
numCPUs := runtime.NumCPU()
pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var result []byte
// TODO: Something CPU heavy with payload
return result
})
defer pool.Close()
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
input, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Internal error", http.StatusInternalServerError)
}
defer r.Body.Close()
// Funnel this work into our pool. This call is synchronous and will
// block until the job is completed.
result := pool.Process(input)
w.Write(result.([]byte))
})
http.ListenAndServe(":8080", nil)
}
package main
import (
"fmt"
"sync"
"time"
)
// 每个协程都会运行该函数。
// 注意,WaitGroup 必须通过指针传递给函数。
func worker(id int, wg *sync.WaitGroup) {
fmt.Printf("Worker %d starting\n", id)
// 睡眠一秒钟,以此来模拟耗时的任务。
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
// 通知 WaitGroup ,当前协程的工作已经完成。
wg.Done()
}
func main() {
// 这个 WaitGroup 被用于等待该函数开启的所有协程。
var wg sync.WaitGroup
// 开启几个协程,并为其递增 WaitGroup 的计数器。
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
// 阻塞,直到 WaitGroup 计数器恢复为 0,即所有协程的工作都已经完成。
wg.Wait()
}
总结
:
1、限制并发的goroutine
数量;
2、复用goroutine
,减轻runtime
调度压力,提升程序性能;
3、规避过多的goroutine
侵占系统资源(CPU&内存
)。
线程包含
:内核级线程模型、系统级线程模型、用户级线程模型、混合型线程模型。
package main
import (
"fmt"
"time"
)
// 这里是 worker,我们将并发执行多个 worker。
// worker 将从 `jobs` 通道接收任务,并且通过 `results` 发送对应的结果。
// 我们将让每个任务间隔 1s 来模仿一个耗时的任务。
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "processing job", j)
time.Sleep(time.Second)
results <- j * 2
}
}
func main() {
// 为了使用 worker 线程池并且收集他们的结果,我们需要 2 个通道。
jobs := make(chan int, 100)
results := make(chan int, 100)
// 这里启动了 3 个 worker,初始是阻塞的,因为还没有传递任务。
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 这里我们发送 9 个 `jobs`,然后 `close` 这些通道
// 来表示这些就是所有的任务了。
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
// 最后,我们收集所有这些任务的返回值。
for a := 1; a <= 9; a++ {
<-results
}
}
三、go连接池
package main
import (
"context"
"fmt"
"strconv"
"sync/atomic"
pool "github.com/jolestar/go-commons-pool"
)
func Example_simple() {
type myPoolObject struct {
s string
}
v := uint64(0)
factory := pool.NewPooledObjectFactorySimple(
func(context.Context) (interface{}, error) {
return &myPoolObject{
s: strconv.FormatUint(atomic.AddUint64(&v, 1), 10),
},
nil
})
ctx := context.Background()
p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
obj, err := p.BorrowObject(ctx)
if err != nil {
panic(err)
}
o := obj.(*myPoolObject)
fmt.Println(o.s)
err = p.ReturnObject(ctx, obj)
if err != nil {
panic(err)
}
// Output: 1
}
参考:github.com/jolestar/go-commons-pool
参考:github.com/Jeffail/tunny
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)