package main// golang实现带有心跳检测的tcp长连接// serverimport ( "fmt" "net" "time")// message struct:// c#dvar ( Req_REGISTER byte = 1 // 1 --- c register cID Res_REGISTER byte = 2 // 2 --- s response Req_HEARTBEAT byte = 3 // 3 --- s send heartbeat req Res_HEARTBEAT byte = 4 // 4 --- c send heartbeat res Req byte = 5 // 5 --- cs send data Res byte = 6 // 6 --- cs send ack)type CS struct { Rch chan []byte Wch chan []byte Dch chan bool u string}func NewCs(uID string) *CS { return &CS{Rch: make(chan []byte),Wch: make(chan []byte),u: uID}}var CMap map[string]*CSfunc main() { CMap = make(map[string]*CS) Listen,err := net.ListenTCP("tcp",&net.TCPAddr{net.ParseIP("127.0.0.1"),6666,""}) if err != nil { fmt.Println("监听端口失败:",err.Error()) return } fmt.Println("已初始化连接,等待客户端连接...") go PushGRT() Server(Listen) select {}}func PushGRT() { for { time.Sleep(15 * time.Second) for k,v := range CMap { fmt.Println("push msg to user:" + k) v.Wch <- []byte{Req,'#','p','u','s','h','!'} } }}func Server(Listen *net.TCPListener) { for { conn,err := Listen.AcceptTCP() if err != nil { fmt.Println("接受客户端连接异常:",err.Error()) continue } fmt.Println("客户端连接来自:",conn.RemoteAddr().String()) // handler goroutine go Handler(conn) }}func Handler(conn net.Conn) { defer conn.Close() data := make([]byte,128) var uID string var C *CS for { conn.Read(data) fmt.Println("客户端发来数据:",string(data)) if data[0] == Req_REGISTER { // register conn.Write([]byte{Res_REGISTER,'o','k'}) uID = string(data[2:]) C = NewCs(uID) CMap[uID] = C // fmt.Println("register clIEnt") // fmt.Println(uID) break } else { conn.Write([]byte{Res_REGISTER,'e','r'}) } } // WHandler go WHandler(conn,C) // RHandler go RHandler(conn,C) // Worker go Work(C) select { case <-C.Dch: fmt.Println("close handler goroutine") }}// 正常写数据// 定时检测 conn dIE => goroutine dIEfunc WHandler(conn net.Conn,C *CS) { // 读取业务Work 写入Wch的数据 ticker := time.NewTicker(20 * time.Second) for { select { case d := <-C.Wch: conn.Write(d) case <-ticker.C: if _,ok := CMap[C.u]; !ok { fmt.Println("conn dIE,close WHandler") return } } }}// 读客户端数据 + 心跳检测func RHandler(conn net.Conn,C *CS) { // 心跳ack // 业务数据 写入Wch for { data := make([]byte,128) // setReadTimeout err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)) if err != nil { fmt.Println(err) } if _,derr := conn.Read(data); derr == nil { // 可能是来自客户端的消息确认 // 数据消息 fmt.Println(data) if data[0] == Res { fmt.Println("recv clIEnt data ack") } else if data[0] == Req { fmt.Println("recv clIEnt data") fmt.Println(data) conn.Write([]byte{Res,'#'}) // C.Rch <- data } continue } conn.Write([]byte{Req_HEARTBEAT,'#'}) fmt.Println("send ht packet") conn.SetReadDeadline(time.Now().Add(2 * time.Second)) if _,herr := conn.Read(data); herr == nil { // fmt.Println(string(data)) fmt.Println("resv ht packet ack") } else { delete(CMap,C.u) fmt.Println("delete user!") return } }}func Work(C *CS) { time.Sleep(5 * time.Second) C.Wch <- []byte{Req,'l','o'} time.Sleep(15 * time.Second) C.Wch <- []byte{Req,'o'} // 从读ch读信息 /* ticker := time.NewTicker(20 * time.Second) for { select { case d := <-C.Rch: C.Wch <- d case <-ticker.C: if _,ok := CMap[C.u]; !ok { return } } } */ // 往写ch写信息}
package main// golang实现带有心跳检测的tcp长连接// serverimport ( "fmt" "net")var ( Req_REGISTER byte = 1 // 1 --- c register cID Res_REGISTER byte = 2 // 2 --- s response Req_HEARTBEAT byte = 3 // 3 --- s send heartbeat req Res_HEARTBEAT byte = 4 // 4 --- c send heartbeat res Req byte = 5 // 5 --- cs send data Res byte = 6 // 6 --- cs send ack)var Dch chan boolvar Rch chan []bytevar Wch chan []bytefunc main() { Dch = make(chan bool) Rch = make(chan []byte) Wch = make(chan []byte) addr,err := net.ResolveTCPAddr("tcp","127.0.0.1:6666") conn,err := net.DialTCP("tcp",nil,addr)// conn,err := net.Dial("tcp","127.0.0.1:6666") if err != nil { fmt.Println("连接服务端失败:",err.Error()) return } fmt.Println("已连接服务器") defer conn.Close() go Handler(conn) select { case <- Dch: fmt.Println("关闭连接") }}func Handler(conn *net.TCPConn) { // 直到register ok data := make([]byte,128) for { conn.Write([]byte{Req_REGISTER,'2'}) conn.Read(data)// fmt.Println(string(data)) if data[0] == Res_REGISTER { break } }// fmt.Println("i'm register") go RHandler(conn) go WHandler(conn) go Work()}func RHandler(conn *net.TCPConn) { for { // 心跳包,回复ack data := make([]byte,128) i,_ := conn.Read(data) if i == 0 { Dch <- true return } if data[0] == Req_HEARTBEAT { fmt.Println("recv ht pack") conn.Write([]byte{Res_REGISTER,'h'}) fmt.Println("send ht pack ack") } else if data[0] == Req { fmt.Println("recv data pack") fmt.Printf("%v\n",string(data[2:])) Rch <- data[2:] conn.Write([]byte{Res,'#'}) } }}func WHandler(conn net.Conn) { for { select { case msg := <- Wch: fmt.Println((msg[0])) fmt.Println("send data after: " + string(msg[1:])) conn.Write(msg) } }}func Work() { for { select { case msg := <- Rch: fmt.Println("work recv " + string(msg)) Wch <- []byte{Req,'x','x'} } }}总结
以上是内存溢出为你收集整理的golang实现带有心跳检测的tcp长连接全部内容,希望文章能够帮你解决golang实现带有心跳检测的tcp长连接所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)