Network architecture of nsqd

Figure 1: Network architecture of nsqd

// TCPServer accepts connections from listener in an endless loop and hands
// each accepted connection to handler.Handle on its own goroutine, so one
// slow client does not block the accept loop.
//
// NOTE(review): this excerpt is abridged. As shown, the error returned by
// Accept is never inspected, the loop has no exit path (so the declared
// error result is never produced), wg is used without a visible declaration
// or a matching wg.Add call, and logf is not used.
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	for {
			clientConn, err := listener.Accept()
			go func() {
				handler.Handle(clientConn)
				// Signals completion of this handler goroutine on wg.
				wg.Done()
			}()
		}
}

// Handle serves one client connection: it runs the protocol's IOLoop for the
// client and logs any error that loop returns, tagged with the remote address.
//
// NOTE(review): this excerpt is abridged. err, prot and client are used
// without visible declarations, and buf is allocated but never read into in
// the lines shown.
func (p *tcpServer) Handle(conn net.Conn) {

	// The client should initialize itself by sending a 4 byte sequence indicating
	// the version of the protocol that it intends to communicate, this will allow us
	// to gracefully upgrade the protocol away from text/line oriented to whatever...
	buf := make([]byte, 4)
	// With the connection established, run the I/O loop of the protocol
	// implementation (presumably the one selected by the version bytes above;
	// the selection code is not shown).
	err = prot.IOLoop(client)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
	}
}

// IOLoop is the per-client read loop of the V2 protocol. It starts
// messagePump on a separate goroutine, then repeatedly reads one
// newline-terminated command, executes it via Exec and sends any response
// back. When the loop ends it closes client.ExitChan and removes the client
// from the channel it is subscribed to, if any.
//
// NOTE(review): this excerpt is abridged. line, err, response and
// messagePumpStartedChan are declared outside the lines shown; the errors
// from ReadSlice and Exec are not checked here; and the final return
// statement is missing.
func (p *protocolV2) IOLoop(c protocol.Client) error {
	// Recover the concrete V2 client from the protocol.Client interface value.
	client := c.(*clientV2)
	// Run messagePump concurrently. According to the article, it sends
	// heartbeats to the client at regular intervals and the client replies to
	// each one; a connection that stops replying is considered broken and is
	// dropped. (messagePump's body is not shown in this excerpt.)
	go p.messagePump(client, messagePumpStartedChan)
	// Each read is bounded by a deadline of client.HeartbeatInterval * 2, so a
	// client that sends nothing for that long makes the read time out. A
	// healthy client is expected to send something at least once per
	// HeartbeatInterval. A non-positive interval sets the deadline to zeroTime
	// (presumably "no deadline" — confirm zeroTime's definition).
	for {
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			client.SetReadDeadline(zeroTime)
		}
		// Every command ends with "\n"; the command name and its parameters are
		// separated by separatorBytes (presumably a single space).
		line, err = client.Reader.ReadSlice('\n')
		// params[0] is the command name, params[1:] are its arguments.
		params := bytes.Split(line, separatorBytes)
		// Execute the command sent by the client.
		response, err = p.Exec(client, params)
		// Send the command's result back to the client, if there is one.
		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				// A failed send ends the loop; this is the only exit shown.
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}		
	}
	// The loop has ended, so the connection is being torn down.
	p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
	close(client.ExitChan)
	// client.Channel is the channel this client subscribed to; unregister the
	// client from it now that the client is going away.
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}
	
}



// Exec dispatches a parsed client command to the protocolV2 method that
// implements it. params[0] is the command name and the whole params slice is
// forwarded to the chosen method.
//
// IDENTIFY is handled first, before the TLS policy check. Every other
// command must pass enforceTLSPolicy, whose error is returned unchanged.
// A command name that matches none of the known commands yields a fatal
// client error with code "E_INVALID".
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	// IDENTIFY is deliberately exempt from the TLS policy check below.
	if string(params[0]) == "IDENTIFY" {
		return p.IDENTIFY(client, params)
	}

	if policyErr := enforceTLSPolicy(client, p, params[0]); policyErr != nil {
		return nil, policyErr
	}

	switch string(params[0]) {
	case "FIN":
		return p.FIN(client, params)
	case "RDY":
		return p.RDY(client, params)
	case "REQ":
		return p.REQ(client, params)
	case "PUB":
		return p.PUB(client, params)
	case "MPUB":
		return p.MPUB(client, params)
	case "DPUB":
		return p.DPUB(client, params)
	case "NOP":
		return p.NOP(client, params)
	case "TOUCH":
		return p.TOUCH(client, params)
	case "SUB":
		return p.SUB(client, params)
	case "CLS":
		return p.CLS(client, params)
	case "AUTH":
		return p.AUTH(client, params)
	}

	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}


// SUB handles the SUB command: it registers the client with a channel via
// channel.AddClient and then hands that channel to the client's message pump
// through client.SubEventChan. A failed AddClient is reported to the client
// as a fatal "E_SUB_FAILED" error.
//
// NOTE(review): this excerpt is abridged. channel is not declared in the
// lines shown, the retry loop has no exit on success as written (so the
// SubEventChan send is unreachable here), and the final return statement is
// missing.
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	// Associate the client with the channel.
	// This retry-loop is a work-around for a race condition, where the
	// last client can leave the channel between GetChannel() and AddClient().
	// Avoid adding a client to an ephemeral channel / topic which has started exiting.
	for i := 1; ; i++ {
		if err := channel.AddClient(client.ID, client); err != nil {
			return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error())
		}		
	}

	// Update the message pump: tell it which channel the client subscribed to.
	client.SubEventChan <- channel

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994119.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存