[TOC]
类定义
bpRequester在文件 /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/pool.go 中。
type bpRequester struct {
BaseService
pool *BlockPool
height agtypes.INT
gotBlockCh chan struct{}
redoCh chan struct{}
mtx sync.Mutex
peerID string
block *agtypes.BlockCache
}
BaseService是用来实现自身一些服务功能的,后面会使用这个类go一个线程出来; BlockPool是一个挂载的指针,pool变量还会指向自身; height就是我们要去判断的一个变量,如何添加还未知; gotBlockCh和redoCh是信号量; block挂载一个外部变量,用来手法数据的基础数据格式;
启动流程
启动流程和NewBlockchainReactor类似,因为就是在这个函数中调用了NewBlockPool,然后在BlockchainReactor的OnStart中启动:
if bcR.fastSync {
_, err := bcR.pool.Start()
if err != nil {
return err
}
go bcR.poolRoutine()
}
这个Start会调用到pool的OnStart:
func (pool *BlockPool) OnStart() error {
pool.BaseService.OnStart()
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
}
这里会启动一个线程来对pool池进行管理:
// Run spawns requesters as needed.
func (pool *BlockPool) makeRequestersRoutine() {
for {
if !pool.IsRunning() {
break
}
_, numPending, lenRequesters := pool.GetStatus()
if numPending >= maxPendingRequests { 分支一
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else if lenRequesters >= maxTotalRequesters { 分支二
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else { 分支三
// request for more blocks.
pool.makeNextRequester()
}
}
}
通过断点发现重复执行分支三,去掉断点重复执行分支一,再去掉断点也无法进入分支二。 分之三会启动Requester的线程:
func (pool *BlockPool) makeNextRequester() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
nextHeight := pool.height + agtypes.INT(len(pool.requesters))
request := newBPRequester(pool, nextHeight)
pool.requesters[nextHeight] = request
pool.numPending++
request.Start()
}
启动函数: /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/pool.go
func (bpr *bpRequester) OnStart() error {
bpr.BaseService.OnStart()
go bpr.requestRoutine()
return nil
}
这是这个模块启动的过程。
方法描述
bpRequester有两个方法需要注意一下:
// Returns true if the peer matches
func (bpr *bpRequester) setBlock(block *pbtypes.Block, peerID string) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return false
}
bpr.block = agtypes.MakeBlockCache(block)
bpr.mtx.Unlock()
bpr.gotBlockCh <- struct{}{}
return true
}
func (bpr *bpRequester) getBlock() *agtypes.BlockCache {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
}
这里的名称是对Block的操作,但并不是出块,因为它只挂载了一个节点,并不是链的形式,它是将块信息组织好,然后通过channel发送出去了,我们可以根据这个函数看看谁调用它对块打包的。 通过断点发现这个函数根本没有被调用,我们通过回溯看看它的调用流程。
setBlock的调用流程
还是先回到Angine的创建函数:
func NewAngine(lgr *zap.Logger, tune *Tunes) (angine *Angine) {
---------------------
p2psw, err := prepareP2P(logger, conf, gb, privValidator, refuseList)
if err != nil {
lgr.Error("prepare p2p err", zap.Error(err))
return nil
}
p2pListener := p2psw.Listeners()[0]
函数启动了P2P的部分,P2P则创建了一个Switch:
func prepareP2P(logger *zap.Logger, conf *viper.Viper, genesisBytes []byte, privValidator *agtypes.PrivValidator, refuseList *refuse_list.RefuseList) (*p2p.Switch, error) {
p2psw := p2p.NewSwitch(logger, conf, genesisBytes)
-----------------------------------------------
privKey := privValidator.GetPrivKey()
p2psw.AddListener(defaultListener)
p2psw.SetNodeInfo(nodeInfo)
p2psw.SetNodePrivKey(*(privKey.(*crypto.PrivKeyEd25519)))
p2psw.SetAddToRefuselist(addToRefuselist(refuseList))
p2psw.SetRefuseListFilter(refuseListFilter(refuseList))
创建的这个Switch会通过OnStart启动,什么时候调用这个OnStart可以参考8章节中Pool启动的机制,应该是类似的,这里先不去追踪。 跳转到文件/home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/chorus-module/lib/go-p2p/switch.go
// Switch.Start() starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error {
sw.BaseService.OnStart()
// Start reactors
for _, reactor := range sw.reactors {
_, err := reactor.Start()
if err != nil {
return err
}
}
// Start peers
for _, peer := range sw.peers.List() {
sw.startInitPeer(peer)
}
// Start listeners
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
}
return nil
}
在启动的线程listenerRoutine中,会调用下面函数:
rn, err := inConn.Read(recv)
if err != nil {
inConn.Close()
continue OUTER // this connection doesn't play
}
bytes := recv[:rn]
switch bytes[0] {
case ConnActionP2P:
---------------------------------------------------------------------
// New inbound connection!
if _, err := sw.AddPeerWithConnection(inConn, false); err != nil {
sw.logger.Info("Ignoring inbound connection: error on AddPeerWithConnection", zap.Stringer("address", inConn.RemoteAddr()), zap.String("error", err.Error()))
}
这个函数在这个文件中被多处调用,不一定就是在这个线程中,后面需要根据代码再作分析,这里假设就是在线程中调用,这个函数做了很多事情,都是和远程相关的,如过滤远程地址,设置握手超时时间,建立加密连接,并进行公钥过滤等,最后在验证通过后建立一个新的节点对象:
// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
--------------------------------------------------
peer := newPeer(sw.logger, sw.config, sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
此时代码跳转到节点对象文件中, /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/chorus-module/lib/go-p2p/peer.go 在newPeer中会创建一个匿名函数onReceive,并通过NewMConnection设置成回调,在哪里调用的我们后面再查,注意,这个回调就是添加区块的回调:
// NOTE: call peerHandshake on conn before calling newPeer().
func newPeer(logger *zap.Logger, config *viper.Viper, conn net.Conn, peerNodeInfo *NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
var p *Peer
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
PanicSanity(Fmt("Unknown channel %X", chID))
}
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
p.Stop()
onPeerError(p, r)
}
mconn := NewMConnection(logger, config, conn, chDescs, onReceive, onError)
回调中reactor.Receive(chID, p, msgBytes)会接收数据,它是一个接口,实现体在下面文件中: /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/reactor.go BlockchainReactor的Recive中一个分支调用:
// Implements Reactor
func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
----------------------------------------------------------------
case *blkpb.BlockResponseMessage:
// Got a block.
bcR.validatorSetor(agtypes.MakeBlockCache(msg.Block))
bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
在收到*blkpb.BlockResponseMessage消息时调用下面函数执行区块打包:
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID string, block *pbtypes.Block, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
requester := pool.requesters[block.Header.Height]
if requester == nil {
return
}
if requester.setBlock(block, peerID) {
pool.numPending--
peer := pool.peers[peerID]
peer.decrPending(blockSize)
} else {
// Bad peer?
}
}
这里就对接上了我们上面说的setBlock函数,它将区块打包后放到bpRequester的block变量中,然后通过channel发送信号出去,等待接收信号的模块处理。
消息接收处理分析
上面发送的信号是在下面函数中接收到的:
// Responsible for making more requests as necessary
// Returns only when a block is found (e.g. AddBlock() is called)
func (bpr *bpRequester) requestRoutine() {
---------------------------------
case <-bpr.gotBlockCh:
// We got the block, now see if it's good.
select {
case <-bpr.pool.Quit:
bpr.Stop()
return
case <-bpr.Quit:
return
case <-bpr.redoCh:
bpr.reset()
continue OUTER_LOOP
}
这个函数我们在上面描述过,是bpRequester类最后启动的一个线程,但是从这里的代码看,似乎并没有处理这个数据。。。