// the first parameter is the listening port, when a request comes, a go routine is opened to process the link request
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
for {
clientConn, err := listener.Accept()
go func() {
handler.Handle(clientConn)
wg.Done()
}()
}
}
func (p *tcpServer) Handle(conn net.Conn) {
// The client should initialize itself by sending a 4 byte sequence indicating
// the version of the protocol that it intends to communicate, this will allow us
// to gracefully upgrade the protocol away from text/line oriented to whatever...
buf := make([]byte, 4)
//Successfully establish a connection, process the link according to the corresponding protocol number
err = prot.IOLoop(client)
if err != nil {
p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
}
}
func (p *protocolV2) IOLoop(c protocol.Client) error {
//Create a new Client object
client := c.(*clientV2)
//Open another goroutine, send heartbeat information regularly, and the client will reply after receiving the heartbeat information.
//If nsqd does not receive the heartbeat response of the connection for a long time, it means that there is a problem with the connection and the connection will be disconnected. This is the heartbeat implementation mechanism of nsq
go p.messagePump(client, messagePumpStartedChan)
//If the client does not receive the command sent by the client within the time interval of client.HeartbeatInterval * 2, it means that there is a problem with the connection and the link needs to be closed.
//Under normal circumstances, the client will send a heartbeat reply every HeartbeatInterval.
for {
if client.HeartbeatInterval > 0 {
client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
} else {
client.SetReadDeadline(zeroTime)
}
//nsq stipulates that all commands end with "\n", and the commands and parameters are separated by spaces
line, err = client.Reader.ReadSlice('\n')
//params[0] is the type of command, params[1:] is the command parameter
params := bytes.Split(line, separatorBytes)
//Process the command sent by the client
response, err = p.Exec(client, params)
//Send the processing result of the command to the client
if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}
}
//There is a problem with the connection, the connection needs to be closed
p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
close(client.ExitChan)
//client.Channel records the Channel subscribed by the client, and the subscriber needs to be removed from the Channel when the client is closed.
if client.Channel != nil {
client.Channel.RemoveClient(client.ID)
}
}
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
case bytes.Equal(params[0], []byte("RDY")):
return p.RDY(client, params)
case bytes.Equal(params[0], []byte("REQ")):
return p.REQ(client, params)
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
case bytes.Equal(params[0], []byte("MPUB")):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte("DPUB")):
return p.DPUB(client, params)
case bytes.Equal(params[0], []byte("NOP")):
return p.NOP(client, params)
case bytes.Equal(params[0], []byte("TOUCH")):
return p.TOUCH(client, params)
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
case bytes.Equal(params[0], []byte("CLS")):
return p.CLS(client, params)
case bytes.Equal(params[0], []byte("AUTH")):
return p.AUTH(client, params)
}
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
//Associate Client with Channel
// This retry-loop is a work-around for a race condition, where the
// last client can leave the channel between GetChannel() and AddClient().
// Avoid adding a client to an ephemeral channel / topic which has started exiting.
for i := 1; ; i++ {
if err := channel.AddClient(client.ID, client); err != nil {
return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error())
}
}
//update message pump
client.SubEventChan <- channel
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)