将要并发执行的任务包装成一个函数,调用函数的时候前面加上go
关键字,就能够开启一个goroutine去执行该函数的任务
goroutine对应的函数执行完,该goroutine就结束了。
程序启动的时候就会自动创建一个goroutine去执行main函数
main函数结束了,那么程序也就结束了,由该程序启动的所有其他goroutine也都结束了。
goroutine的本质goroutine的调度模型:GMP
M:N
:把m个goroutine分配给n个 *** 作系统线程
goroutine是用户态的线程,比内核态的线程更轻量级一点。初始时只占用2KB的栈空间,可以轻松开启数十万的goroutine也不会崩内存
runtime.GOMAXPROCSGo1.5之后默认就是 *** 作系统的逻辑核心数,默认跑满cpu
runtime.GOMAXPROCS(1)
:只占用一个核。多用于日志监控等轻量级程序
开启一定数量的goroutine
package mainimport ( "fmt" "sync" "time")// worker poolvar wg sync.WaitGroupvar notice = make(chan struct{}, 5)func worker(ID int, jobs <-chan int, results chan<- int) { defer wg.Done() for j := range jobs { fmt.Printf("worker:%d start job:%d\n", ID, j) time.Sleep(time.Second) fmt.Printf("worker:%d end job:%d\n", ID, j) results <- j * 2 notice <- struct{}{} // 通知 }}func main() { jobs := make(chan int, 100) results := make(chan int, 100) // 5个任务 go func() { for j := 0; j < 5; j++ { jobs <- j } close(jobs) }() // 开启3个goroutine wg.Add(3) for w := 0; w < 3; w++ { go worker(w, jobs, results) } go func() { for i := 0; i < 5; i++ { <-notice } close(results) }() // 输出结果 for x := range results { fmt.Println(x) } // for a := 1; a < 5; a++ { // <-results // }}
sync.WaitGroupvar wg sync.WaitGroup
通过channel实现多个goroutine之间的通信
CSP
通过通信来共享内存
channel是一种类型,一种引用类型。make函数初始化之后才能使用.(slice,map,channel)
channel的声明:var ch chan 元素类型
ch = make(chan 元素类型, [缓冲区大小])
ch <- 100
接收:x:= <- ch
关闭:close(ch)
带缓冲区的通道和无缓冲区的通道:快递员送快递的示例,有缓冲区就是有快递柜
for { x, ok := <-ch // 再取阻塞 if !ok{ // 什么时候ok=false? ch通道被关闭的时候 break } fmt.Println(x, ok) time.Sleep(time.Second) }for x := range ch { fmt.Println(x) }
单向通道:通常是用作函数的参数,只读通道<- chan
和只写通道chan <- int
同一时刻有多个通道要 *** 作的场景下,使用select。
使用select
语句能提高代码的可读性。
case
同时满足,select
会随机选择一个。对于没有case
的select{}
会一直等待,可用于阻塞main函数。今日内容同步锁互斥锁package mainimport ( "fmt" "sync")// 锁var x = 0var wg sync.WaitGroupvar lock sync.Mutexfunc add() { defer wg.Done() for i := 0; i < 50000; i++ { lock.Lock() x++ lock.Unlock() }}func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x)}
读写互斥锁package mainimport ( "fmt" "sync" "time")// 读写互斥锁rwlockvar lock sync.Mutexvar rwlock sync.RWMutexvar wg sync.WaitGroupvar x = 0func read() { defer wg.Done() rwlock.RLock() fmt.Println(x) time.Sleep(time.Millisecond) rwlock.RUnlock()}func write() { defer wg.Done() rwlock.RLock() x++ time.Sleep(5 * time.Millisecond) rwlock.RUnlock()}func main() { start := time.Now() for i := 0; i < 10; i++ { wg.Add(1) go write() } for i := 0; i < 1000; i++ { wg.Add(1) go read() } wg.Wait() fmt.Println(time.Since(start))}
sync包sync.oncepackage mainimport ( "fmt" "sync")// sync.oncevar wg sync.WaitGroupvar once sync.Oncefunc f1(ch1 chan<- int) { defer wg.Done() for i := 0; i < 100; i++ { ch1 <- i } close(ch1)}func f2(ch1 <-chan int, ch2 chan<- int) { defer wg.Done() for { x, ok := <-ch1 if !ok { break } ch2 <- x * x } once.Do(func() { close(ch2) }) // 确保某个 *** 作只执行一次}func main() { a := make(chan int, 100) b := make(chan int, 100) wg.Add(3) go f1(a) go f2(a, b) go f2(a, b) wg.Wait() for ret := range b { fmt.Println(ret) }}
sync.map版本1:慢
package mainimport ( "fmt" "strconv" "sync")// sync.map// Go内置的map不是并发安全的var ( m = make(map[string]int) lock sync.Mutex)func get(key string) int { return m[key]}func set(key string, value int) { m[key] = value}func main() { wg := sync.WaitGroup{} for i := 0; i < 21; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) lock.Lock() set(key, n) lock.Unlock() fmt.Printf("k=:%v,v:=%v\n", key, get(key)) wg.Done() }(i) } wg.Wait()}
版本2:快
func main() { m2 := sync.Map{} for i := 0; i < 25; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) m2.Store(key, n) // 必须使用sync.map内置的store去存值 value, _ := m2.Load(key) // 必须使用sync.map内置的load方法根据key去取值 fmt.Printf("k=:%v,v:=%v\n", key, value) wg.Done() }(i) } wg.Wait()}
atomic包(原子性)package mainimport ( "fmt" "sync" "sync/atomic")// 原子 *** 作var x int64var wg sync.WaitGroupvar lock sync.Mutex// func add() {// defer wg.Done()// lock.Lock()// x++// lock.Unlock()// }func add1() { atomic.AddInt64(&x, 1) wg.Done()}func main() { // wg.Add(100000) // for i := 0; i < 100000; i++ { // go add1() // } // wg.Wait() // fmt.Println(x) // 比较并交换 x = 100 ok := atomic.CompareAndSwAPInt64(&x, 100, 300) // x是否等于100,若是则改为300,不是则返回false fmt.Println(ok, x)}
网络编程server端package mainimport ( "fmt" "net" "strings")// tcp server端func processConn(conn net.Conn) { defer conn.Close() // 3.与客户端通信 var tmp [128]byte for { n, err := conn.Read(tmp[:]) if err != nil { fmt.Printf("read Failed, err:%v\n", err) return } msg := string(tmp[:n]) fmt.Println(msg) reback := strings.toupper(msg) conn.Write([]byte(reback)) }}func main() { // 1.本地端口启动服务 Listener, err := net.Listen("tcp", "localhost:9000") if err != nil { fmt.Printf("start Failed, err:%v\n", err) return } defer Listener.Close() // 2.等待别人来连接 for { conn, err := Listener.Accept() if err != nil { fmt.Printf("accept Failed, err:%v\n", err) return } go processConn(conn) }}
clIEnt端
package mainimport ( "bufio" "fmt" "net" "os")// tcp clIEntfunc main() { // 1.与server端建立链接 conn, err := net.Dial("tcp", "localhost:9000") if err != nil { fmt.Printf("dial Failed, err:%v\n", err) return } // 2.发送数据 // var msg string var tmp [128]byte reader := bufio.NewReader(os.Stdin) for { fmt.Print("发送到服务端") // fmt.Scan(&msg) // 有空格会有点小问题 text, _ := reader.ReadString('\n') // 读到换行 if text == "quit" { break } conn.Write([]byte(text)) n, err := conn.Read(tmp[:]) if err != nil { fmt.Printf("read Failed, err:%v\n", err) return } fmt.Println(string(tmp[:n])) }}
TCP粘包大端和小端
解决粘包问题
// socket_stick/proto/proto.gopackage protoimport ( "bufio" "bytes" "enCoding/binary")// Encode 将消息编码func Encode(message string) ([]byte, error) { // 读取消息的长度,转换成int32类型(占4个字节) var length = int32(len(message)) var pkg = new(bytes.Buffer) // 写入消息头 err := binary.Write(pkg, binary.littleEndian, length) if err != nil { return nil, err } // 写入消息实体 err = binary.Write(pkg, binary.littleEndian, []byte(message)) if err != nil { return nil, err } return pkg.Bytes(), nil}// Decode 解码消息func Decode(reader *bufio.Reader) (string, error) { // 读取消息的长度 lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据 lengthBuff := bytes.NewBuffer(lengthByte) var length int32 err := binary.Read(lengthBuff, binary.littleEndian, &length) if err != nil { return "", err } // Buffered返回缓冲中现有的可读取的字节数。 if int32(reader.Buffered()) < length+4 { return "", err } // 读取真正的消息数据 pack := make([]byte, int(4+length)) _, err = reader.Read(pack) if err != nil { return "", err } return string(pack[4:]), nil}
server服务端
package mainimport ( "bufio" "fmt" "io" "net" proto "go.study.com/hina/day01/day09/04nianbao/protocol")// socket_stick/server/main.gofunc process(conn net.Conn) { defer conn.Close() reader := bufio.NewReader(conn) for { recvStr, err := proto.Decode(reader) // n, err := reader.Read(buf[:]) if err == io.EOF { break } if err != nil { fmt.Println("read from clIEnt Failed, err:", err) break } fmt.Println("收到clIEnt发来的数据:", recvStr) }}func main() { Listen, err := net.Listen("tcp", "127.0.0.1:30000") if err != nil { fmt.Println("Listen Failed, err:", err) return } defer Listen.Close() for { conn, err := Listen.Accept() if err != nil { fmt.Println("accept Failed, err:", err) continue } go process(conn) }}
clIEnt客户端
package mainimport ( "fmt" "net" proto "go.study.com/hina/day01/day09/04nianbao/protocol")// socket_stick/clIEnt/main.gofunc main() { conn, err := net.Dial("tcp", "127.0.0.1:30000") if err != nil { fmt.Println("dial Failed, err", err) return } defer conn.Close() for i := 0; i < 20; i++ { msg := `Hello, Hello. How are you?` // 调用协议编码数据 b, _ := proto.Encode(msg) conn.Write([]byte(b)) }}
udp服务端server
package mainimport ( "fmt" "net" "strings")// UDP serverfunc main() { conn, err := net.ListenUDP("udp", &net.UDPAddr{ IP: net.IPv4(127, 0, 0, 1), Port: 9000, }) if err != nil { fmt.Printf("conn Failed, err:%v\n", err) return } defer conn.Close() // 不需要建立链接,直接发数据 var data [1024]byte for { n, addr, err := conn.ReadFromUDP(data[:]) if err != nil { fmt.Printf("read Failed, err:%v\n", err) return } fmt.Println(data[:n]) reply := strings.toupper(string(data[:n])) // 发送数据 conn.WritetoUDP([]byte(reply), addr) }}
客户端clIEnt
package mainimport ( "bufio" "fmt" "net" "os")// UDP clIEntfunc main() { socket, err := net.DialUDP("udp", nil, &net.UDPAddr{ IP: net.IPv4(127, 0, 0, 1), Port: 9000, }) if err != nil { fmt.Printf("socket Failed, err:%v\n", err) return } defer socket.Close() var reply [1024]byte reader := bufio.NewReader(os.Stdin) for { fmt.Print("发送到服务端:") msg, _ := reader.ReadString('\n') socket.Write([]byte(msg)) // 收回复的数据 n, _, err := socket.ReadFromUDP(reply[:]) if err != nil { fmt.Printf("read Failed, err:%v\n", err) return } fmt.Println("收到回复信息:", string(reply[:n])) }}
总结 以上是内存溢出为你收集整理的Go 09锁、sync、网络编程全部内容,希望文章能够帮你解决Go 09锁、sync、网络编程所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)