Go 09锁、sync、网络编程

Go 09锁、sync、网络编程,第1张

概述Go9并发之goroutine并发和并行的区别goroutine的启动将要并发执行的任务包装成一个函数,调用函数的时候前面加上go关键字,就能够开启一个goroutine去执行该函数的任务goroutine对应的函数执行完,该goroutine就结束了。程序启动的时候就会自动创建一个goroutine去执行main函数 Go 9并发之goroutine并发和并行的区别goroutine的启动

将要并发执行的任务包装成一个函数,调用函数的时候前面加上go关键字,就能够开启一个goroutine去执行该函数的任务

goroutine对应的函数执行完,该goroutine就结束了。

程序启动的时候就会自动创建一个goroutine去执行main函数

main函数结束了,那么程序也就结束了,由该程序启动的所有其他goroutine也都结束了。

goroutine的本质

goroutine的调度模型:GMP

M:N:把m个goroutine分配给n个 *** 作系统线程

goroutine与 *** 作系统线程(os线程)的区别

goroutine是用户态的线程,比内核态的线程更轻量级一点。初始时只占用2KB的栈空间,可以轻松开启数十万的goroutine也不会崩内存

runtime.GOMAXPROCS

Go1.5之后默认就是 *** 作系统的逻辑核心数,默认跑满cpu

runtime.GOMAXPROCS(1):只占用一个核。多用于日志监控等轻量级程序

wooker pool模式

开启一定数量的goroutine

package mainimport (	"fmt"	"sync"	"time")// worker poolvar wg sync.WaitGroupvar notice = make(chan struct{}, 5)func worker(ID int, jobs <-chan int, results chan<- int) {	defer wg.Done()	for j := range jobs {		fmt.Printf("worker:%d start job:%d\n", ID, j)		time.Sleep(time.Second)		fmt.Printf("worker:%d end job:%d\n", ID, j)		results <- j * 2		notice <- struct{}{} // 通知	}}func main() {	jobs := make(chan int, 100)	results := make(chan int, 100)	// 5个任务	go func() {		for j := 0; j < 5; j++ {			jobs <- j		}		close(jobs)	}()	// 开启3个goroutine	wg.Add(3)	for w := 0; w < 3; w++ {		go worker(w, jobs, results)	}	go func() {		for i := 0; i < 5; i++ {			<-notice		}		close(results)	}()	// 输出结果	for x := range results {		fmt.Println(x)	}	// for a := 1; a < 5; a++ {	// 	<-results	// }}
sync.WaitGroup

var wg sync.WaitGroup

wg.Add(1) :计数器+1wg.Done() :计数器-1wg.Wait() :等channel为什么需要channel

通过channel实现多个goroutine之间的通信

CSP通过通信来共享内存

channel是一种类型,一种引用类型。make函数初始化之后才能使用.(slice,map,channel)

channel的声明:

var ch chan 元素类型

channel的初始化:

ch = make(chan 元素类型, [缓冲区大小])

channel的 *** 作:发送:ch <- 100接收:x:= <- ch关闭:close(ch)带缓冲区的通道和无缓冲区的通道:

快递员送快递的示例,有缓冲区就是有快递柜

for {		x, ok := <-ch // 再取阻塞		if !ok{  // 什么时候ok=false? ch通道被关闭的时候			break		}		fmt.Println(x, ok)		time.Sleep(time.Second)	}for x := range ch {		fmt.Println(x)	}
单向通道:

通常是用作函数的参数,只读通道<- chan和只写通道chan <- int

通道的各种考虑情况:

select多路复用

同一时刻有多个通道要 *** 作的场景下,使用select。

使用select语句能提高代码的可读性。

可处理一个或多个channel的发送/接收 *** 作。如果多个case同时满足,select会随机选择一个。对于没有caseselect{}会一直等待,可用于阻塞main函数。今日内容同步锁互斥锁
package mainimport (	"fmt"	"sync")// 锁var x = 0var wg sync.WaitGroupvar lock sync.Mutexfunc add() {	defer wg.Done()	for i := 0; i < 50000; i++ {		lock.Lock()		x++		lock.Unlock()	}}func main() {	wg.Add(2)	go add()	go add()	wg.Wait()	fmt.Println(x)}
读写互斥锁
package mainimport (	"fmt"	"sync"	"time")// 读写互斥锁rwlockvar lock sync.Mutexvar rwlock sync.RWMutexvar wg sync.WaitGroupvar x = 0func read() {	defer wg.Done()	rwlock.RLock()	fmt.Println(x)	time.Sleep(time.Millisecond)	rwlock.RUnlock()}func write() {	defer wg.Done()	rwlock.RLock()	x++	time.Sleep(5 * time.Millisecond)	rwlock.RUnlock()}func main() {	start := time.Now()	for i := 0; i < 10; i++ {		wg.Add(1)		go write()	}	for i := 0; i < 1000; i++ {		wg.Add(1)		go read()	}	wg.Wait()	fmt.Println(time.Since(start))}
sync包sync.once
package mainimport (	"fmt"	"sync")// sync.oncevar wg sync.WaitGroupvar once sync.Oncefunc f1(ch1 chan<- int) {	defer wg.Done()	for i := 0; i < 100; i++ {		ch1 <- i	}	close(ch1)}func f2(ch1 <-chan int, ch2 chan<- int) {	defer wg.Done()	for {		x, ok := <-ch1		if !ok {			break		}		ch2 <- x * x	}	once.Do(func() { close(ch2) }) // 确保某个 *** 作只执行一次}func main() {	a := make(chan int, 100)	b := make(chan int, 100)	wg.Add(3)	go f1(a)	go f2(a, b)	go f2(a, b)	wg.Wait()	for ret := range b {		fmt.Println(ret)	}}
sync.map

版本1:慢

package mainimport (	"fmt"	"strconv"	"sync")// sync.map// Go内置的map不是并发安全的var (	m    = make(map[string]int)	lock sync.Mutex)func get(key string) int {	return m[key]}func set(key string, value int) {	m[key] = value}func main() {	wg := sync.WaitGroup{}	for i := 0; i < 21; i++ {		wg.Add(1)		go func(n int) {			key := strconv.Itoa(n)			lock.Lock()			set(key, n)			lock.Unlock()			fmt.Printf("k=:%v,v:=%v\n", key, get(key))			wg.Done()		}(i)	}	wg.Wait()}

版本2:快

func main() {	m2 := sync.Map{}	for i := 0; i < 25; i++ {		wg.Add(1)		go func(n int) {			key := strconv.Itoa(n)			m2.Store(key, n)         // 必须使用sync.map内置的store去存值			value, _ := m2.Load(key) // 必须使用sync.map内置的load方法根据key去取值			fmt.Printf("k=:%v,v:=%v\n", key, value)			wg.Done()		}(i)	}	wg.Wait()}
atomic包(原子性)
package mainimport (	"fmt"	"sync"	"sync/atomic")// 原子 *** 作var x int64var wg sync.WaitGroupvar lock sync.Mutex// func add() {// 	defer wg.Done()// 	lock.Lock()// 	x++// 	lock.Unlock()// }func add1() {	atomic.AddInt64(&x, 1)	wg.Done()}func main() {	// wg.Add(100000)	// for i := 0; i < 100000; i++ {	// 	go add1()	// }	// wg.Wait()	// fmt.Println(x)	// 比较并交换	x = 100	ok := atomic.CompareAndSwAPInt64(&x, 100, 300) // x是否等于100,若是则改为300,不是则返回false	fmt.Println(ok, x)}
网络编程server端
package mainimport (	"fmt"	"net"	"strings")// tcp server端func processConn(conn net.Conn) {    defer conn.Close()    // 3.与客户端通信	var tmp [128]byte	for {		n, err := conn.Read(tmp[:])		if err != nil {			fmt.Printf("read Failed, err:%v\n", err)			return		}		msg := string(tmp[:n])		fmt.Println(msg)		reback := strings.toupper(msg)		conn.Write([]byte(reback))	}}func main() {	// 1.本地端口启动服务	Listener, err := net.Listen("tcp", "localhost:9000")	if err != nil {		fmt.Printf("start Failed, err:%v\n", err)		return	}    defer Listener.Close()	// 2.等待别人来连接	for {		conn, err := Listener.Accept()		if err != nil {			fmt.Printf("accept Failed, err:%v\n", err)			return		}		go processConn(conn)	}}

clIEnt端

package mainimport (	"bufio"	"fmt"	"net"	"os")// tcp clIEntfunc main() {	// 1.与server端建立链接	conn, err := net.Dial("tcp", "localhost:9000")	if err != nil {		fmt.Printf("dial Failed, err:%v\n", err)		return	}	// 2.发送数据	// var msg string	var tmp [128]byte	reader := bufio.NewReader(os.Stdin)	for {		fmt.Print("发送到服务端")		// fmt.Scan(&msg)  // 有空格会有点小问题		text, _ := reader.ReadString('\n') // 读到换行		if text == "quit" {			break		}		conn.Write([]byte(text))		n, err := conn.Read(tmp[:])		if err != nil {			fmt.Printf("read Failed, err:%v\n", err)			return		}		fmt.Println(string(tmp[:n]))	}}
TCP粘包

大端和小端

解决粘包问题

// socket_stick/proto/proto.gopackage protoimport (	"bufio"	"bytes"	"enCoding/binary")// Encode 将消息编码func Encode(message string) ([]byte, error) {	// 读取消息的长度,转换成int32类型(占4个字节)	var length = int32(len(message))	var pkg = new(bytes.Buffer)	// 写入消息头	err := binary.Write(pkg, binary.littleEndian, length)	if err != nil {		return nil, err	}	// 写入消息实体	err = binary.Write(pkg, binary.littleEndian, []byte(message))	if err != nil {		return nil, err	}	return pkg.Bytes(), nil}// Decode 解码消息func Decode(reader *bufio.Reader) (string, error) {	// 读取消息的长度	lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据	lengthBuff := bytes.NewBuffer(lengthByte)	var length int32	err := binary.Read(lengthBuff, binary.littleEndian, &length)	if err != nil {		return "", err	}	// Buffered返回缓冲中现有的可读取的字节数。	if int32(reader.Buffered()) < length+4 {		return "", err	}	// 读取真正的消息数据	pack := make([]byte, int(4+length))	_, err = reader.Read(pack)	if err != nil {		return "", err	}	return string(pack[4:]), nil}

server服务端

package mainimport (	"bufio"	"fmt"	"io"	"net"	proto "go.study.com/hina/day01/day09/04nianbao/protocol")// socket_stick/server/main.gofunc process(conn net.Conn) {	defer conn.Close()	reader := bufio.NewReader(conn)	for {		recvStr, err := proto.Decode(reader)		// n, err := reader.Read(buf[:])		if err == io.EOF {			break		}		if err != nil {			fmt.Println("read from clIEnt Failed, err:", err)			break		}		fmt.Println("收到clIEnt发来的数据:", recvStr)	}}func main() {	Listen, err := net.Listen("tcp", "127.0.0.1:30000")	if err != nil {		fmt.Println("Listen Failed, err:", err)		return	}	defer Listen.Close()	for {		conn, err := Listen.Accept()		if err != nil {			fmt.Println("accept Failed, err:", err)			continue		}		go process(conn)	}}

clIEnt客户端

package mainimport (	"fmt"	"net"	proto "go.study.com/hina/day01/day09/04nianbao/protocol")// socket_stick/clIEnt/main.gofunc main() {	conn, err := net.Dial("tcp", "127.0.0.1:30000")	if err != nil {		fmt.Println("dial Failed, err", err)		return	}	defer conn.Close()	for i := 0; i < 20; i++ {		msg := `Hello, Hello. How are you?`		// 调用协议编码数据		b, _ := proto.Encode(msg)		conn.Write([]byte(b))	}}
udp

服务端server

package mainimport (	"fmt"	"net"	"strings")// UDP serverfunc main() {	conn, err := net.ListenUDP("udp", &net.UDPAddr{		IP:   net.IPv4(127, 0, 0, 1),		Port: 9000,	})	if err != nil {		fmt.Printf("conn Failed, err:%v\n", err)		return	}	defer conn.Close()	// 不需要建立链接,直接发数据	var data [1024]byte	for {		n, addr, err := conn.ReadFromUDP(data[:])		if err != nil {			fmt.Printf("read Failed, err:%v\n", err)			return		}		fmt.Println(data[:n])		reply := strings.toupper(string(data[:n]))		// 发送数据		conn.WritetoUDP([]byte(reply), addr)	}}

客户端clIEnt

package mainimport (	"bufio"	"fmt"	"net"	"os")// UDP clIEntfunc main() {	socket, err := net.DialUDP("udp", nil, &net.UDPAddr{		IP:   net.IPv4(127, 0, 0, 1),		Port: 9000,	})	if err != nil {		fmt.Printf("socket Failed, err:%v\n", err)		return	}	defer socket.Close()	var reply [1024]byte	reader := bufio.NewReader(os.Stdin)	for {		fmt.Print("发送到服务端:")		msg, _ := reader.ReadString('\n')		socket.Write([]byte(msg))		// 收回复的数据		n, _, err := socket.ReadFromUDP(reply[:])		if err != nil {			fmt.Printf("read Failed, err:%v\n", err)			return		}		fmt.Println("收到回复信息:", string(reply[:n]))	}}
总结

以上是内存溢出为你收集整理的Go 09锁、sync、网络编程全部内容,希望文章能够帮你解决Go 09锁、sync、网络编程所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1228954.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-05
下一篇 2022-06-05

发表评论

登录后才能评论

评论列表(0条)

保存