2022.8.6 更新了类图的
前言这篇文章实际上就是我的一篇学习笔记,书籍用的是《区块链国产化实践指南:基于Fabric 2.0》,这本书对于想要从源码角度去学习Fabric很有帮助。然后因为这只是我的学习笔记,所以难免有些内容是我刚学时的笔记认知很浅,也可能有些地方是我理解错误的,但是大部分应该是没问题的。
这篇文章我先会从gossip的功能、大致工作流程、模块组成等角度去大致介绍gossip,然后再从源码角度去分析它一次分享区块完整的工作流程和它的各个模块,最后我再去总结各种GossipMessage类型,它会覆盖到之前所有的模块。
还有就是,这篇文章好像有点长,贴了很多源码,全看很浪费时间,一定要先搞清楚想要了解什么再看,如你对某个GossipMessage感兴趣,你可以直接看最后的应用场景,会有指向对应部分源码的链接,想看gossip散播一个区块的流程就看gossip服务的散播流程。还有就是,阅读源码要自己看代码,我的源码只能当辅助。
Gossip概要介绍
接下来,我会用尽力形象的方式去介绍一下gossip。
Gossip的中文翻译就是流言蜚语,它会把区块、节点状态、证书等信息跟班级里的恋爱消息在一个个同学(peer节点)的努力下一传十十传百,最后班级(一个fabric网络)里的人都知道了,当然有些消息不能让有些人听到,只能在一个小圈子(如channel)里传播。
以上就是从整体的角度去形容gossip的功能,而要达到这样的功能,每个同学都得去做到一些事,这样才能保证整个班级正常地传播流言蜚语(怎么听起来感觉不大好。。。)
同学们得先有双耳朵去听到别人说给你的消息,还要有一张嘴去散播你的语言。负责这一块功能的gossip的Comm通信子模块,它会将自己作为Gossip服务注册在GRPC服务器上,grpc服务器接收到消息后发现消息被标注为Gossip消息,它就会交给Comm通信子模块来处理。此外,Comm还会提供Send功能,它会负责与要散播的节点建立连接并发送Gossip消息。同学们还应该知道班级里有哪些同学,哪些同学会理你等等。这就对应了gossip的discovery节点发现子模块,它将维护各个节点的Alive消息、属性(如账本高度),并不断交互(散播、或者一对一请求响应)来更新数据。最重要的,同学得有个统筹全局的大脑。Gossip的两个核心类,GossipService和Node,它们俩会负责统筹和初始化所有模块,了解其启动过程是我们的第一步。同学还需要将听到的消息分类并采用不同方式处理的能力。消息分类是交给Comm通信子模块下的ChannelMultiplexer的channel分发器完成,这里的channel不是fabric中的channel,而是类似于youtube中的频道,不同模块会根据其需要处理的消息去向该分发器订阅该类型的Gossip消息。除了以上的Comm和Discovery子模块以外,它还有以下子模块:
GossipChannel,GossipService和Node会负责统筹整个节点的gossip服务,那么GossipChannel则会负责这个Channel的gossip服务,它会处理所有指向特定Channel的所有Gossip消息。它的功能很复杂,实在有点难讲清楚。MsgStore,它会负责暂时存储消息,它会被其他子模块使用,尝尝应用于暂时存储从其他节点收到的消息,再将存储到的消息散播给其他节点的情况。当然,它本身就是消息存储器,可以配置不同内容可以达到不同功能,也要具体看对应模块。Pull,它会负责向其他节点拉取消息,暂时只有两个应用场景,Node类中作为certPuller拉取其他节点的身份信息,GossipChannel中作为blocksPuller拉取区块信息。GossipStateProvider,它会负责将收到的区块信息写入到账本中,还会向其他节点请求缺少的区块。其实还有处理私有数据和leader选举的子模块,我暂时还没有看 源码详细分析分析前,先分享一下我整理的gossip核心的类图,这个是动态的,我还在完善。。。
服务功能和原型定义gossip服务底层采用grpc进行节点间交流,服务原型如下:
// protos/gossip/message.pb.go
// gossip服务器
type GossipServer interface {
// GossipStream is the gRPC stream used for sending and receiving messages
GossipStream(Gossip_GossipStreamServer) error
// Ping is used to probe a remote peer's aliveness
Ping(context.Context, *Empty) (*Empty, error)
}
// gossip客户端
type GossipClient interface {
// GossipStream is the gRPC stream used for sending and receiving messages
GossipStream(ctx context.Context, opts ...grpc.CallOption) (Gossip_GossipStreamClient, error)
// Ping is used to probe a remote peer's aliveness
Ping(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error)
}
// 客户端默认实现
type gossipClient struct {
cc *grpc.ClientConn
}
// Gossip消息
type GossipMessage struct {
// 它决定了该信息转发给哪些节点
Tag GossipMessage_Tag
// 消息的内容,消息分为很多类型
Content isGossipMessage_Content
}
gossip服务的启动和配置
peer节点会启动gossip服务
// internal/peer/node/start.go
func serve(args []string) error {
gossipService, err := initGossipService(...)
}
// initGossipService will initialize the gossip service by:
// 1. Enable TLS if configured;
// 2. Init the message crypto service;
// 3. Init the security advisor;
// 4. Init gossip related struct.
func initGossipService(...) (*gossipservice.GossipService, error) {
var certs *gossipcommon.TLSCertificates
// 配置TLS
if peerServer.TLSEnabled() {
serverCert := peerServer.ServerCertificate()
clientCert, err := peer.GetClientCertificate()
if err != nil {
return nil, errors.Wrap(err, "failed obtaining client certificates")
}
certs = &gossipcommon.TLSCertificates{}
certs.TLSServerCert.Store(&serverCert)
certs.TLSClientCert.Store(&clientCert)
}
// 初始化消息加密服务
// 取得本地MSP,详见MSP部分
localMSP := mgmt.GetLocalMSP(factory.GetDefault())
// 创建MessageCryptoService
deserManager := peergossip.NewDeserializersManager(localMSP)
messageCryptoService := peergossip.NewMCS(
policyMgr,
signer,
deserManager,
factory.GetDefault(),
)
// 初始化SecurityAdvisor,它主要用于根据节点身份取得MSP身份
secAdv := peergossip.NewSecurityAdvisor(deserManager)
bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")
// 提取配置信息
serviceConfig := gossipservice.GlobalConfig()
if serviceConfig.Endpoint != "" {
peerAddress = serviceConfig.Endpoint
}
gossipConfig, err := gossipgossip.GlobalConfig(peerAddress, certs, bootstrap...)
return gossipservice.New(...)
}
消息加密服务
// gossip/api/crypto.go
// MessageCryptoService 是 gossip 组件和 peer 的加密层之间的合约,gossip 组件使用它来验证和验证远程 peer 和它们发送的数据,以及验证从排序服务接收到的块。
type MessageCryptoService interface {
// 返回身份的PKIid
GetPKIidOfCert(peerIdentity PeerIdentityType) common.PKIidType
// 如果块被正确签名,VerifyBlock 返回 nil,并且确保seqNum就是块头包含的序列号。否则返回错误
VerifyBlock(channelID common.ChannelID, seqNum uint64, block *cb.Block) error
// 对消息进行签名,并返回签名
Sign(msg []byte) ([]byte, error)
// 根据身份判断签名是否符合该消息
Verify(peerIdentity PeerIdentityType, signature, message []byte) error
// 验证特定channel下的消息是否符合签名
VerifyByChannel(channelID common.ChannelID, peerIdentity PeerIdentityType, signature, message []byte) error
// 验证身份是否合法
ValidateIdentity(peerIdentity PeerIdentityType) error
// 返回身份的过期时间,如果能过期的,否则返回0
Expiration(peerIdentity PeerIdentityType) (time.Time, error)
}
// internal/peer/gossip/mcs.go
type MSPMessageCryptoService struct {
channelPolicyManagerGetter policies.ChannelPolicyManagerGetter
localSigner identity.SignerSerializer
deserializer DeserializersManager
hasher Hasher
}
// 其ValidateIdentity、Expiration,实际上就是依赖getValidatedIdentity方法来实现
func (s *MSPMessageCryptoService) getValidatedIdentity(peerIdentity api.PeerIdentityType) (msp.Identity, common.ChannelID, error) {
// 先尝试反序列化
sId, err := s.deserializer.Deserialize(peerIdentity)
// 再尝试使用本地MSP反序列化
lDes := s.deserializer.GetLocalDeserializer()
identity, err := lDes.DeserializeIdentity([]byte(peerIdentity))
if err == nil {
// 成功就返回
if identity.GetMSPIdentifier() == s.deserializer.GetLocalMSPIdentifier() {
// 这个比较貌似本应该在上一步完成,但是现在还没实现,因为不能把MSPID写入到X509中
return identity, nil, identity.Validate()
}
}
// 最后尝试用其他msp节点来反序列化该节点
for chainID, mspManager := range s.deserializer.GetChannelDeserializers() {
// Deserialize identity
identity, err := mspManager.DeserializeIdentity([]byte(peerIdentity))
if err != nil {
continue
}
if err := identity.Validate(); err != nil {
continue
}
return identity, common.ChannelID(chainID), nil
}
return nil, nil, fmt.Errorf("Peer Identity %s cannot be validated. No MSP found able to do that.", peerIdentity)
}
// 该方法大概率会在gossip收到区块时用来验证区块是否合法,这里貌似没有验证每个事务
func (s *MSPMessageCryptoService) VerifyBlock(chainID common.ChannelID, seqNum uint64, block *pcommon.Block) error {
// 比对所需序列号是否和区块头部的序号是否一致
blockSeqNum := block.Header.Number
if seqNum != blockSeqNum {
return fmt.Errorf("Claimed seqNum is [%d] but actual seqNum inside block is [%d]", seqNum, blockSeqNum)
}
// 比对通道id是否一致
channelID, err := protoutil.GetChannelIDFromBlock(block)
if channelID != string(chainID) {
return fmt.Errorf("Invalid block's channel id. Expected [%s]. Given [%s]", chainID, channelID)
}
// 解析出元数据
metadata, err := protoutil.GetMetadataFromBlock(block, pcommon.BlockMetadataIndex_SIGNATURES)
// 计算区块数据hash,并和它头部给出的hash比较
if !bytes.Equal(protoutil.BlockDataHash(block.Data), block.Header.DataHash) {
return fmt.Errorf("Header.DataHash is different from Hash(block.Data) for block with id [%d] on channel [%s]", block.Header.Number, chainID)
}
// 目前可以保证区块内容未被修改。除非头部hash值被修改,但这是没关系的,因为会导致断链,可以简单被查出
// 下要确保该区块中的交易符合策略
// 取得该通道的策略
cpm := s.channelPolicyManagerGetter.Manager(channelID)
// 找到区块检验的策略,/Channel/Orderer/BlockValidation
policy, ok := cpm.GetPolicy(policies.BlockValidation)
// 提取出区块的元数据中的签名,组装为签名集
signatureSet := []*protoutil.SignedData{}
for _, metadataSignature := range metadata.Signatures {
shdr, err := protoutil.UnmarshalSignatureHeader(metadataSignature.SignatureHeader)
signatureSet = append(
signatureSet,
&protoutil.SignedData{
Identity: shdr.Creator,
Data: util.ConcatenateBytes(metadata.Value, metadataSignature.SignatureHeader, protoutil.BlockHeaderBytes(block.Header)),
Signature: metadataSignature.Signature,
},
)
}
// 策略验证签名数据是否符合策略
return policy.EvaluateSignedData(signatureSet)
}
// 签名直接使用签名身份完成
func (s *MSPMessageCryptoService) Sign(msg []byte) ([]byte, error) {
return s.localSigner.Sign(msg)
}
func (s *MSPMessageCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
identity, chainID, err := s.getValidatedIdentity(peerIdentity)
if len(chainID) == 0 {
// 说明该身份是本地MSP中的,直接调用即可
return identity.Verify(message, signature)
}
// 来自其他MSP
return s.VerifyByChannel(chainID, peerIdentity, signature, message)
}
func (s *MSPMessageCryptoService) VerifyByChannel(chainID common.ChannelID, peerIdentity api.PeerIdentityType, signature, message []byte) error {
// 取得指定通道的策略管理员
cpm := s.channelPolicyManagerGetter.Manager(string(chainID))
// 取得应用通道的Readers策略, /Channel/Application/Readers
policy, flag := cpm.GetPolicy(policies.ChannelApplicationReaders)
// 最终调用策略的验证
return policy.EvaluateSignedData(
[]*protoutil.SignedData{{
Data: message,
Identity: []byte(peerIdentity),
Signature: signature,
}},
)
}
创建gossip服务对象
func New(
peerIdentity identity.SignerSerializer,
gossipMetrics *gossipmetrics.GossipMetrics,
endpoint string,
s *grpc.Server,
mcs api.MessageCryptoService,
secAdv api.SecurityAdvisor,
secureDialOpts api.PeerSecureDialOpts,
credSupport *corecomm.CredentialSupport,
gossipConfig *gossip.Config,
serviceConfig *ServiceConfig,
privdataConfig *gossipprivdata.PrivdataConfig,
deliverServiceConfig *deliverservice.DeliverServiceConfig,
) (*GossipService, error) {
// 它就是签名身份
serializedIdentity, err := peerIdentity.Serialize()
if err != nil {
return nil, err
}
// 它会维护和管理所有锚节点
anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}}
// 创建gossip节点
gossipComponent := gossip.New(
gossipConfig,
s,
secAdv,
mcs,
serializedIdentity,
secureDialOpts,
gossipMetrics,
anchorPeerTracker,
)
// 创建gossip服务对象
return &GossipService{
gossipSvc: gossipComponent,
mcs: mcs,
privateHandlers: make(map[string]privateHandler),
chains: make(map[string]state.GossipStateProvider),
leaderElection: make(map[string]election.LeaderElectionService),
deliveryService: make(map[string]deliverservice.DeliverService),
deliveryFactory: &deliveryFactoryImpl{
signer: peerIdentity,
credentialSupport: credSupport,
deliverServiceConfig: deliverServiceConfig,
},
peerIdentity: serializedIdentity,
secAdv: secAdv,
metrics: gossipMetrics,
serviceConfig: serviceConfig,
privdataConfig: privdataConfig,
anchorPeerTracker: anchorPeerTracker,
}, nil
}
type anchorPeerTracker struct {
// [channelName: [endpoint: 不重要的值]]
allEndpoints map[string]map[string]struct{}
mutex sync.RWMutex
}
// gossip/gossip/gossip_impl.go
// 创建一个gossip节点,并绑定到grpc服务器上
func New(conf *Config, s *grpc.Server, sa api.SecurityAdvisor,
mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType,
secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics,
anchorPeerTracker discovery.AnchorPeerTracker) *Node {
// 创建对象
g := &Node{...}
// 创建状态消息缓冲存储器
g.stateInfoMsgStore = g.newStateInfoMsgStore()
// 创建身份映射器,一个线程同步的map[PKIID: identity]
g.idMapper = identity.NewIdentityMapper(mcs, selfIdentity, func(pkiID common.PKIidType, identity api.PeerIdentityType) {
g.comm.CloseConn(&comm.RemotePeer{PKIID: pkiID})
g.certPuller.Remove(string(pkiID))
}, sa)
// 创建通信模块
commConfig := comm.CommConfig{
DialTimeout: conf.DialTimeout,
ConnTimeout: conf.ConnTimeout,
RecvBuffSize: conf.RecvBuffSize,
SendBuffSize: conf.SendBuffSize,
}
g.comm, err = comm.NewCommInstance(s, conf.TLSCerts, g.idMapper, selfIdentity, secureDialOpts, sa,
gossipMetrics.CommMetrics, commConfig
// 多通道状态路由器,负责路由每个应用通道的数据消息、状态数据、状态消息
g.chanState = newChannelState(g)
// 批量消息发射器,用于节点继续散播消息
g.emitter = newBatchingEmitter(conf.PropagateIterations,
conf.MaxPropagationBurstSize, conf.MaxPropagationBurstLatency,
g.sendGossipBatch)
// 创建节点发现器
g.discAdapter = g.newDiscoveryAdapter()
g.disSecAdap = g.newDiscoverySecurityAdapter()
discoveryConfig := discovery.DiscoveryConfig{...}
self := g.selfNetworkMember()
g.disc = discovery.NewDiscoveryService(self, g.discAdapter, g.disSecAdap, g.disclosurePolicy,
discoveryConfig, anchorPeerTracker, logger)
// 创建身份z书存储器
g.certPuller = g.createCertStorePuller()
g.certStore = newCertStore(g.certPuller, g.idMapper, selfIdentity, mcs)
go g.start()
// 连接上bootstrap节点(该节点所属组织的节点)
go g.connect2BootstrapPeers()
return g
}
func (g *Node) start() {
go g.syncDiscovery()
// 启动死亡节点监测协程,将监测到的死亡节点信息记录下来
go g.handlePresumedDead()
// 消息选择器
msgSelector := func(msg interface{}) bool {...}
// 取得通信模块接受的信息,此为一个chan
incMsgs := g.comm.Accept(msgSelector)
// 使用协程处理信息
go g.acceptMessages(incMsgs)
g.logger.Info("Gossip instance", g.conf.ID, "started")
}
func (g *Node) syncDiscovery() {
// 每隔一段时间向一定数量存活的节点去请求其成员信息
for !g.toDie() {
g.disc.InitiateSync(g.conf.PullPeerNum)
time.Sleep(g.conf.PullInterval)
}
}
gossip服务的散播流程
peer节点通过deliver客户端取得区块后交由Gossip服务
将区块添加到本地账本中
将数据消息放入到区块散播缓存器中
如果还需要散播,则放入到批量消息发射器中
emitter散播信息
comm通信模块发送信息
接收其他节点散播的区块信息
将信息交给通道消息分发器ChannelDeMultiexer处理
将消息交给不同组件处理
GossipChannel处理消息,Forward交给emit继续散播
交给Demux分发器分发消息
1、peer节点通过deliver客户端取得区块后交由Gossip服务
// internal/plg/blocksprovider/blocksprovider.go
func (d *Deliverer) processMsg(msg *orderer.DeliverResponse) error {
if err := d.Gossip.AddPayload(d.ChannelID, payload); err != nil {return errors}
d.Gossip.Gossip(gossipMsg)
return nil
}
2、将区块添加到本地账本中
通过AddPayload方法,将区块加入到缓存中
// gossip/service/gossip_service.go
func (g *GossipService) AddPayload(channelID string, payload *gproto.Payload) error {
return g.chains[channelID].AddPayload(payload)
}
// 以下部分称为state子模块
// gossip/state/state.go
func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
return s.addPayload(payload, s.blockingMode)
}
func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error {
// 取得账本高度
height, err := s.ledger.LedgerHeight()
// 如果不是阻塞模式,如果该区块的序列号比账本高度大过缓存器
if !blockingMode && payload.SeqNum-height >= uint64(s.config.StateBlockBufferSize) {
return errors.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
}
// 如果处于阻塞模式,则阻塞
for blockingMode && s.payloads.Size() > s.config.StateBlockBufferSize*2 {
time.Sleep(enqueueRetryInterval)
}
// payloads为待写入队列,类型为PayloadBuffer
s.payloads.Push(payload)
return nil
}
type PayloadsBufferImpl struct {
// 下一个d出的序列号,它会d出下一个未存储的区块
next uint64
// 缓存队列,使用map实现,它会提前存储不连续的区块
buf map[uint64]*proto.Payload
// 提示已经准备好下一个区块
readyChan chan struct{}
}
func (b *PayloadsBufferImpl) Push(payload *proto.Payload) {
seqNum := payload.SeqNum
if seqNum < b.next || b.buf[seqNum] != nil {
// 避免重复
}
b.buf[seqNum] = payload
// 如果这个区块刚好是下一个区块,则给readyChan
if seqNum == b.next && len(b.readyChan) == 0 {
b.readyChan <- struct{}{}
}
}
func (b *PayloadsBufferImpl) Pop() *proto.Payload {
result := b.buf[b.Next()]
if result != nil {
// 删除该缓存
delete(b.buf, b.Next())
// 自增next
atomic.AddUint64(&b.next, 1)
}
return result
}
GossipStateProvider 在初始化时会开启处理缓存区块的协程
// gossip/state/state.go
func NewGossipStateProvider(...){
go s.deliverPayloads()
}
func (s *GossipStateProviderImpl) deliverPayloads() {
for {
select {
// 阻塞等待区块准备完毕
case <-s.payloads.Ready():
for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
rawBlock := &common.Block{}
// 这里貌似没有补偿机制
if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {continue}
if rawBlock.Data == nil || rawBlock.Header == nil {continue}
var p util.PvtDataCollections
// 有私有数据读取一下
if payload.PrivateData != nil {err := p.Unmarshal(payload.PrivateData)}
// 提交区块
if err := s.commitBlock(rawBlock, p); err != nil {}
}
case <-s.stopCh:
return
}
}
}
func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {
// 存储区块,此部分已经脱离gossip,不再深挖
if err := s.ledger.StoreBlock(block, pvtData); err != nil {
return err
}
// 更新账本高度
s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChannelID(s.chainID))
return nil
}
3、将数据消息放入到区块散播缓存器中
4、如果还需要散播,则放入到批量消息发射器中
func (g *Node) Gossip(msg *pg.GossipMessage) {
// 根据gossip消息,判断其散播范围是否合法
if err := protoext.IsTagLegal(msg); err != nil {}
sMsg := &protoext.SignedGossipMessage{GossipMessage: msg,}
// 散播非数据类型的消息需要对其签名
if protoext.IsDataMsg(sMsg.GossipMessage) {
sMsg, err = protoext.NoopSign(sMsg.GossipMessage)
} else {
_, err = sMsg.Sign(func(msg []byte) ([]byte, error) {
return g.mcs.Sign(msg)
})
}
if protoext.IsChannelRestricted(msg) { // 若是可在通道范围内传播的gossip消息
// 取得GossipChannel对象
gc := g.chanState.getGossipChannelByChainID(msg.Channel)
// 如果是数据消息,加入到MsgStore区块散播缓存器
if protoext.IsDataMsg(msg) {
gc.AddToMsgStore(sMsg)
}
}
if g.conf.PropagateIterations == 0 {return} // 传播迭代数为0,则不再需要通过emitter批量
g.emitter.Add(&emittedGossipMessage{ // 将要散播的消息放入到批量消息发射器emitter中
SignedGossipMessage: sMsg,
filter: func(_ common.PKIidType) bool {
return true
},
})
}
gossip消息类型见下面
将消息放入到区块散播缓存器中
// gossip/gossip/channel/channel.go
func (gc *gossipChannel) AddToMsgStore(msg *protoext.SignedGossipMessage) {
if protoext.IsDataMsg(msg.GossipMessage) {
added := gc.blockMsgStore.Add(msg) // 保证不重复接收区块,和已经存储的区块序号相差不大于总容量(装满会连续最新)
if added {
gc.blocksPuller.Add(msg)
}
}
if protoext.IsStateInfoMsg(msg.GossipMessage) {
gc.stateInfoMsgStore.Add(msg)
}
}
// gossip/gossip/msgstore/message.pb.go
func (s *messageStoreImpl) Add(message interface{}) bool {
n := len(s.messages)
for i := 0; i < n; i++ {
m := s.messages[i]
switch s.pol(message, m.data) {
// 直接不要该区块
case common.MessageInvalidated:
return false
// 删去过老区块,此为更新的方式
case common.MessageInvalidates:
s.invTrigger(m.data)
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
}
}
s.messages = append(s.messages, &msg{data: message, created: time.Now()})
return true
}
// gossip/protoext/msgcomparator.go
func (mc *msgComparator) dataInvalidationPolicy(thisDataMsg *gossip.DataMessage, thatDataMsg *gossip.DataMessage) common.InvalidationResult {
if thisDataMsg.Payload.SeqNum == thatDataMsg.Payload.SeqNum {
return common.MessageInvalidated // 序号相同,放弃该区块
}
diff := abs(thisDataMsg.Payload.SeqNum, thatDataMsg.Payload.SeqNum)
if diff <= uint64(mc.dataBlockStorageSize) {
return common.MessageNoAction // 差距小于容量,加入到末尾
}
if thisDataMsg.Payload.SeqNum > thatDataMsg.Payload.SeqNum {
return common.MessageInvalidates // 差距大于容量且大于原区块的,删去原区块,加入新区块
}
return common.MessageInvalidated
}
// 此处仅add,暂不要考虑过多
// gossip/gossip/pull/pullstore.go
func (p *pullMediatorImpl) Add(msg *protoext.SignedGossipMessage) {
itemID := p.IdExtractor(msg)
p.itemID2Msg[itemID] = msg // 此为一个itemId和msg的map
p.engine.Add(itemID)
}
5、emitter散播信息
6、comm通信模块发送信息
// gossip/gossip/batcher.go
func (p *batchingEmitterImpl) Add(message interface{}) {
// 无需散播,则直接返回
if p.iterations == 0 { return }
// 缓存中添加上该信息
p.buff = append(p.buff, &batchedMessage{data: message, iterationsLeft: p.iterations})
// 如果缓存到达阈值,发射
if len(p.buff) >= p.burstSize { p.emit() }
}
func (p *batchingEmitterImpl) emit() {
if p.toDie() { return }
if len(p.buff) == 0 { return }
msgs2beEmitted := make([]interface{}, len(p.buff))
for i, v := range p.buff { msgs2beEmitted[i] = v.data } // 把缓存器中的数据复制
p.cb(msgs2beEmitted) // 此处cb的值为sendGossipBatch
p.decrementCounters()
}
// gossip/gossip/gossip_impl.go
func (g *Node) sendGossipBatch(a []interface{}) {
msgs2Gossip := make([]*emittedGossipMessage, len(a))
for i, e := range a {
msgs2Gossip[i] = e.(*emittedGossipMessage)
}
g.gossipBatch(msgs2Gossip)
}
func (g *Node) gossipBatch(msgs []*emittedGossipMessage) {
var blocks []*emittedGossipMessage
var stateInfoMsgs []*emittedGossipMessage
var orgMsgs []*emittedGossipMessage
var leadershipMsgs []*emittedGossipMessage
// 依次处理 区块消息
blocks, msgs = partitionMessages(isABlock, msgs)
g.gossipInChan(blocks, func(gc channel.GossipChannel) filter.RoutingFilter {
return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.IsInMyOrg)
})
// 领导消息
leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs)
g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter {
return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.IsInMyOrg)
})
// 状态消息
stateInfoMsgs, msgs = partitionMessages(isAStateInfoMsg, msgs)
for _, stateInfMsg := range stateInfoMsgs {... }
// 组织内部消息
orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs)
peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.IsInMyOrg)
for _, msg := range orgMsgs {... }
// 其余消息
for _, msg := range msgs {... }
}
func (g *Node) gossipInChan(messages []*emittedGossipMessage, chanRoutingFactory channelRoutingFilterFactory) {
// 提取出各个消息中所包含的
totalChannels := extractChannels(messages)
var channel common.ChannelID
var messagesOfChannel []*emittedGossipMessage
for len(totalChannels) > 0 {
// 依次处理各个通过的信息
channel, totalChannels = totalChannels[0], totalChannels[1:]
// 区分出该通道的信息
messagesOfChannel, messages = partitionMessages(grabMsgs, messages)
// Grab channel object for that channel
gc := g.chanState.getGossipChannelByChainID(channel)
// Select the peers to send the messages to
// For leadership messages we will select all peers that pass routing factory - e.g. all peers in channel and org
membership := g.disc.GetMembership()
var peers2Send []*comm.RemotePeer
if protoext.IsLeadershipMsg(messagesOfChannel[0].GossipMessage) {
// 随机找出几个符合的peer节点
peers2Send = filter.SelectPeers(len(membership), membership, chanRoutingFactory(gc))
} else {
peers2Send = filter.SelectPeers(g.conf.PropagatePeerNum, membership, chanRoutingFactory(gc))
}
// 通过通信模块发送信息
for _, msg := range messagesOfChannel {
filteredPeers := g.removeSelfLoop(msg, peers2Send)
g.comm.Send(msg.SignedGossipMessage, filteredPeers...)
}
}
}
7、接收其他节点散播的区块信息
首先,节点接收信息的入口只有grpcServer这一个,它会注册GossipServer服务到grpc服务器中。
// 在初始化Gossip时,会创建Node实例,期间会创建通信模块实例commImpl,该实例实现了GossipServer,它会被注册到grpc服务器中
// proto/gossip/message.pb.go
var _Gossip_serviceDesc = grpc.ServiceDesc{
ServiceName: "gossip.Gossip",
HandlerType: (*GossipServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Gossip_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "GossipStream",
Handler: _Gossip_GossipStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "gossip/message.proto",
}
// gossip/comm/comm_impl.go
type commImpl struct {
idMapper identity.Mapper
}
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
// 创建连接,流程包括向目标节点发送GossipMsg_Conn,并接收目标节点返回的GossipMessage_Conn信息,将其[PKIID: identity]存入到idMapper
connInfo, err := c.authenticateRemotePeer(stream, false, false)
conn := c.connStore.onConnected(stream, connInfo, c.metrics)
h := func(m *protoext.SignedGossipMessage) { // 处理接收到的gossip消息的函数
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub) // 在以上函数之前判断是否为GM_Ack
return conn.serviceConnection()
}
func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *protoext.SignedGossipMessage, conn.recvBuffSize)
// 读取信息,内容通过msgChan传递
go conn.readFromStream(errChan, msgChan)
// 发送内容,内容通过conn.outBuff传递
go conn.writeToStream()
for {
select {
case msg := <-msgChan:
conn.handler(msg) // 使用上面的处理器处理消息
}
}
}
8、将信息交给通道消息分发器ChannelDeMultiexer处理
// gossip/common/demux.go
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
channels := m.channels // 不是fabric中的channel,而是订阅者模式中的channel,它通过pred判断是否要把信息写入到channel中,channel通过AddChannel进行注册
for _, ch := range channels {
if ch.pred(msg) {select {case ch.ch <- msg:}}
}
}
// 在启动Node的时候,会在实例化Node后,会调用Node.start()方法
// gossip/gossip/gossip_impl.go
func (g *Node) start() {
// 不接收GM_Conn, GM_Empty, GM_PrivateXXX
msgSelector := func(msg interface{}) bool {... }
// 向demux注册一个channel
incMsgs := g.comm.Accept(msgSelector)
// 开启协程去处理channel收到的消息
go g.acceptMessages(incMsgs)
}
func (g *Node) acceptMessages(incMsgs <-chan protoext.ReceivedMessage) {
for {
select {case msg := <-incMsgs: g.handleMessage(msg)}
}
}
9、将消息交给不同组件处理,channel内部消息交由GossipChannel处理
// gossip/gossip/gossip_impl.go
func (g *Node) handleMessage(m protoext.ReceivedMessage) {
msg := m.GetGossipMessage()
// 这是我们需要注意的
if protoext.IsChannelRestricted(msg.GossipMessage) { // 通道内部的消息
if gc := g.chanState.lookupChannelForMsg(m); gc == nil { // 未获取到消息所属通道
// 即使该节点不在该channel,也应该转发给组织内的其他节点,以防它是StateInfo消息
if g.IsInMyOrg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && protoext.IsStateInfoMsg(msg.GossipMessage) {
if g.stateInfoMsgStore.Add(msg) {
g.emitter.Add(...)
}
}
} else { // 取得到channel对象
// 如果是leader消息,额外验证一下
if protoext.IsLeadershipMsg(m.GetGossipMessage().GossipMessage) {
if err := g.validateLeadershipMessage(m.GetGossipMessage()); err != nil {return}
}
gc.HandleMessage(m)
}
return
}
// 处理仅发现消息,如GM_Alive GM_MemRes GM_MemReq等
if selectOnlyDiscoveryMessages(m) {
g.forwardDiscoveryMsg(m)
}
// 处理拉取机制用的消息,如GM_DataReq GM_DataUpdate GM_Hello GM_DataDig
if protoext.IsPullMsg(msg.GossipMessage) && protoext.GetPullMsgType(msg.GossipMessage) == pg.PullMsgType_IDENTITY_MSG {
g.certStore.handleMessage(m)
}
}
10、GossipChannel处理消息,Forward交给emit继续散播
// gossip/gossip/channel/channel.go
func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
if !gc.verifyMsg(msg) {return}
// 取得
m := msg.GetGossipMessage()
if !protoext.IsChannelRestricted(m.GossipMessage) {return}
// 取得组织ID
orgID := gc.GetOrgOfPeer(msg.GetConnectionInfo().ID)
// 验证该组织处于channel中
if !gc.IsOrgInChannel(orgID) {return}
// 处理GM_StateInfoPullReq
if protoext.IsStateInfoPullRequestMsg(m.GossipMessage) {
msg.Respond(gc.createStateInfoSnapshot(orgID))
return
}
// 处理GM_StateInfoSnapshot
if protoext.IsStateInfoSnapshot(m.GossipMessage) {
gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
return
}
// 处理GM_DataMsg和GM_StateInfoMsg
if protoext.IsDataMsg(m.GossipMessage) || protoext.IsStateInfoMsg(m.GossipMessage) {
added := false
if protoext.IsDataMsg(m.GossipMessage) {
// 检测信息是否有效
if !gc.blockMsgStore.CheckValid(msg.GetGossipMessage()) {return }
// 验证区块
if !gc.verifyBlock(m.GossipMessage, msg.GetConnectionInfo().ID) {return}
added = gc.blockMsgStore.Add(msg.GetGossipMessage())
if added {
gc.blocksPuller.Add(msg.GetGossipMessage())
}
} else {
added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
}
if added {
// Forward the message
gc.Forward(msg)
// DeMultiplex to local subscribers
gc.DeMultiplex(m)
}
return
}
// 处理拉取消息
if protoext.IsPullMsg(m.GossipMessage) && protoext.GetPullMsgType(m.GossipMessage) == proto.PullMsgType_BLOCK_MSG {}
// 处理领导消息
if protoext.IsLeadershipMsg(m.GossipMessage) {}
}
func (ga *gossipAdapterImpl) Forward(msg protoext.ReceivedMessage) {
ga.Node.emitter.Add(&emittedGossipMessage{
SignedGossipMessage: msg.GetGossipMessage(),
filter: msg.GetConnectionInfo().ID.IsNotSameFilter,
})
}
11、交给Demux分发器分发消息
当一个节点加入channel时,会调用InitializeChannel
// gossip/service/gossip_service.go
func (g *GossipService) InitializeChannel(...) {
g.chains[channelID] = state.NewGossipStateProvider(...)
}
// gossip/state/state.go
func NewGossipStateProvider(...) {
// 仅取得该channel的GM_DataMsg
gossipChan, _ := services.Accept(func(message interface{}) bool {
return protoext.IsDataMsg(message.(*proto.GossipMessage)) &&
bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
}, false)
go s.receiveAndQueueGossipMessages(gossipChan)
}
func (s *GossipStateProviderImpl) receiveAndQueueGossipMessages(ch <-chan *proto.GossipMessage) {
for msg := range ch {
go func(msg *proto.GossipMessage) {
dataMsg := msg.GetDataMsg()
if dataMsg != nil {
// 写入到Payload中,具体看上面
if err := s.addPayload(dataMsg.GetPayload(), nonBlocking); err != nil {return}
}
}(msg)
}
}
gossip重要子模块
Comm
通信子模块的地位:
它是实际上注册在GRPC服务器上的Gossip服务的实现类,它将接收所有的Gossip消息,并交给其他模块来处理这些消息
// gossip/comm/comm_impl.go
func NewCommInstance(...){
proto.RegisterGossipServer(s, commInst)
}
var _Gossip_serviceDesc = grpc.ServiceDesc{
ServiceName: "gossip.Gossip",
HandlerType: (*GossipServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Gossip_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "GossipStream",
Handler: _Gossip_GossipStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "gossip/message.proto",
}
它会负责消息发送的功能,比如其Send功能
数据结构
type commImpl struct {
sa api.SecurityAdvisor // 根据peer取得其组织
tlsCerts *common.TLSCertificates // 该节点的证书
pubSub *util.PubSub
peerIdentity api.PeerIdentityType // 身份
idMapper identity.Mapper
connStore *connectionStore // 连接缓存
PKIID []byte // 该peer的PKIID
msgPublisher *ChannelDeMultiplexer // 消息发布器
subscriptions []chan protoext.ReceivedMessage // 订阅到的消息通道
}
type ChannelDeMultiplexer struct {
channels []*channel // 注册的channel
}
type connectionStore struct {
connFactory connFactory // creates a connection to remote peer
pki2Conn map[string]*connection // mapping between pkiID to connections
destinationLocks map[string]*sync.Mutex // mapping between pkiIDs and locks,
}
核心功能,发送消息Send
func (c *commImpl) Send(msg *protoext.SignedGossipMessage, peers ...*RemotePeer) {
for _, peer := range peers {
go func(peer *RemotePeer, msg *protoext.SignedGossipMessage) {
c.sendToEndpoint(peer, msg, nonBlockingSend)
}(peer, msg)
}
}gossip重要子模块
Comm
通信子模块的地位:
它是实际上注册在GRPC服务器上的Gossip服务的实现类,它将接收所有的Gossip消息,并交给其他模块来处理这些消息

// gossip/comm/comm_impl.go
func NewCommInstance(...){
proto.RegisterGossipServer(s, commInst)
}
var _Gossip_serviceDesc = grpc.ServiceDesc{
ServiceName: "gossip.Gossip",
HandlerType: (*GossipServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Gossip_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "GossipStream",
Handler: _Gossip_GossipStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "gossip/message.proto",
}
它会负责消息发送的功能,比如其Send功能
数据结构

type commImpl struct {
sa api.SecurityAdvisor // 根据peer取得其组织
tlsCerts *common.TLSCertificates // 该节点的证书
pubSub *util.PubSub
peerIdentity api.PeerIdentityType // 身份
idMapper identity.Mapper
connStore *connectionStore // 连接缓存
PKIID []byte // 该peer的PKIID
msgPublisher *ChannelDeMultiplexer // 消息发布器
subscriptions []chan protoext.ReceivedMessage // 订阅到的消息通道
}
type ChannelDeMultiplexer struct {
channels []*channel // 注册的channel
}
type connectionStore struct {
connFactory connFactory // creates a connection to remote peer
pki2Conn map[string]*connection // mapping between pkiID to connections
destinationLocks map[string]*sync.Mutex // mapping between pkiIDs and locks,
}
核心功能,发送消息Send

func (c *commImpl) Send(msg *protoext.SignedGossipMessage, peers ...*RemotePeer) {
for _, peer := range peers {
go func(peer *RemotePeer, msg *protoext.SignedGossipMessage) {
c.sendToEndpoint(peer, msg, nonBlockingSend)
}(peer, msg)
}
}
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *protoext.SignedGossipMessage, shouldBlock blockingBehavior) {
conn, err := c.connStore.getConnection(peer)
if err == nil {
// 发送消息
conn.send(msg, disConnectOnErr, shouldBlock)
return
}
c.disconnect(peer.PKIID)
}
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {
pkiID := peer.PKIID
endpoint := peer.Endpoint
conn, exists := cs.pki2Conn[string(pkiID)]
if exists {return conn, nil}
createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)
// 再次确认,可能会出现连接创建期间,就已经被对方先创建成功
conn, exists = cs.pki2Conn[string(pkiID)]
if exists {
if createdConnection != nil {
createdConnection.close()
}
return conn, nil
}
// 此刻,我们已经知道节点最新的PKIID,但有可能我们要连接的peer是修改过PKIID的,确保我们没有已经连接着新的PKIID冲突的老节点
if conn, exists := cs.pki2Conn[string(createdConnection.pkiID)]; exists {
conn.close()
}
// 此刻,正式创建connection完毕
conn = createdConnection
cs.pki2Conn[string(createdConnection.pkiID)] = conn
// 处理消息
go conn.serviceConnection()
return conn, nil
}
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
// 创建连接
cc, err = grpc.DialContext(ctx, endpoint, dialOpts...)
// 创建GossipClient
cl := proto.NewGossipClient(cc)
// cli去ping目标节点,ping的实现方式为发送GM_Empty
if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil {cc.Close() return nil, errors.WithStack(err)}
// 创建Gossip_GossipStreamClient
if stream, err = cl.GossipStream(ctx); err == nil {
// 此部分代码和GossipStream高度一致,此处它以initiator连接的发起者处理期间的消息,而GossipStream中则是以other peer的身份去处理消息
connInfo, err = c.authenticateRemotePeer(stream, true, false)
if err == nil {
pkiID = connInfo.ID
// pkiid被修改的情况,有些特殊处理
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {...}
conn := newConnection(cl, cc, stream, c.metrics, connConfig)
h := func(m *protoext.SignedGossipMessage) {
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
// 设置该连接处理消息的方式
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)
return conn, nil
}
c.logger.Warningf("Authentication failed: %+v", err)
}
cc.Close()
cancel()
return nil, errors.WithStack(err)
}
// 验证远程节点,生成ConnectionInfo
func (c *commImpl) authenticateRemotePeer(stream stream, initiator, isProbe bool) (*protoext.ConnectionInfo, error) {
ctx := stream.Context()
remoteAddress := extractRemoteAddress(stream)
remoteCertHash := extractCertificateHashFromContext(ctx)
if useTLS {
certReference := c.tlsCerts.TLSServerCert
if initiator {
certReference = c.tlsCerts.TLSClientCert
}
selfCertHash = certHashFromRawCert(certReference.Load().(*tls.Certificate).Certificate[0])
}
signer := func(msg []byte) ([]byte, error) {
return c.idMapper.Sign(msg)
}
// 创建GM_Conn
cMsg, err = c.createConnectionMsg(c.PKIID, selfCertHash, c.peerIdentity, signer, isProbe)
// 发送消息并取得响应
stream.Send(cMsg.Envelope)
m, err := readWithTimeout(stream, c.connTimeout, remoteAddress)
receivedMsg := m.GetConn()
// 将身份存储到idMapper中
err = c.idMapper.Put(receivedMsg.PkiId, receivedMsg.Identity)
connInfo := &protoext.ConnectionInfo{
ID: receivedMsg.PkiId,
Identity: receivedMsg.Identity,
Endpoint: remoteAddress,
Auth: &protoext.AuthInfo{
Signature: m.Signature,
SignedData: m.Payload,
},
}
// 确保context中对方节点的证书hash和对方回复Conn消息提供的tlsCertHash一致
if useTLS {
// If the remote peer sent its TLS certificate, make sure it actually matches the TLS cert
// that the peer used.
if !bytes.Equal(remoteCertHash, receivedMsg.TlsCertHash) {
return nil, errors.Errorf("Expected %v in remote hash of TLS cert, but got %v", remoteCertHash, receivedMsg.TlsCertHash)
}
}
// 验证消息签名
verifier := func(peerIdentity []byte, signature, message []byte) error {
pkiID := c.idMapper.GetPKIidOfCert(peerIdentity)
return c.idMapper.Verify(pkiID, signature, message)
}
err = m.Verify(receivedMsg.Identity, verifier)
return connInfo, nil
}
发送消息和接收消息的流程总结:
在执行Send期间它会调用authenticateRemotePeer方法,它作为Initiator会创建Conn消息,并通过grpc的conn发送,并等待目标节点回复。
目标节点收到消息后,会交给GossipStream,它也会调用authenticateRemotePeer方法,它作为other peer创建一个Conn消息,发送给Initiator,此时Initiator收到Conn消息,将与other peer的连接信息存入connStore中,而发送消息后,再读取出之前Initiator发送的Conn消息,将与Initiator的连接信息存入到connStore。
此后,Initiator正式发送真正有用的其他类型消息,而other peer也会继续向conn中读取消息分发给不同channel
核心功能,处理GossipStream
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
// 形成连接
connInfo, err := c.authenticateRemotePeer(stream, false, false)
conn := c.connStore.onConnected(stream, connInfo, c.metrics)
h := func(m *protoext.SignedGossipMessage) {
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
conn.handler = interceptAcks(h, connInfo.ID, c.pubSub)
for {
select {
case msg := <-msgChan:
conn.handler(msg)
}
}
// 简单来说就是,对于所有消息都交给msgPublisher.DeMultiplex函数
}
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{}) {
channels := m.channels
// 如果该消息是该channel预期想要的,就发给该channel
for _, ch := range channels {
if ch.pred(msg) {
select {
case ch.ch <- msg:
}
}
}
}
// channel通过AddChannel进行订阅,返回可以获得该channel消息的chan
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) <-chan interface{} {
bidirectionalCh := make(chan interface{}, 10)
ch := &channel{ch: bidirectionalCh, pred: predicate}
m.channels = append(m.channels, ch)
return bidirectionalCh
}
// 以上方法一般会被comm封装成Accept
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan protoext.ReceivedMessage {
genericChan := c.msgPublisher.AddChannel(acceptor)
specificChan := make(chan protoext.ReceivedMessage, 10)
c.subscriptions = append(c.subscriptions, specificChan)
go func() {
for {
select {
case msg, channelOpen := <-genericChan:
select {
case specificChan <- msg.(*ReceivedMessageImpl):
}
}
}
}()
return specificChan
}
// 以上方法仅会被Node调用,且会被它封装,这里特别注意,passThrough为false时,返回的是ReceivedMessage所包含的GossipMessage,往往用于二次处理消息,如gossipChannel接收DataMsg后,重新把msg的GossipMessage放入分发器,供stateProvider继续使用
func (g *Node) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *pg.GossipMessage, <-chan protoext.ReceivedMessage) {
if passThrough { // 直接调用
return nil, g.comm.Accept(acceptor)
}
// 把SignedGossipMessage拆出GossipMessage来判断
acceptByType := func(o interface{}) bool {...}
inCh := g.AddChannel(acceptByType)
outCh := make(chan *pg.GossipMessage, acceptChanSize)
go func() {
for {
select {
case m, channelOpen := <-inCh:
select {
case outCh <- m.(*protoext.SignedGossipMessage).GossipMessage:
}
}
}
}()
return outCh, nil
}
gossip会订阅哪些通道
// gossip/gossip/gossip_impl.go
// 主要的消息处理,由Node负责
func (g *Node) start() {
// 过滤掉Conn Empty PrivateData三种GossipMessage
msgSelector := func(msg interface{}) bool {
gMsg, isGossipMsg := msg.(protoext.ReceivedMessage)
isConn := gMsg.GetGossipMessage().GetConn() != nil
isEmpty := gMsg.GetGossipMessage().GetEmpty() != nil
isPrivateData := protoext.IsPrivateDataMsg(gMsg.GetGossipMessage().GossipMessage)
return !(isConn || isEmpty || isPrivateData)
}
incMsgs := g.comm.Accept(msgSelector)
go g.acceptMessages(incMsgs)
}
// gossip/election/adapter.go
// 用于leader节点选举
func (ai *adapterImpl) Accept() <-chan Msg {
// 注意此处订阅的是GossipMessage,而不是Received
adapterCh, _ := ai.gossip.Accept(func(message interface{}) bool {
// 仅取得同channel,channel和org级别的GM_LeadershipMsg
return message.(*proto.GossipMessage).Tag == proto.GossipMessage_CHAN_AND_ORG &&
protoext.IsLeadershipMsg(message.(*proto.GossipMessage)) &&
bytes.Equal(message.(*proto.GossipMessage).Channel, ai.channel)
}, false)
return msgCh
}
// gossip/privdata/pull.go
// 私有数据拉取器
func NewPuller(...) *puller {
// 只要该channel的GM_PrivateDataMsg
_, p.msgChan = p.Accept(func(o interface{}) bool {
msg := o.(protoext.ReceivedMessage).GetGossipMessage()
if !bytes.Equal(msg.Channel, []byte(p.channel)) {
return false
}
return protoext.IsPrivateDataMsg(msg.GossipMessage)
}, true)
return p
}
// gossip/state/state.go
// gossipStateProvider处理DataMsg
func NewGossipStateProvider(...) {
// 仅处理DataMsg
gossipChan, _ := services.Accept(func(message interface{}) bool {
return protoext.IsDataMsg(message.(*proto.GossipMessage)) &&
bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
}, false)
// 处理不包含私有数据的StateReq或StateRes
remoteStateMsgFilter := func(message interface{}) bool {
receivedMsg := message.(protoext.ReceivedMessage)
msg := receivedMsg.GetGossipMessage()
// 必须是StateReq或StateRes消息或PrivateDataMsg
if !(protoext.IsRemoteStateMessage(msg.GossipMessage) || msg.GetPrivateData() != nil) {
return false
}
// 必须为本channel
if !bytes.Equal(msg.Channel, []byte(chainID)) {
return false
}
connInfo := receivedMsg.GetConnectionInfo()
authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
return true
}
}
GossipChannel
type channelState struct {
channels map[channelId]channel.GossipChannel
g *Node
}
// 添加该节点到channel
func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage, channelID common.ChannelID,
metrics *metrics.MembershipMetrics) {
if gc, exists := cs.channels[string(channelID)]; !exists {
pkiID := cs.g.comm.GetPKIid()
ga := &gossipAdapterImpl{Node: cs.g, Discovery: cs.g.disc}
gc := channel.NewGossipChannel(pkiID, cs.g.selfOrg, cs.g.mcs, channelID, ga, joinMsg, metrics, nil)
cs.channels[string(channelID)] = gc
} else {
gc.ConfigureChannel(joinMsg)
}
}
type gossipChannel struct {
Adapter
mcs api.MessageCryptoService
pkiID common.PKIidType
selfOrg api.OrgIdentityType
selfStateInfoMsg *proto.GossipMessage
selfStateInfoSignedMsg *protoext.SignedGossipMessage
orgs []api.OrgIdentityType
joinMsg api.JoinChannelMessage // 使该节点加入到channel的msg
blockMsgStore msgstore.MessageStore // 存储区块相关GossipMessage
stateInfoMsgStore *stateInfoCache
leaderMsgStore msgstore.MessageStore
chainID common.ChannelID
blocksPuller pull.Mediator // 区块拉取工具
memFilter *membershipFilter // 节点发现服务的adapter
ledgerHeight uint64
leftChannel int32
}
func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
channelID common.ChannelID, adapter Adapter, joinMsg api.JoinChannelMessage,
metrics *metrics.MembershipMetrics, logger util.Logger) GossipChannel {
gc := &gossipChannel{...}
gc.memFilter = &membershipFilter{adapter: gc.Adapter, gossipChannel: gc}
comparator := protoext.NewGossipMessageComparator(adapter.GetConf().MaxBlockCountToStore)
gc.blocksPuller = gc.createBlockPuller()
// 初始化msgStore
gc.blockMsgStore = msgstore.NewMessageStoreExpirable(comparator, func(m interface{}) {
gc.blocksPuller.Remove(seqNumFromMsg(m))
}, gc.GetConf().BlockExpirationInterval, nil, nil, func(m interface{}) {
gc.blocksPuller.Remove(seqNumFromMsg(m))
})
// 初始化stateInfoMsgStore
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership, verifyStateInfoMsg)
// 初始化通道状态信息
gc.updateProperties(1, nil, false)
gc.setupSignedStateInfoMessage()
ttl := adapter.GetConf().MsgExpirationTimeout
pol := protoext.NewGossipMessageComparator(0)
// 初始化leaderMsgStore
gc.leaderMsgStore = msgstore.NewMessageStoreExpirable(pol, msgstore.Noop, ttl, nil, nil, nil)
gc.ConfigureChannel(joinMsg)
// 定期发送该节点的状态信息
go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
// 定期请求其他节点的状态信息
go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C)
return gc
}
func (gc *gossipChannel) publishSignedStateInfoMessage() {
stateInfoMsg, err := gc.setupSignedStateInfoMessage() // 给该节点的stateInfo签名
gc.stateInfoMsgStore.Add(stateInfoMsg)
gc.Gossip(stateInfoMsg)
}
func (gc *gossipChannel) requestStateInfo() {
req, err := gc.createStateInfoRequest() // 创建GM_StateInfoPullReq
endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan) // 节点发现服务找出合适节点
gc.Send(req, endpoints...)
}
// 处理channel级别的GossipMessage
func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
// 如果是StateInfoPullRequestMsg,接受StateInfoPull请求,创建StateInfoSnapShot并响应
msg.Respond(gc.createStateInfoSnapshot(orgID))
// 如果是StateInfoSnapshot,接收StateInfoSnapShot并处理后加入到msgStore中
gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
// 如果是DataMsg,接收DataMsg,加入到msgStore,并继续散播,且交给通道分发器分发,该情况详细见散播流程8、11,这里重点是StateInfo相关消息
added = gc.blockMsgStore.Add(msg.GetGossipMessage())
if added {
gc.blocksPuller.Add(msg.GetGossipMessage())
}
// 如果是StateInfoMsg,接收StateInfoMsg,加入到msgStore,并继续散播,且交给通道分发器分发
added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
// 如果是DataMsg、StateInfoMsg,成功加入到msgStore后
if added {
// 继续散播消息
gc.Forward(msg)
// 分发消息到本地订阅者
gc.DeMultiplex(m)
}
// Pull和Snapshot为一对,有请求和响应和一对一的节点,StateInfoMsg为散播消息
}
func (gc *gossipChannel) createStateInfoSnapshot(requestersOrg api.OrgIdentityType) *proto.GossipMessage {
sameOrg := bytes.Equal(gc.selfOrg, requestersOrg)
rawElements := gc.stateInfoMsgStore.Get()
elements := []*proto.Envelope{}
for _, rawEl := range rawElements {
msg := rawEl.(*protoext.SignedGossipMessage)
orgOfCurrentMsg := gc.GetOrgOfPeer(msg.GetStateInfo().PkiId)
// 如果请求者和该节点是一个组织,无需过滤
if sameOrg || !bytes.Equal(orgOfCurrentMsg, gc.selfOrg) {
elements = append(elements, msg.Envelope)
continue
}
// 如果不是一个组织的,则仅暴露和该消息相关的AliveMessage具有外部端点的消息
if netMember := gc.Lookup(msg.GetStateInfo().PkiId); netMember == nil || netMember.Endpoint == "" {
continue
}
elements = append(elements, msg.Envelope)
}
return &proto.GossipMessage{
Channel: gc.chainID,
Tag: proto.GossipMessage_CHAN_OR_ORG,
Content: &proto.GossipMessage_StateSnapshot{
StateSnapshot: &proto.StateInfoSnapshot{
Elements: elements,
},
},
}
}
func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) {
chanName := string(gc.chainID)
for _, envelope := range m.GetStateSnapshot().Elements {
stateInf, err := protoext.EnvelopeToGossipMessage(envelope)
if !protoext.IsStateInfoMsg(stateInf.GossipMessage) {return} // 确保为StateInfoMsg
si := stateInf.GetStateInfo()
orgID := gc.GetOrgOfPeer(si.PkiId)
if !gc.IsOrgInChannel(orgID) {return} // 确保组织参加到channel
expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
// 验证StateInfoMsg
err = gc.ValidateStateInfoMessage(stateInf)
if gc.Lookup(si.PkiId) == nil {continue} // 查看发送该消息的节点是否存活
gc.stateInfoMsgStore.Add(stateInf) // 存储stateInfoMsg
}
}
MsgStore
数据结构
type MessageStore interface {
// 添加消息
Add(msg interface{}) bool
// 检测是否可以有效加入到存储器中
CheckValid(msg interface{}) bool
Size() int
// 取得所有消息
Get() []interface{}
Stop()
// 清除所有会被传入方法判定为true的消息
Purge(func(interface{}) bool)
}
// 消息存储器实现类
type messageStoreImpl struct {
pol common.MessageReplacingPolicy // 消息替代方法,分为A使B无效,B使A无效,AB之间没有影响
messages []*msg // 存储的消息
invTrigger invalidationTrigger // 失效触发器,新增消息导致原有消息失效时,触发
msgTTL time.Duration // 消息存活时间
expiredCount int // 过期消息计数
expireMsgCallback func(msg interface{}) // 过期消息触发
}
创建方式
// 创建MsgStore
func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore {
return newMsgStore(pol, trigger)
}
// 创建有过期功能的MsgStore
func NewMessageStoreExpirable(pol common.MessageReplacingPolicy, trigger invalidationTrigger, msgTTL time.Duration, externalLock func(), externalUnlock func(), externalExpire func(interface{})) MessageStore {
store := newMsgStore(pol, trigger)
store.msgTTL = msgTTL
if externalLock != nil { store.externalLock = externalLock }
if externalUnlock != nil { store.externalUnlock = externalUnlock }
if externalExpire != nil { store.expireMsgCallback = externalExpire }
go store.expirationRoutine() // 启动过期协程
return store
}
func newMsgStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) *messageStoreImpl {
return &messageStoreImpl{...}
}
核心实现
func (s *messageStoreImpl) Add(message interface{}) bool {
n := len(s.messages)
for i := 0; i < n; i++ {
m := s.messages[i]
switch s.pol(message, m.data) { // 判断新消息和已有消息之间的关系
case common.MessageInvalidated: // 该消息会被已有消息失效,加入失败
return false
case common.MessageInvalidates: // 该消息会导致已有消息失效
s.invTrigger(m.data) // 触发失效触发器
s.messages = append(s.messages[:i], s.messages[i+1:]...) // 删除原消息
n-- i--
}
}
s.messages = append(s.messages, &msg{data: message, created: time.Now()}) // 新增该消息
return true
}
func (s *messageStoreImpl) Purge(shouldBePurged func(interface{}) bool) {
// 是否有清除的必要
if !s.isPurgeNeeded(shouldMsgBePurged) {return}
n := len(s.messages)
for i := 0; i < n; i++ {
// 判断是否要删除
if !shouldMsgBePurged(s.messages[i]) {continue}
// 触发失效触发器
s.invTrigger(s.messages[i].data)
// 删除消息
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n-- i--
}
}
func (s *messageStoreImpl) expirationRoutine() {
for {
select {
case <-time.After(s.expirationCheckInterval()): // 过期检测,过期时间的百分之一
hasMessageExpired := func(m *msg) bool {...} // 是否过期
if s.isPurgeNeeded(hasMessageExpired) {
s.expireMessages()
}
}
}
}
func (s *messageStoreImpl) expireMessages() {
n := len(s.messages)
for i := 0; i < n; i++ {
m := s.messages[i]
if !m.expired {
// 超过1倍过期时间,先标记为过期,并触发过期回调函数
if time.Since(m.created) > s.msgTTL {
m.expired = true
s.expireMsgCallback(m.data)
s.expiredCount++
}
} else {
// 超过两倍过期时间,直接删去该消息
if time.Since(m.created) > (s.msgTTL * 2) {
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
s.expiredCount--
}
}
}
}
应用场景
// Node的状态信息数据存储器
Node.stateInfoMsgStore = msgstore.NewMessageStoreExpirable(...)
// GossipChannel有三种类型的MsgStore
gossipChannel.blockMsgStore = msgstore.NewMessageStoreExpirable(...) // 失效触发器为gc.blocksPuller.Remove(seqNumFromMsg(m))
gossipChannel.leaderMsgStore = msgstore.NewMessageStoreExpirable(...) // 不进行额外的 *** 作
gc.stateInfoMsgStore = newStateInfoCache(...)
// 它会开启一个协程,不断调用 msgStore.Purge(hasExpired)
type stateInfoCache struct {
msgstore.MessageStore = msgstore.NewMessageStore(pol, invalidationTrigger)
}
// DiscoveryImpl的AliveMsgStore
discoveryImpl.msgStore = &aliveMsgStore{MessageStore: msgstore.NewMessageStoreExpirable(...),}
Pull
Pull的数据结构
// Mediator 是一个封装了 PullEngine 的组件,并提供了执行拉取同步所需的方法。 pull mediator 对特定类型消息的专门化是通过配置完成的,一个 IdentifierExtractor,在构造时给出的 IdentifierExtractor,以及可以为每种类型的 pullMsgType(hello、digest、req、res)注册的钩子。
type Mediator interface {
// 将MsgHook注册到特定类型的PullMsg
RegisterMsgHook(MsgType, MessageHook)
// 添加一个GossipMsg加入到Mediator中
Add(*protoext.SignedGossipMessage)
// 删除消息
Remove(digest string)
// 处理pull消息
HandleMessage(msg protoext.ReceivedMessage)
}
type pullMediatorImpl struct {
*PullAdapter
msgType2Hook map[MsgType][]MessageHook // 消息到对应狗子函数的映射
config Config
itemID2Msg map[string]*protoext.SignedGossipMessage // itemID到对应的GossipMessage
engine *algo.PullEngine // 拉取模块的核心
}
// pullAdapter和config用于让拉取适配不同场景,目前拉取主要用于拉取身份和拉取区块两种功能
type PullAdapter struct {
Sndr Sender 区块: GossipChannel(本质还是Comm) 身份: Comm
MemSvc MembershipService 区块: GossipChannel.memFilter 身份: Dicovery
IdExtractor IdentifierExtractor 区块: BlockSeq 身份: PKIID
MsgCons MsgConsumer 区块: 用channel分发器分发 身份: 存入到IdMapper中
EgressDigFilter EgressDigestFilter 区块: / 身份: sameOrgOrOurOrgPullFilter
IngressDigFilter IngressDigestFilter 区块: 过滤掉低于当前高度的区块 身份: /
}
type PullEngine struct {
PullAdapter // pullMediatorImpl本身
state *util.Set // digest的集合,存储了该节点已经接收到的消息的digest
item2owners map[string][]string // itemId到该消息持有者的映射
peers2nonces map[string]uint64 // 节点到随机数的映射
nonces2peers map[uint64]string // 随机数到节点的映射
outgoingNONCES *util.Set // 向其他节点请求的请求随机数集合
incomingNONCES *util.Set // 受其他节点请求的响应随机数集合
acceptingDigests: int32(0), // 表示允许接收GM_DataDig
acceptingResponses: int32(0), // 表示允许接收GM_DataUpdate
digFilter DigestFilter
}
Pull的协议机制和PullEngine与pullMediatorImpl
/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items
identified by string numbers.
The protocol is as follows:
1) Initiator向其他peer节点发出一个GM_Hello消息,并伴随一个随机数,它类似于一次拉取的口令来识别互相
2) 其他节点会回复一个GM_DataDig消息,它会包含它已经存储在state的消息的摘要和随机数
3) Initiator验证随机数,并找出自己所需的消息的digest集合,和随机数构成GM_DataReq
4) 其他peer节点将请求的消息和随机数封装为GM_DataUpdate
Other peer Initiator
O <-------- Hello ------------------------- O
/|\ --------- Digest <[3,5,8, 10...], NONCE> --------> /|\
| <-------- Request <[3,8], NONCE> ----------------- |
/ \ --------- Response <[item3, item8], NONCE>-------> / \
*/
PullEngine用于处理接收到的消息,而pullMediatorImpl通过其Adapter提供的组件来实现对消息的发送。
整个流程的起点是PullEngine在初始化时启动一个协程定期调用InitiatePull(),首先我们以Initiator的视角去看它是怎么处理的
func (engine *PullEngine) initiatePull() {
engine.acceptDigests() // 设置允许接收Dig的状态
for _, peer := range engine.SelectPeers() {
nonce := engine.newNONCE() // 产生一个随机数
engine.outgoingNONCES.Add(nonce) // 添加请求随机数
engine.nonces2peers[nonce] = peer // 产生节点与随机数的双映射
engine.peers2nonces[peer] = nonce
engine.Hello(peer, nonce) // 向目标节点发送GM_Hello
}
time.AfterFunc(engine.digestWaitTime, func() { // 等待一段时候处理收到的MG_DataDig消息,存入到item2owner,之后通过该函数统一处理
engine.processIncomingDigests()
})
}
func (p *pullMediatorImpl) Hello(dest string, nonce uint64) {
helloMsg := &gossip.GossipMessage{...} // 构建GM_Hello
sMsg, err := protoext.NoopSign(helloMsg) // 不签名,但是封装成已签名的GM
p.Sndr.Send(sMsg, p.peersWithEndpoints(dest)...) // 通过adapter的memSrv来找到要发送的节点,并通过adpater的Sndr发送给目标节点
}
// 节点接收到其他节点发送的Digest
func (engine *PullEngine) OnDigest(digest []string, nonce uint64, context interface{}) {
// 如果允许接收Dig,且随机数匹配
if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) {return}
for _, n := range digest {
// 该消息已经接收,这抛弃
if engine.state.Exists(n) {continue}
// 初始化一下
if _, exists := engine.item2owners[n]; !exists {engine.item2owners[n] = make([]string, 0)}
// 添加该消息的持有者,一个消息允许有多个持有者
engine.item2owners[n] = append(engine.item2owners[n], engine.nonces2peers[nonce])
}
}
// 节点已经接收到一些Dig消息,存入到item2owners中,现在统一处理
func (engine *PullEngine) processIncomingDigests() {
engine.ignoreDigests() // 不再允许接收新的Dig消息
requestMapping := make(map[string][]string)
for n, sources := range engine.item2owners {
// 随机找个该消息的持有者
source := sources[util.RandomInt(len(sources))]
// 初始化一下
if _, exists := requestMapping[source]; !exists {requestMapping[source] = make([]string, 0)}
// 整理成对每个peer节点要请求哪些消息的映射
requestMapping[source] = append(requestMapping[source], n)
}
// 开始允许接收GM_DataUpdate
engine.acceptResponses()
// 向各个节点发送GM_DataReq
for dest, seqsToReq := range requestMapping {
engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest])
}
// 等待接收一些DataUpdate,添加到state中,之后再初始化engine
time.AfterFunc(engine.responseWaitTime, engine.endPull)
}
// 发送GM_DataReq,逻辑和上面的Hello一致
func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) {
req := &gossip.GossipMessage{...}
sMsg, err := protoext.NoopSign(req)
p.Sndr.Send(sMsg, p.peersWithEndpoints(dest)...)
}
// 处理GM_DataUpdate,它只是记录收到了哪些消息,具体的消息存储在pullMediatorImpl中
func (engine *PullEngine) OnRes(items []string, nonce uint64) {
// 允许接收响应且随机数匹配
if !engine.outgoingNONCES.Exists(nonce) || !engine.isAcceptingResponses() {return}
engine.Add(items...)
}
// 添加到state中
func (engine *PullEngine) Add(seqs ...string) {
for _, seq := range seqs {
engine.state.Add(seq)
}
}
// 统一处理接收到的响应
func (engine *PullEngine) endPull() {
// 不再接收响应
atomic.StoreInt32(&(engine.acceptingResponses), int32(0))
// 初始化这四个集合,他们一轮pull的临时变量
engine.outgoingNONCES.Clear()
engine.item2owners = make(map[string][]string)
engine.peers2nonces = make(map[string]uint64)
engine.nonces2peers = make(map[uint64]string)
}
其他peer节点
func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
// 添加响应随机数
engine.incomingNONCES.Add(nonce)
// 等待其请求时限,超时删除该随机数
time.AfterFunc(engine.requestWaitTime, func() {
engine.incomingNONCES.Remove(nonce)
})
// 罗列消息
a := engine.state.ToArray()
var digest []string
filter := engine.digFilter(context)
for _, item := range a {
dig := item.(string)
// 过滤消息
if !filter(dig) {continue}
digest = append(digest, dig)
}
// 发送Dig消息,跟上面基本一样,就不贴代码了
engine.SendDigest(digest, nonce, context)
}
// 处理GM_DataReq,很简单
func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) {
if !engine.incomingNONCES.Exists(nonce) {return}
filter := engine.digFilter(context)
var items2Send []string
for _, item := range items {
if engine.state.Exists(item) && filter(item) {
items2Send = append(items2Send, item)
}
}
go engine.SendRes(items2Send, context, nonce)
}
// 发送GM_DataUpdate,这里不一样一点
func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce uint64) {
items2return := []*gossip.Envelope{}
for _, item := range items {
// 数据保存在pullMediator中,需要在此处取出
if msg, exists := p.itemID2Msg[item]; exists {
items2return = append(items2return, msg.Envelope)
}
}
returnedUpdate := &gossip.GossipMessage{...}
remotePeer := context.(protoext.ReceivedMessage).GetConnectionInfo()
context.(protoext.ReceivedMessage).Respond(returnedUpdate)
}
以上OnXXX的调用是通过PullMediatorImpl的HandleMessage来调用的
func (p *pullMediatorImpl) HandleMessage(m protoext.ReceivedMessage) {
msg := m.GetGossipMessage()
msgType := protoext.GetPullMsgType(msg.GossipMessage)
itemIDs := []string{}
items := []*protoext.SignedGossipMessage{}
var pullMsgType MsgType
// 收到GM_Hello,调用OnHello
if helloMsg := msg.GetHello(); helloMsg != nil {
pullMsgType = HelloMsgType
p.engine.OnHello(helloMsg.Nonce, m)
} else if digest := msg.GetDataDig(); digest != nil {
// 收到GM_DataDig,先调用IngressDigFilter,过滤掉一些无用的digest,再调用OnDigest
d := p.PullAdapter.IngressDigFilter(digest)
itemIDs = util.BytesToStrings(d.Digests)
pullMsgType = DigestMsgType
p.engine.OnDigest(itemIDs, d.Nonce, m)
} else if req := msg.GetDataReq(); req != nil {
// 收到GM_DataReq,调用OnReq
itemIDs = util.BytesToStrings(req.Digests)
pullMsgType = RequestMsgType
p.engine.OnReq(itemIDs, req.Nonce, m)
} else if res := msg.GetDataUpdate(); res != nil {
// 收到GM_DataUpdate,调用OnRes
itemIDs = make([]string, len(res.Data))
items = make([]*protoext.SignedGossipMessage, len(res.Data))
pullMsgType = ResponseMsgType
for i, pulledMsg := range res.Data {
// 将Envelope转化为GossipMessage
msg, err := protoext.EnvelopeToGossipMessage(pulledMsg)
// 使用掉该Msg
p.MsgCons(msg)
// 生成其itemId,并将其存入到itemID2Msg中
itemIDs[i] = p.IdExtractor(msg)
items[i] = msg
p.itemID2Msg[itemIDs[i]] = msg
}
// 调用OnRes
p.engine.OnRes(itemIDs, res.Nonce)
}
// 对于不同类型的信息调用hook方法
for _, h := range p.hooksByMsgType(pullMsgType) {
h(itemIDs, items, m)
}
}
应用场景
1、GossipChannel中的BlockPuller
初始化
func NewGossipChannel(...){
gc.blocksPuller = gc.createBlockPuller()
}
func (gc *gossipChannel) createBlockPuller() pull.Mediator {
conf := pull.Config{
MsgType: proto.PullMsgType_BLOCK_MSG,
Channel: []byte(gc.chainID),
ID: gc.GetConf().ID,
PeerCountToSelect: gc.GetConf().PullPeerNum,
PullInterval: gc.GetConf().PullInterval,
Tag: proto.GossipMessage_CHAN_AND_ORG,
PullEngineConfig: algo.PullEngineConfig{
DigestWaitTime: gc.GetConf().DigestWaitTime,
RequestWaitTime: gc.GetConf().RequestWaitTime,
ResponseWaitTime: gc.GetConf().ResponseWaitTime,
},
}
// 构成itemID的方法
seqNumFromMsg := func(msg *protoext.SignedGossipMessage) string {
dataMsg := msg.GetDataMsg()
if dataMsg == nil || dataMsg.Payload == nil {
gc.logger.Warning("Non-data block or with no payload")
return ""
}
return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
}
adapter := &pull.PullAdapter{
Sndr: gc, // gossipChannel本身来作为发送者,其实最终还是comm
MemSvc: gc.memFilter, // menberFilter来作为成员服务
IdExtractor: seqNumFromMsg,
MsgCons: func(msg *protoext.SignedGossipMessage) {
gc.DeMultiplex(msg)
},
}
// 取得GM_DataDig过滤方法,过滤掉较低的区块
adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
height := gc.ledgerHeight
digests := digestMsg.Digests
digestMsg.Digests = nil
for i := range digests {
seqNum, err := strconv.ParseUint(string(digests[i]), 10, 64)
if seqNum >= height {
digestMsg.Digests = append(digestMsg.Digests, digests[i])
}
}
return digestMsg
}
return pull.NewPullMediator(conf, adapter)
}
消息处理
func (gc *gossipChannel) HandleMessage(msg protoext.ReceivedMessage) {
// 如果是PullMsg且是拉取区块的类型的消息
if protoext.IsPullMsg(m.GossipMessage) && protoext.GetPullMsgType(m.GossipMessage) == proto.PullMsgType_BLOCK_MSG {
// 如果没有消息发送方的StateInfo会因为不知道其权限,无法处理
if gc.stateInfoMsgStore.MsgByID(msg.GetConnectionInfo().ID) == nil {return}
// 判断该节点是否能够取得该channel的区块
if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {return}
// 如果是GM_DataUpdate
if protoext.IsDataUpdate(m.GossipMessage) {
var msgs []*protoext.SignedGossipMessage
var items []*proto.Envelope
filteredEnvelopes := []*proto.Envelope{}
for _, item := range m.GetDataUpdate().Data {
gMsg, err := protoext.EnvelopeToGossipMessage(item)
// channel 不匹配
if !bytes.Equal(gMsg.Channel, []byte(gc.chainID)) {return}
// 检测blockMsgStore是否可以添加该区块
if !gc.blockMsgStore.CheckValid(gMsg) {continue}
// 检测区块
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {return}
msgs = append(msgs, gMsg)
items = append(items, item)
}
// 将消息加入到blockMsgStore中
for i, gMsg := range msgs {
item := items[i]
added := gc.blockMsgStore.Add(gMsg)
if !added {continue} // 表示该区块已经添加,或者区块太老了
filteredEnvelopes = append(filteredEnvelopes, item)
}
// Replace the update message with just the blocks that should be processed
m.GetDataUpdate().Data = filteredEnvelopes
}
// 交由BlockPuller处理
gc.blocksPuller.HandleMessage(msg)
}
}
2、Node中的certPuller
初始化
// gossip/gossip/gossip_impl.go
func New(...) {
g.certPuller = g.createCertStorePuller()
}
func (g *Node) createCertStorePuller() pull.Mediator {
conf := pull.Config{
MsgType: pg.PullMsgType_IDENTITY_MSG,
Channel: []byte(""),
ID: g.conf.InternalEndpoint,
PeerCountToSelect: g.conf.PullPeerNum,
PullInterval: g.conf.PullInterval,
Tag: pg.GossipMessage_EMPTY,
PullEngineConfig: algo.PullEngineConfig{
DigestWaitTime: g.conf.DigestWaitTime,
RequestWaitTime: g.conf.RequestWaitTime,
ResponseWaitTime: g.conf.ResponseWaitTime,
},
}
// 生成itemId的方式,就是以身份的PKIID为itemID
pkiIDFromMsg := func(msg *protoext.SignedGossipMessage) string {
identityMsg := msg.GetPeerIdentity()
return string(identityMsg.PkiId)
}
// 拉取到证书的消费者,简单来说就是加入到idMapper中
certConsumer := func(msg *protoext.SignedGossipMessage) {
idMsg := msg.GetPeerIdentity()
err := g.idMapper.Put(common.PKIidType(idMsg.PkiId), api.PeerIdentityType(idMsg.Cert))
}
adapter := &pull.PullAdapter{
Sndr: g.comm,
MemSvc: g.disc,
IdExtractor: pkiIDFromMsg,
MsgCons: certConsumer,
EgressDigFilter: g.sameOrgOrOurOrgPullFilter, // 同组织散播所有身份,非同组织只散播对外节点(锚节点)
}
return pull.NewPullMediator(conf, adapter)
}
信息处理
func (g *Node) handleMessage(m protoext.ReceivedMessage) {
if protoext.IsPullMsg(msg.GossipMessage) && protoext.GetPullMsgType(msg.GossipMessage) == pg.PullMsgType_IDENTITY_MSG {
g.certStore.handleMessage(m)
}
}
func (cs *certStore) handleMessage(msg protoext.ReceivedMessage) {
if update := msg.GetGossipMessage().GetDataUpdate(); update != nil {
for _, env := range update.Data {
m, err := protoext.EnvelopeToGossipMessage(env)
// 验证身份
if err := cs.validateIdentityMsg(m); err != nil {return}
}
}
// 交给pull处理消息
cs.pull.HandleMessage(msg)
}
Discovery
DIscovery子模块是gossip的一个子模块,不是Discovery服务,需要区分。
数据结构
type Discovery interface {
// 根据PKIID查询成员
Lookup(PKIID common.PKIidType) *NetworkMember
// 返回本节点的成员
Self() NetworkMember
// 更新该节点的Endpoint
UpdateEndpoint(string)
// 取得所有还存活的成员
GetMembership() []NetworkMember
// 请求peerNum个节点的成员信息
InitiateSync(peerNum int)
// 使该节点连接上一个远程实例
Connect(member NetworkMember, id identifier)
}
// 是一个peer的代表
type NetworkMember struct {
Endpoint string
Metadata []byte
PKIid common.PKIidType
InternalEndpoint string
Properties *proto.Properties
*proto.Envelope
}
type gossipDiscoveryImpl struct {
self NetworkMember // 本节点
id2Member map[string]*NetworkMember // 所有已知节点,id到member的映射
deadLastTS map[string]*timestamp // 最近探测为死亡的时间
aliveLastTS map[string]*timestamp // 最近探测为存活的时间
aliveMembership *util.MembershipStore // 存活member
deadMembership *util.MembershipStore // 死亡member
selfAliveMessage *protoext.SignedGossipMessage // 该节点的存活消息
msgStore *aliveMsgStore // GM_AliveMsg的存储器
comm CommService // DiscoveryAdapter,主要提供通信模块的适配
crypt CryptoService // DiscoverySecurityAdapter,主要提供了消息加密的适配
port int
disclosurePolicy DisclosurePolicy // 暴露策略
}
初始化过程
// gossip/gossip/gossip_impl.go
func New(...) {
g.discAdapter = g.newDiscoveryAdapter()
g.disSecAdap = g.newDiscoverySecurityAdapter()
g.disc = discovery.NewDiscoveryService(self, g.discAdapter, g.disSecAdap, g.disclosurePolicy,
discoveryConfig, anchorPeerTracker, logger)
}
// gossip/discovery/discovery_impl.go
func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy,
config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker, logger util.Logger) Discovery {
d := &gossipDiscoveryImpl{...}
d.validateSelfConfig()
d.msgStore = newAliveMsgStore(d)
// 定期发送aliveMsg
go d.periodicalSendAlive()
// 定期检查其他节点是否存活
go d.periodicalCheckAlive()
go d.handleMessages()
go d.periodicalReconnectToDead()
go d.handleEvents()
return d
}
func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
// 消息替换策略,对于同一PKIID的,时间戳更新的有效
policy := protoext.NewGossipMessageComparator(0)
aliveMsgTTL := d.aliveExpirationTimeout * time.Duration(d.msgExpirationFactor)
// 过期回调函数
callback := func(m interface{}) {
msg := m.(*protoext.SignedGossipMessage)
membership := msg.GetAliveMsg().Membership
id := membership.PkiId
// 如果该过期节点为bootstrap或anchor节点则不删去
if ... {return}
// 过期,则删除该节点的信息
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
delete(d.deadLastTS, string(id))
delete(d.aliveLastTS, string(id))
}
// 创建aliveMsg的MsgStore
s := &aliveMsgStore{
MessageStore: msgstore.NewMessageStoreExpirable(policy, trigger, aliveMsgTTL, externalLock, externalUnlock, callback),
}
return s
}
周期性发送aliveMsg
func (d *gossipDiscoveryImpl) periodicalSendAlive() {
for !d.toDie() {
time.Sleep(d.aliveTimeInterval)
msg, err := d.createSignedAliveMessage(true)
// 更新自己的aliveMessage
d.selfAliveMessage = msg
// 散播该msg
d.comm.Gossip(msg)
}
}
周期性检查其他节点存活
func (d *gossipDiscoveryImpl) periodicalCheckAlive() {
for !d.toDie() {
time.Sleep(d.aliveExpirationCheckInterval)
dead := d.getDeadMembers()
if len(dead) > 0 {
d.expireDeadMembers(dead)
}
}
}
func (d *gossipDiscoveryImpl) getDeadMembers() []common.PKIidType {
dead := []common.PKIidType{}
// 查询上一次探测为存活的时间,如果超过过期时限,则判断为Dead
for id, last := range d.aliveLastTS {
elapsedNonAliveTime := time.Since(last.lastSeen)
if elapsedNonAliveTime > d.aliveExpirationTimeout {
dead = append(dead, common.PKIidType(id))
}
}
return dead
}
func (d *gossipDiscoveryImpl) expireDeadMembers(dead []common.PKIidType) {
var deadMembers2Expire []NetworkMember
for _, pkiID := range dead {
deadMembers2Expire = append(deadMembers2Expire, d.id2Member[string(pkiID)].Clone())
// 把探测为存活的时间作为探测为死亡的时间,并且把alive的节点移到dead中
lastTS, hasLastTS := d.aliveLastTS[string(pkiID)]
if hasLastTS {
d.deadLastTS[string(pkiID)] = lastTS
delete(d.aliveLastTS, string(pkiID))
}
if am := d.aliveMembership.MsgByID(pkiID); am != nil {
d.deadMembership.Put(pkiID, am)
d.aliveMembership.Remove(pkiID)
}
}
for i := range deadMembers2Expire {
// 断开连接
d.comm.CloseConn(&deadMembers2Expire[i])
}
}
处理消息
func (d *gossipDiscoveryImpl) handleMessages() {
in := d.comm.Accept() // 是DiscoveryAdapter的accept,返回它的incChan
for {
select {
case m := <-in:
d.handleMsgFromComm(m)
case <-d.toDieChan:
return
}
}
}
// 消息的获取
// gossip/gossip/gossip_impl.go
func (g *Node) handleMessage(m protoext.ReceivedMessage) {
if selectOnlyDiscoveryMessages(m) {
// 处理GM_AliveMsg GM_MemReq GM_MemRes
// 验证其selfInfomation是否符合发送方
if m.GetGossipMessage().GetMemReq() != nil {...}
g.discAdapter.incChan <- msg
}
}
// 消息的处理
func (d *gossipDiscoveryImpl) handleMsgFromComm(msg protoext.ReceivedMessage) {
m := msg.GetGossipMessage()
// 如果是GM_MemReq
if memReq := m.GetMemReq(); memReq != nil {
// 取得请求中的selfInfo,它是目标节点的selfAliveMsg
selfInfoGossipMsg, err := protoext.EnvelopeToGossipMessage(memReq.SelfInformation)
if !d.crypt.ValidateAliveMsg(selfInfoGossipMsg) {return}
// 如果msgStore中可以存入该消息,处理该aliveMsg
if d.msgStore.CheckValid(selfInfoGossipMsg) {
d.handleAliveMessage(selfInfoGossipMsg)
}
// 发送GM_MemRes,把自己和aliveMemberShip存入到Alive,把deadMembership存入到Dead中
go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, internalEndpoint, m.Nonce)
return
}
// 如果是GM_Alive
if protoext.IsAliveMsg(m.GossipMessage) {
if !d.msgStore.CheckValid(m) || !d.crypt.ValidateAliveMsg(m) {return}
if d.isSentByMe(m) {return}
d.msgStore.Add(m)
d.handleAliveMessage(m) // 处理aliveMsg
d.comm.Forward(msg) // 散播该消息
return
}
// 如果是GM_MemRes
if memResp := m.GetMemRes(); memResp != nil {
// 取得响应中的所有AliveMsg
for _, env := range memResp.Alive {
am, err := protoext.EnvelopeToGossipMessage(env)
// 处理存活消息
if d.msgStore.CheckValid(am) && d.crypt.ValidateAliveMsg(am) {
d.handleAliveMessage(am)
}
}
// 取得响应中的所有Dead
for _, env := range memResp.Dead {
dm, err := protoext.EnvelopeToGossipMessage(env)
if !d.msgStore.CheckValid(dm) || !d.crypt.ValidateAliveMsg(dm) {continue}
newDeadMembers := []*protoext.SignedGossipMessage{}
if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
newDeadMembers = append(newDeadMembers, dm)
}
// 认识新节点
d.learnNewMembers([]*protoext.SignedGossipMessage{}, newDeadMembers)
}
}
}
func (d *gossipDiscoveryImpl) handleAliveMessage(m *protoext.SignedGossipMessage) {
pkiID := m.GetAliveMsg().Membership.PkiId
ts := m.GetAliveMsg().Timestamp
// 查看该节点是否已经在id2Member中,不在则要认识一下,超长的过期会导致不认识,短期过期只会认为死去
_, known := d.id2Member[string(pkiID)]
if !known {
d.learnNewMembers([]*protoext.SignedGossipMessage{m}, []*protoext.SignedGossipMessage{})
return
}
_, isAlive := d.aliveLastTS[string(pkiID)]
lastDeadTS, isDead := d.deadLastTS[string(pkiID)]
// 已经死亡
if isDead {
// 该消息较死亡时间更新
if before(lastDeadTS, ts) {
// 把dead里面的信息移到alive中,并刷新aliveLastTS
d.resurrectMember(m, *ts)
}
return
}
lastAliveTS, isAlive := d.aliveLastTS[string(pkiID)]
// 如果存活,且该消息较新,重新认识该节点
if isAlive {
if before(lastAliveTS, ts) {
// 跟learnNewMember做的事差不多
d.learnExistingMembers([]*protoext.SignedGossipMessage{m})
}
}
}
func (d *gossipDiscoveryImpl) learnNewMembers(aliveMembers []*protoext.SignedGossipMessage, deadMembers []*protoext.SignedGossipMessage) {
for _, am := range aliveMembers {
// 添加信息到 aliveMembership和aliveLastTs
...
}
for _, dm := range deadMembers {
// 添加信息到 deaMembership和deadLastTs
...
}
// update the member in any case
for _, a := range [][]*protoext.SignedGossipMessage{aliveMembers, deadMembers} {
for _, m := range a {
// 更新 id2Member
...
}
}
}
周期重连至死亡节点
func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
for !d.toDie() {
// 取出ID2Member
for _, member := range d.copyLastSeen(d.deadLastTS) {
go func(member NetworkMember) {
// 尝试ping通,实际上用的是commImpl.Probe
if d.comm.Ping(&member) {
// ping通后发送GM_MemReq
d.sendMembershipRequest(&member, true)
}
}(member)
}
time.Sleep(d.reconnectInterval)
}
}
消息分析
相关的消息类型有:
AliveMsg,此为散播消息,在服务中周期性散播MemReq,与MemRes为一组点对点的消息,用于请求其已经存储的存活信息MemRes,与MemReq一组,用于回应AliveMsg在periodicalSendAlive发送,在handleMessage处理
MemRes在handleMessage使处理,且在处理MemReq时发送
MemReq在handleMessage中处理,发送有多种情况
// 1、重连至死亡节点,可以看上面
// 2、Connect方法,但是调用该方法的是Node连接AnchorPeer和BootStrap节点
// 3、InitiateSync
// 调用于
func (g *Node) start() {
go g.syncDiscovery()
}
func (g *Node) syncDiscovery() {
for !g.toDie() {
g.disc.InitiateSync(g.conf.PullPeerNum)
time.Sleep(g.conf.PullInterval)
}
}
func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
var peers2SendTo []*NetworkMember
n := d.aliveMembership.Size()
k := peerNum
if k > n {k = n}
aliveMembersAsSlice := d.aliveMembership.ToSlice()
for _, i := range util.GetRandomIndices(k, n-1) {
// 随机找k个活的节点
pulledPeer := aliveMembersAsSlice[i].GetAliveMsg().Membership
var internalEndpoint string
if aliveMembersAsSlice[i].Envelope.SecretEnvelope != nil {
internalEndpoint = protoext.InternalEndpoint(aliveMembersAsSlice[i].Envelope.SecretEnvelope)
}
netMember := &NetworkMember{
Endpoint: pulledPeer.Endpoint,
Metadata: pulledPeer.Metadata,
PKIid: pulledPeer.PkiId,
InternalEndpoint: internalEndpoint,
}
peers2SendTo = append(peers2SendTo, netMember)
}
// 创建GM_MemReq
m, err := d.createMembershipRequest(true)
memReq, err := protoext.NoopSign(m)
for _, netMember := range peers2SendTo {
d.comm.SendToPeer(netMember, memReq)
}
}
GossipStateProvider
核心内容已经在散播流程的2、11两步介绍过,它在散播得到DataMsg,Deliver拉取区块后被调用,将区块信息写入到账本中
数据结构
// 它将用于将区块存入到账本中,并且请求缺失的区块
type GossipStateProvider interface {
AddPayload(payload *proto.Payload) error
}
type GossipStateProviderImpl struct {
chainID string
mediator *ServicesMediator // Gossip和MCS的适配器,用于
payloads PayloadsBuffer // 待写入到账本的区块队列
ledger ledgerResources // 账本资源
stateResponseCh chan protoext.ReceivedMessage
stateRequestCh chan protoext.ReceivedMessage
stateTransferActive int32
requestValidator *stateRequestValidator // StateRequest的验证器
}
核心功能
// 添加存有区块的payload到缓存payloads中
func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMode bool) error {
// 取得账本高度
height, err := s.ledger.LedgerHeight()
// 如果不是阻塞模式,如果该区块的序列号比账本高度大过缓存器
if !blockingMode && payload.SeqNum-height >= uint64(s.config.StateBlockBufferSize) {
return errors.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
}
// 如果处于阻塞模式,则阻塞
for blockingMode && s.payloads.Size() > s.config.StateBlockBufferSize*2 {
time.Sleep(enqueueRetryInterval)
}
// payloads为待写入队列,类型为PayloadBuffer
s.payloads.Push(payload)
return nil
}
// 将payloads中有下一个区块(当前高度+1)时,写入到账本中
func (s *GossipStateProviderImpl) deliverPayloads() {
for {
select {
// 阻塞等待区块准备完毕
case <-s.payloads.Ready():
for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() {
rawBlock := &common.Block{}
// 这里貌似没有补偿机制
if err := pb.Unmarshal(payload.Data, rawBlock); err != nil {continue}
if rawBlock.Data == nil || rawBlock.Header == nil {continue}
var p util.PvtDataCollections
// 有私有数据读取一下
if payload.PrivateData != nil {err := p.Unmarshal(payload.PrivateData)}
// 提交区块
if err := s.commitBlock(rawBlock, p); err != nil {}
}
case <-s.stopCh:
return
}
}
}
func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {
// 存储区块,此部分已经脱离gossip,不再深挖
if err := s.ledger.StoreBlock(block, pvtData); err != nil {
return err
}
// 更新账本高度
s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChannelID(s.chainID))
return nil
}
我们下一步要去找哪里会调用这些核心功能,其实大部分在初始化中就调用了,但是addPayload会在其他地方被调用,我们首先分析初始化部分,它会开启很多关键的协程
// 它在gossipService初始化Channel时会进行初始化该StateProvider
// gossip/service/gossip_service.go
func (g *GossipService) InitializeChannel(...) {
g.chains[channelID] = state.NewGossipStateProvider(...)
}
// gossip/state/state.go
func NewGossipStateProvider(...) {
// 向Comm订阅消息,接收该Channel的GM_DataMsg
gossipChan, _ := services.Accept(func(message interface{}) bool {
return protoext.IsDataMsg(message.(*proto.GossipMessage)) &&
bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
}, false)
// 向Comm订阅消息,接收该Channel的GM_StateReq GM_StateRes GM_PrivateData
remoteStateMsgFilter := func(message interface{}) bool {
receivedMsg := message.(protoext.ReceivedMessage)
msg := receivedMsg.GetGossipMessage()
if !(protoext.IsRemoteStateMessage(msg.GossipMessage) || msg.GetPrivateData() != nil) {return false}
// Ensure we deal only with messages that belong to this channel
if !bytes.Equal(msg.Channel, []byte(chainID)) {return false}
// 使用Channel/Application/Reader策略验证消息发送方是否有权限
connInfo := receivedMsg.GetConnectionInfo()
authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
if authErr != nil {return false}
return true
}
_, commChan := services.Accept(remoteStateMsgFilter, true)
height, err := ledger.LedgerHeight()
s := &GossipStateProviderImpl{...}
// 更新gossipChannel的高度,gossipChannel仅被初始化为高度1,现在读取本地账本,初始化其高度
services.UpdateLedgerHeight(height, common2.ChannelID(s.chainID))
// Listen for incoming communication
go s.receiveAndQueueGossipMessages(gossipChan)
go s.receiveAndDispatchDirectMessages(commChan)
// 开启处理payloads缓存的线程
go s.deliverPayloads()
if s.config.StateEnabled {
// 反熵,如果允许的情况下,该节点向其他节点请求自己缺失的区块
go s.antiEntropy()
}
// Taking care of state request messages
go s.processStateRequests()
return s
}
初始化过程中开启的协程
receiveAndQueueGossipMessages
// 接收该Channel的DataMsg,将其加入到payloads中
func (s *GossipStateProviderImpl) receiveAndQueueGossipMessages(ch <-chan *proto.GossipMessage) {
for msg := range ch {
go func(msg *proto.GossipMessage) {
dataMsg := msg.GetDataMsg()
if dataMsg != nil {
if err := s.addPayload(dataMsg.GetPayload(), nonBlocking); err != nil {return}
}
}(msg)
}
}
receiveAndDispatchDirectMessages
// 接收该Channel的GM_StateReq GM_StateRes GM_PrivateData
func (s *GossipStateProviderImpl) receiveAndDispatchDirectMessages(ch <-chan protoext.ReceivedMessage) {
for msg := range ch {
go func(msg protoext.ReceivedMessage) {
gm := msg.GetGossipMessage()
// 如果是StateReq或StateRes
if protoext.IsRemoteStateMessage(gm.GossipMessage) {
s.directMessage(msg)
// 如果是私有数据
} else if gm.GetPrivateData() != nil {
s.privateDataMessage(msg)
}
}(msg)
}
}
// 将消息重定向到stateRequestCh和stateResponseCh
func (s *GossipStateProviderImpl) directMessage(msg protoext.ReceivedMessage) {
incoming := msg.GetGossipMessage()
if incoming.GetStateRequest() != nil {
// 不要响应过多请求
if len(s.stateRequestCh) < s.config.StateChannelSize {
s.stateRequestCh <- msg
}
} else if incoming.GetStateResponse() != nil {
// 不支持状态变换,就没必要去处理请求
if atomic.LoadInt32(&s.stateTransferActive) == 1 {
s.stateResponseCh <- msg
}
}
}
// 处理私有数据
func (s *GossipStateProviderImpl) privateDataMessage(msg protoext.ReceivedMessage) {
gossipMsg := msg.GetGossipMessage()
pvtDataMsg := gossipMsg.GetPrivateData()
collectionName := pvtDataMsg.Payload.CollectionName
txID := pvtDataMsg.Payload.TxId
pvtRwSet := pvtDataMsg.Payload.PrivateRwset
// 初始化交易的私有数据读写集
txPvtRwSet := &rwset.TxPvtReadWriteSet{...}
txPvtRwSetWithConfig := &transientstore.TxPvtReadWriteSetWithConfigInfo{...}
// 将私有数据写入到账本中
if err := s.ledger.StorePvtData(txID, txPvtRwSetWithConfig, pvtDataMsg.Payload.PrivateSimHeight); err != nil {
msg.Ack(err) // 发送消息,告知发送方接收失败
}
msg.Ack(nil) // 发送消息,告知发送方接收成功
}
deliverPayloads在核心功能中介绍过了
antiEntropy
// 反熵,该节点向其他节点请求自己缺失的区块
func (s *GossipStateProviderImpl) antiEntropy() {
for {
select {
case <-time.After(s.config.StateCheckInterval): // 每隔一段时间,发出StateReq请求
ourHeight, err := s.ledger.LedgerHeight()
maxHeight := s.maxAvailableLedgerHeight()
s.requestBlocksInRange(uint64(ourHeight), uint64(maxHeight)-1)
}
}
}
func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) {
// 请求期间允许处理响应
atomic.StoreInt32(&s.stateTransferActive, 1)
defer atomic.StoreInt32(&s.stateTransferActive, 0)
for prev := start; prev <= end; {
// 请求不超过StateBatchSize个区块
next := min(end, prev+s.config.StateBatchSize)
// 创建GM_StateReq消息
gossipMsg := s.stateRequestMessage(prev, next)
tryCounts := 0
for !responseReceived {
// 尝试一定次数
if tryCounts > s.config.StateMaxRetries {return}
// 依靠gossip中discovery子模块取得高度足够的节点,并随机其中一个
peer, err := s.selectPeerToRequestFrom(next)
// 发送请求
s.mediator.Send(gossipMsg, peer)
tryCounts++
// Wait until timeout or response arrival
select {
// 发送请求阻塞等待响应,或等待一段时间,该ch源于过程的dispatch
case msg, stillOpen := <-s.stateResponseCh:
if msg.GetGossipMessage().Nonce != gossipMsg.Nonce {continue}
// 处理响应
index, err := s.handleStateResponse(msg)
prev = index + 1 // 请求之后的区块
responseReceived = true // 退出重试循环
case <-time.After(s.config.StateResponseTimeout):
}
}
}
}
// 处理响应
func (s *GossipStateProviderImpl) handleStateResponse(msg protoext.ReceivedMessage) (uint64, error) {
max := uint64(0)
response := msg.GetGossipMessage().GetStateResponse()
// 将响应中的payload加入到缓冲中
for _, payload := range response.GetPayloads() {
block, err := protoutil.UnmarshalBlock(payload.Data)
if err := s.mediator.VerifyBlock(common2.ChannelID(s.chainID), payload.SeqNum, block); err != nil {
return uint64(0), err
}
if max < payload.SeqNum {max = payload.SeqNum}
err = s.addPayload(payload, blocking)
}
return max, nil
}
processStateRequests
// 处理请求
func (s *GossipStateProviderImpl) processStateRequests() {
for {
msg, stillOpen := <-s.stateRequestCh
s.handleStateRequest(msg)
}
}
func (s *GossipStateProviderImpl) handleStateRequest(msg protoext.ReceivedMessage) {
request := msg.GetGossipMessage().GetStateRequest()
// 验证请求
if err := s.requestValidator.validate(request, s.config.StateBatchSize); err != nil {return}
currentHeight, err := s.ledger.LedgerHeight()
endSeqNum := min(currentHeight, request.EndSeqNum)
response := &proto.RemoteStateResponse{Payloads: make([]*proto.Payload, 0)}
for seqNum := request.StartSeqNum; seqNum <= endSeqNum; seqNum++ {
// 取得发送者的身份信息
connInfo := msg.GetConnectionInfo()
peerAuthInfo := protoutil.SignedData{
Data: connInfo.Auth.SignedData,
Signature: connInfo.Auth.Signature,
Identity: connInfo.Identity,
}
// 取得区块和私有消息
block, pvtData, err := s.ledger.GetPvtDataAndBlockByNum(seqNum, peerAuthInfo)
blockBytes, err := pb.Marshal(block)
var pvtBytes [][]byte
// 处理私有数据
if pvtData != nil {
// Marshal private data
pvtBytes, err = pvtData.Marshal()
}
// 添加到response中
response.Payloads = append(response.Payloads, &proto.Payload{
SeqNum: seqNum,
Data: blockBytes,
PrivateData: pvtBytes,
})
}
// 发送GM_StateRes
msg.Respond(&proto.GossipMessage{
// Copy nonce field from the request, so it will be possible to match response
Nonce: msg.GetGossipMessage().Nonce,
Tag: proto.GossipMessage_CHAN_OR_ORG,
Channel: []byte(s.chainID),
Content: &proto.GossipMessage_StateResponse{StateResponse: response},
})
}
调用addPayload场景整理
// 1、节点执行反熵,向其他节点发送GM_StateReq,并处理GM_StateRes消息
func (s *GossipStateProviderImpl) handleStateResponse(msg protoext.ReceivedMessage) (uint64, error) {
max := uint64(0)
response := msg.GetGossipMessage().GetStateResponse()
// 将响应中的payload加入到缓冲中
for _, payload := range response.GetPayloads() {
block, err := protoutil.UnmarshalBlock(payload.Data)
if err := s.mediator.VerifyBlock(common2.ChannelID(s.chainID), payload.SeqNum, block); err != nil {
return uint64(0), err
}
if max < payload.SeqNum {max = payload.SeqNum}
err = s.addPayload(payload, blocking)
}
return max, nil
}
// 2、leader节点通过Deliver取得区块后,调用AddPayload存入账本
func (d *Deliverer) processMsg(msg *orderer.DeliverResponse) error {
if err := d.Gossip.AddPayload(d.ChannelID, payload); err != nil
}
func (g *GossipService) AddPayload(channelID string, payload *gproto.Payload) error {
return g.chains[channelID].AddPayload(payload)
}
func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
return s.addPayload(payload, s.blockingMode)
}
// 3、收到其他节点散播过来的GM_DataMsg
func (s *GossipStateProviderImpl) receiveAndQueueGossipMessages(ch <-chan *proto.GossipMessage) {
for msg := range ch {
go func(msg *proto.GossipMessage) {
s.addPayload(dataMsg.GetPayload(), nonBlocking)
}(msg)
}
}
gossip消息类型
Message结构
由于go语言的继承不是真的继承,它们之间的关系更像是聚合,当然子类在类型识别的时候依旧是属于父类的,ReceivedMessage包含了GossipMessage,但它本身又是GossipMessage,给人的感觉不是很舒服,看代码的时候也要注意。
GossipMessage就是我们划分为各种类型的消息本身,Content就是消息的内容,也可以根据内容的类型判别其具体的GossipMessage类型,Tag会标明信息传播的范围,如Channel内部,Org组织内部等等Envelope为签名后的数据,Payload就是GossipMessageSignedGossipMessage,就是可以签名的GossipMessage,它在调用Sign的方法后,会将GossipMessage签名,GossipMessage作为this.Envelope.Payload,签名作为this.Envelope.SignatureReceivedMessage,存储了签名后的GossipMessage,它还会附带有发送方的连接信息。Comm通信模块会接收SignedGossipMessage对象,在发送时将其封装为ReceivedMessage,接收时也会收到ReceivedMessage 消息类型的应用场景1、 节点发现服务
GossipMessage_AliveMsg
GossipMessage_MemReq
GossipMessage_MemRes
在discovery模块,我进行了统一的分析
2、 区块散播
GossipMessage_DataMsg
发送:
leader节点通过deliver取得区块消息,通过Gossip散播收到其他节点的DataMsg,继续通过GossipChannel.Forward继续散播接收:
GossipChannel处理DataMsg消息3、 区块拉取
GossipMessage_Hello
GossipMessage_DataDig
GossipMessage_DataReq
GossipMessage_DataUpdate
这里就不细分了,全部在Pull,发送在pullMediator中,接收在pullMediator的handleMessage和PullEngine中
4、 连接
GossipMessage_Empty
它用于Ping,其他消息类型的处理都会交给注册在GRPC服务器的commImpl的GossipStream来处理,但是Empty会用Ping来处理,但是处理方式就是也回复一个Empty消息。
GossipMessage_Conn
它用于两个节点的连接,它会和其他消息类型一样采用GossipStream服务的形式,其他消息的处理会在连接建立后,而连接的建立通过该消息完成,其发送和处理都在commImpl.authenticateRemotePeer方法中被处理,具体看comm部分
5、 StateInfo
GossipMessage_StateInfo
发送:
GossipChannel中定期散播自己的状态信息接收:
GossipChannel的handleMessage中处理StateInfoMsgGossipMessage_StateSnapshot
发送:
GossipChannel的handleMessage中收到StateInfoPullReq,向其发送SnapShot接收:
GossipChannel的handleMessage中处理SnapShotGossipMessage_StateInfoPullReq
发送:
GossipChannel中定期向其他节点发出StateInfoPullReq接收:
和StateSnapShot一致,收到然后发送6、 State
GossipMessage_StateRequest
发送:
反熵接收:
处理请求GossipMessage_StateResponse
发送:
处理请求,向请求方发送响应接收:
反熵发送请求后会等待响应7、 Leadership(暂时不分析)
GossipMessage_LeadershipMsg
GossipMessage_PeerIdentity
GossipMessage_Ack
8、 私有数据(暂时不分析)
GossipMessage_PrivateReq
GossipMessage_PrivateRes
GossipMessage_PrivateData
原图
结语总算把笔记上的内容移到这里来了,内容还挺多的,算是我这两周多的成果。内容上可能会有点问题,有人看到了的话直接评论里说一下就好了。润了润了。。。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)