项目中用的是"github.com/xtaci/kcp-go",这个仓库不仅仅实现了kcp算法,而且在kcp算法层面上又包装了一层,比如说提供了多种数据加密方式和FEC前向纠错,用起来非常方便。
这一篇先分析kcp-go库对kcp算法包装这部分,先分析怎么用,怎么用都不知道去谈算法个人觉得没有意义,kcp算法层面的东西后面再分析。
1:先分析服务器层面的
func Listen(laddr string) (net.Listener, error) { return ListenWithOptions(laddr, nil, 0, 0) }
func ListenWithOptions(laddr string, block BlockCrypt, dataShards, parityShards int) (*Listener, error) {
udpaddr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, errors.WithStack(err)
}
conn, err := net.ListenUDP("udp", udpaddr)
if err != nil {
return nil, errors.WithStack(err)
}
return ServeConn(block, dataShards, parityShards, conn)
}
func ServeConn(block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*Listener, error) {
l := new(Listener)
l.conn = conn
l.sessions = make(map[string]*UDPSession)
l.chAccepts = make(chan *UDPSession, acceptBacklog)
l.chSessionClosed = make(chan net.Addr)
l.die = make(chan struct{})
l.dataShards = dataShards
l.parityShards = parityShards
l.block = block
l.fecDecoder = newFECDecoder(rxFECMulti*(dataShards+parityShards), dataShards, parityShards)
l.chSocketReadError = make(chan struct{})
// calculate header size
if l.block != nil {
l.headerSize += cryptHeaderSize
}
if l.fecDecoder != nil {
l.headerSize += fecHeaderSizePlus2
}
go l.monitor()
return l, nil
}
调用kcp.Listen 或者 kcp.ListenWithOptions 这是入口,ServeConn会调用monitor,这个monitor方法是Listener类中的一个方法,不同的平台有不同的实现,readloop_generic.go和readloop_linux.go中都有相应的实现,最终都会回到defaultMonitor()中来,具体看实现。
func (l *Listener) defaultMonitor() {
buf := make([]byte, mtuLimit)
for {
if n, from, err := l.conn.ReadFrom(buf); err == nil {
if n >= l.headerSize+IKCP_OVERHEAD {
l.packetInput(buf[:n], from)
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
} else {
l.notifyReadError(errors.WithStack(err))
return
}
}
}
/*
ReadFrom(p []byte) (n int, addr Addr, err error)
type Addr interface {
Network() string // name of the network (for example, "tcp", "udp")
String() string // string form of address (for example, "192.0.2.1:25", "[2001:db8::1]:80")
}
*/
其实就是不停的从conn中读数据,这个conn就是调用net.ListenUDP返回的对象,类似twisted中的服务器对象factory。n, from, err := l.conn.ReadFrom(buf) buf的空间是设置的mutLimit大小,每次最大读这么多内容,n是成功读取的长度,from是对方(client)的详细信息,如果数据满足一个整包的要求就把这些数据塞到packetInput中,
func (l *Listener) packetInput(data []byte, addr net.Addr) {
dataValid := false
if l.block != nil {
l.block.Decrypt(data, data)
data = data[nonceSize:]
checksum := crc32.ChecksumIEEE(data[crcSize:])
if checksum == binary.LittleEndian.Uint32(data) {
data = data[crcSize:]
dataValid = true
} else {
atomic.AddUint64(&DefaultSnmp.InCsumErrors, 1)
}
} else if l.block == nil {
dataValid = true
}
if dataValid {
l.sessionLock.Lock()
s, ok := l.sessions[addr.String()]
l.sessionLock.Unlock()
var conv, sn uint32
convValid := false
if l.fecDecoder != nil {
isfec := binary.LittleEndian.Uint16(data[4:])
if isfec == typeData {
conv = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2:])
sn = binary.LittleEndian.Uint32(data[fecHeaderSizePlus2+IKCP_SN_OFFSET:])
convValid = true
}
} else {
conv = binary.LittleEndian.Uint32(data)
sn = binary.LittleEndian.Uint32(data[IKCP_SN_OFFSET:])
convValid = true
}
if ok { // existing connection
if !convValid || conv == s.kcp.conv { // parity or valid data shard
s.kcpInput(data)
} else if sn == 0 { // should replace current connection
s.Close()
s = nil
}
}
if s == nil && convValid { // new session
if len(l.chAccepts) < cap(l.chAccepts) { // do not let the new sessions overwhelm accept queue
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, addr, l.block)
s.kcpInput(data)
l.sessionLock.Lock()
l.sessions[addr.String()] = s
l.sessionLock.Unlock()
l.chAccepts <- s
}
}
}
}
这里边主要干了一下几件事儿了
a:数据有加密就执行解密
b:读取kcp协议头里边的conv和sn
c:通过s, ok := l.sessions[addr.String()]判断当前这个连接是旧的连接还是新的连接,sessions里边保存了所有的client连接信息,如果是旧连接就把调用session的s.kcpInput(),如果是一个新连接进来并且conv合法就新创建一个客户端(udpsession)对象,把这个对象保存到listener的sessions当中。这里边在极端情况下可能会有问题,如果2个客户端在同一个局域网的情况下连接服务器 外网IP又相同,恰好2个客户端随机的端口又一样这样的话key=addr.String()会是一模一样,2个不同的客户端会被服务器认为是同一个。
func (s *UDPSession) kcpInput(data []byte) {
var kcpInErrors, fecErrs, fecRecovered, fecParityShards uint64
if s.fecDecoder != nil {
if len(data) > fecHeaderSize { // must be larger than fec header size
f := fecPacket(data)
if f.flag() == typeData || f.flag() == typeParity { // header check
if f.flag() == typeParity {
fecParityShards++
}
// lock
s.mu.Lock()
recovers := s.fecDecoder.decode(f)
if f.flag() == typeData {
if ret := s.kcp.Input(data[fecHeaderSizePlus2:], true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
}
for _, r := range recovers {
if len(r) >= 2 { // must be larger than 2bytes
sz := binary.LittleEndian.Uint16(r)
if int(sz) <= len(r) && sz >= 2 {
if ret := s.kcp.Input(r[2:sz], false, s.ackNoDelay); ret == 0 {
fecRecovered++
} else {
kcpInErrors++
}
} else {
fecErrs++
}
} else {
fecErrs++
}
// recycle the recovers
xmitBuf.Put(r)
}
// to notify the readers to receive the data
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
// to notify the writers
waitsnd := s.kcp.WaitSnd()
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
} else {
s.mu.Lock()
if ret := s.kcp.Input(data, true, s.ackNoDelay); ret != 0 {
kcpInErrors++
}
if n := s.kcp.PeekSize(); n > 0 {
s.notifyReadEvent()
}
waitsnd := s.kcp.WaitSnd()
if waitsnd < int(s.kcp.snd_wnd) && waitsnd < int(s.kcp.rmt_wnd) {
s.notifyWriteEvent()
}
s.uncork()
s.mu.Unlock()
}
atomic.AddUint64(&DefaultSnmp.InPkts, 1)
atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data)))
if fecParityShards > 0 {
atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards)
}
if kcpInErrors > 0 {
atomic.AddUint64(&DefaultSnmp.KCPInErrors, kcpInErrors)
}
if fecErrs > 0 {
atomic.AddUint64(&DefaultSnmp.FECErrs, fecErrs)
}
if fecRecovered > 0 {
atomic.AddUint64(&DefaultSnmp.FECRecovered, fecRecovered)
}
}
kcpInput看着挺复杂,前边长篇大论都是如果启动了FEC时根据FEC的规则读取数据。如果没有启用FEC,数据就不用处理就调用 s.kcp.Input(data, true, s.ackNoDelay)把数据塞到kcp算法中进行处理,然后判断kcp还能不能继续读和写进而发送相应的事件。
func (s *UDPSession) notifyReadEvent() {
select {
case s.chReadEvent <- struct{}{}:
default:
}
}
func (s *UDPSession) notifyWriteEvent() {
select {
case s.chWriteEvent <- struct{}{}:
default:
}
}
如果能继续读就向chReadEvent中发送数据,而Read()函数中有监听chReadEvent这个channel的事件 如果能继续写就向chWriteEvent中发送数据,而Write()中有监听chWriteEvent这个channel的事件 Read()就是经过kcp解包拿数据,Write()就是把数据写到kcp经过它的包装加上kcp包头。
2:在来看看客户端的一些逻辑
func Dial(raddr string) (net.Conn, error) { return DialWithOptions(raddr, nil, 0, 0) }
func DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) (*UDPSession, error) {
// network type detection
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
return nil, errors.WithStack(err)
}
network := "udp4"
if udpaddr.IP.To4() == nil {
network = "udp"
}
conn, err := net.ListenUDP(network, nil)
if err != nil {
return nil, errors.WithStack(err)
}
return NewConn(raddr, block, dataShards, parityShards, conn)
}
// NewConn3 establishes a session and talks KCP protocol over a packet connection.
func NewConn3(convid uint32, raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
return newUDPSession(convid, dataShards, parityShards, nil, conn, raddr, block), nil
}
// NewConn2 establishes a session and talks KCP protocol over a packet connection.
func NewConn2(raddr net.Addr, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
var convid uint32
binary.Read(rand.Reader, binary.LittleEndian, &convid)
return NewConn3(convid, raddr, block, dataShards, parityShards, conn)
}
// NewConn establishes a session and talks KCP protocol over a packet connection.
func NewConn(raddr string, block BlockCrypt, dataShards, parityShards int, conn net.PacketConn) (*UDPSession, error) {
udpaddr, err := net.ResolveUDPAddr("udp", raddr)
if err != nil {
return nil, errors.WithStack(err)
}
return NewConn2(udpaddr, block, dataShards, parityShards, conn)
}
客户端调用Dial 或者 DialWithOptions就会得到一个Session对象(类似twisted中的protocol对象)
这里边重点说一下NewConn2方法,通过rand.Reader随机的产生了4个字节的convid,这个是convid是kcp协议头的组成部分,服务器收到数据后读取convid 作为数据合法性判断的重要依据。
客户端获取了session对象之后 就可以正常的read和write了,跟上边server介绍的read、write是一抹一样的情况。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)