Edit me

[TOC]

类定义

bpRequester在文件 /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/pool.go 中。

type bpRequester struct {
   BaseService
   pool       *BlockPool
   height     agtypes.INT
   gotBlockCh chan struct{}
   redoCh     chan struct{}

   mtx    sync.Mutex
   peerID string
   block  *agtypes.BlockCache
}

BaseService是用来实现自身一些服务功能的,后面会使用这个类go一个线程出来; BlockPool是一个挂载的指针,pool变量还会指向自身; height就是我们要去判断的一个变量,如何添加还未知; gotBlockCh和redoCh是信号量; block挂载一个外部变量,用来手法数据的基础数据格式;

启动流程

启动流程和NewBlockchainReactor类似,因为就是在这个函数中调用了NewBlockPool,然后在BlockchainReactor的OnStart中启动:

if bcR.fastSync {
   _, err := bcR.pool.Start()
   if err != nil {
      return err
   }
   go bcR.poolRoutine()
}

这个Start会调用到pool的OnStart:

func (pool *BlockPool) OnStart() error {
   pool.BaseService.OnStart()
   go pool.makeRequestersRoutine()
   pool.startTime = time.Now()
   return nil
}

这里会启动一个线程来对pool池进行管理:

// Run spawns requesters as needed.
func (pool *BlockPool) makeRequestersRoutine() {
   for {
      if !pool.IsRunning() {
         break
      }
      _, numPending, lenRequesters := pool.GetStatus()
      if numPending >= maxPendingRequests {                分支一
         // sleep for a bit.
         time.Sleep(requestIntervalMS * time.Millisecond)
         // check for timed out peers
         pool.removeTimedoutPeers()
      } else if lenRequesters >= maxTotalRequesters {      分支二
         // sleep for a bit.
         time.Sleep(requestIntervalMS * time.Millisecond)
         // check for timed out peers
         pool.removeTimedoutPeers()
      } else {                                              分支三
         // request for more blocks.
         pool.makeNextRequester()
      }
   }
}

通过断点发现重复执行分支三,去掉断点重复执行分支一,再去掉断点也无法进入分支二。 分之三会启动Requester的线程:

func (pool *BlockPool) makeNextRequester() {
   pool.mtx.Lock()
   defer pool.mtx.Unlock()

   nextHeight := pool.height + agtypes.INT(len(pool.requesters))
   request := newBPRequester(pool, nextHeight)

   pool.requesters[nextHeight] = request
   pool.numPending++

   request.Start()
}

启动函数: /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/pool.go

func (bpr *bpRequester) OnStart() error {
   bpr.BaseService.OnStart()
   go bpr.requestRoutine()
   return nil
}

这是这个模块启动的过程。

方法描述

bpRequester有两个方法需要注意一下:


// Returns true if the peer matches
func (bpr *bpRequester) setBlock(block *pbtypes.Block, peerID string) bool {
   bpr.mtx.Lock()
   if bpr.block != nil || bpr.peerID != peerID {
      bpr.mtx.Unlock()
      return false
   }
   bpr.block = agtypes.MakeBlockCache(block)
   bpr.mtx.Unlock()

   bpr.gotBlockCh <- struct{}{}
   return true
}

func (bpr *bpRequester) getBlock() *agtypes.BlockCache {
   bpr.mtx.Lock()
   defer bpr.mtx.Unlock()
   return bpr.block
}

这里的名称是对Block的操作,但并不是出块,因为它只挂载了一个节点,并不是链的形式,它是将块信息组织好,然后通过channel发送出去了,我们可以根据这个函数看看谁调用它对块打包的。 通过断点发现这个函数根本没有被调用,我们通过回溯看看它的调用流程。

setBlock的调用流程

还是先回到Angine的创建函数:

func NewAngine(lgr *zap.Logger, tune *Tunes) (angine *Angine) {
---------------------
p2psw, err := prepareP2P(logger, conf, gb, privValidator, refuseList)
if err != nil {
   lgr.Error("prepare p2p err", zap.Error(err))
   return nil
}
p2pListener := p2psw.Listeners()[0]

函数启动了P2P的部分,P2P则创建了一个Switch:

func prepareP2P(logger *zap.Logger, conf *viper.Viper, genesisBytes []byte, privValidator *agtypes.PrivValidator, refuseList *refuse_list.RefuseList) (*p2p.Switch, error) {
   p2psw := p2p.NewSwitch(logger, conf, genesisBytes)
-----------------------------------------------
privKey := privValidator.GetPrivKey()
p2psw.AddListener(defaultListener)
p2psw.SetNodeInfo(nodeInfo)
p2psw.SetNodePrivKey(*(privKey.(*crypto.PrivKeyEd25519)))
p2psw.SetAddToRefuselist(addToRefuselist(refuseList))
p2psw.SetRefuseListFilter(refuseListFilter(refuseList))

创建的这个Switch会通过OnStart启动,什么时候调用这个OnStart可以参考8章节中Pool启动的机制,应该是类似的,这里先不去追踪。 跳转到文件/home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/chorus-module/lib/go-p2p/switch.go

// Switch.Start() starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error {
   sw.BaseService.OnStart()
   // Start reactors
   for _, reactor := range sw.reactors {
      _, err := reactor.Start()
      if err != nil {
         return err
      }
   }
   // Start peers
   for _, peer := range sw.peers.List() {
      sw.startInitPeer(peer)
   }
   // Start listeners
   for _, listener := range sw.listeners {
      go sw.listenerRoutine(listener)
   }
   return nil
}

在启动的线程listenerRoutine中,会调用下面函数:

rn, err := inConn.Read(recv)
if err != nil {
   inConn.Close()
   continue OUTER // this connection doesn't play
}
bytes := recv[:rn]
switch bytes[0] {
case ConnActionP2P:
---------------------------------------------------------------------
// New inbound connection!
if _, err := sw.AddPeerWithConnection(inConn, false); err != nil {
   sw.logger.Info("Ignoring inbound connection: error on AddPeerWithConnection", zap.Stringer("address", inConn.RemoteAddr()), zap.String("error", err.Error()))
}

这个函数在这个文件中被多处调用,不一定就是在这个线程中,后面需要根据代码再作分析,这里假设就是在线程中调用,这个函数做了很多事情,都是和远程相关的,如过滤远程地址,设置握手超时时间,建立加密连接,并进行公钥过滤等,最后在验证通过后建立一个新的节点对象:

// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
--------------------------------------------------
peer := newPeer(sw.logger, sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)

此时代码跳转到节点对象文件中, /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/chorus-module/lib/go-p2p/peer.go 在newPeer中会创建一个匿名函数onReceive,并通过NewMConnection设置成回调,在哪里调用的我们后面再查,注意,这个回调就是添加区块的回调:

// NOTE: call peerHandshake on conn before calling newPeer().
func newPeer(logger *zap.Logger, config *viper.Viper, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
   var p *Peer
   onReceive := func(chID byte, msgBytes []byte) {
      reactor := reactorsByCh[chID]
      if reactor == nil {
         PanicSanity(Fmt("Unknown channel %X", chID))
      }
      reactor.Receive(chID, p, msgBytes)
   }
   onError := func(r interface{}) {
      p.Stop()
      onPeerError(p, r)
   }
   mconn := NewMConnection(logger, config, conn, chDescs, onReceive, onError)

回调中reactor.Receive(chID, p, msgBytes)会接收数据,它是一个接口,实现体在下面文件中: /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/reactor.go BlockchainReactor的Recive中一个分支调用:

// Implements Reactor
func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
----------------------------------------------------------------
case *blkpb.BlockResponseMessage:
   // Got a block.
   bcR.validatorSetor(agtypes.MakeBlockCache(msg.Block))
   bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))

在收到*blkpb.BlockResponseMessage消息时调用下面函数执行区块打包:

// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID string, block *pbtypes.Block, blockSize int) {
   pool.mtx.Lock()
   defer pool.mtx.Unlock()

   requester := pool.requesters[block.Header.Height]
   if requester == nil {
      return
   }

   if requester.setBlock(block, peerID) {
      pool.numPending--
      peer := pool.peers[peerID]
      peer.decrPending(blockSize)
   } else {
      // Bad peer?
   }
}

这里就对接上了我们上面说的setBlock函数,它将区块打包后放到bpRequester的block变量中,然后通过channel发送信号出去,等待接收信号的模块处理。

消息接收处理分析

上面发送的信号是在下面函数中接收到的:

// Responsible for making more requests as necessary
// Returns only when a block is found (e.g. AddBlock() is called)
func (bpr *bpRequester) requestRoutine() {
---------------------------------
case <-bpr.gotBlockCh:
   // We got the block, now see if it's good.
   select {
   case <-bpr.pool.Quit:
      bpr.Stop()
      return
   case <-bpr.Quit:
      return
   case <-bpr.redoCh:
      bpr.reset()
      continue OUTER_LOOP
   }

这个函数我们在上面描述过,是bpRequester类最后启动的一个线程,但是从这里的代码看,似乎并没有处理这个数据。。。

Tags: