

Transaction retrieval

The entry point is /home/pct/go/src/github.com/Baptist-Publication/chorus/src/chain/cmd/run.go. Its init function registers the run command, and the command's Run handler starts the node:

Run: func(cmd *cobra.Command, args []string) {
   env := viper.GetString("environment")
   logpath := viper.GetString("log_path")
   if logpath == "" {
      var err error
      if logpath, err = os.Getwd(); err != nil {
         cmd.Println(err)
         os.Exit(1)
      }
   }
   viper.Set("log_path", logpath)
   logger := log.Initialize(env, path.Join(logpath, "node.output.log"), path.Join(logpath, "node.err.log"))
   node.RunNode(logger, viper.GetViper())
},

RunNode creates a Node object, and NewNode creates the consensus engine:

func NewNode(logger *zap.Logger, conf *viper.Viper) *Node {
   aConf := ac.GetConfig(conf.GetString("runtime"))
   for k, v := range conf.AllSettings() {
      aConf.Set(k, v)
   }

   metropolis := NewMetropolis(logger, aConf)
   metroAngine := angine.NewAngine(logger, &angine.Tunes{Conf: aConf})

NewAngine calls the following code to start a state machine:

if gotGenesis {
   angine.assembleStateMachine(stateM)
}

The block-handling part is then set up through an anonymous function:

bcReactor.SetBlockExecuter(func(blk *agtypes.BlockCache, pst *agtypes.PartSet, c *agtypes.CommitCache) error {
   blockStore.SaveBlock(blk, pst, c)
   if err := stateM.ApplyBlock(*ang.eventSwitch, blk, pst.Header(), MockMempool{}, -1); err != nil {
      return err
   }
   stateM.Save()
   return nil
})

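According to our breakpoint test, this callback never ran; the flow that leads to it is covered in the sections below. When the callback is invoked, it receives a block argument that already contains the transactions. The following code then updates the mempool, removing the transactions that were just packed into the block so that they are not packed again next time: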

// lock mempool, commit state, update mempoool
err = s.CommitStateUpdateMempool(eventSwitch, block, mempool, round)
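
To make the mempool update concrete, here is a minimal sketch of removing committed transactions from a pending list. The Mempool type and its Update method are hypothetical and simplified for illustration; they are not angine's actual mempool API, and transactions are modeled as plain strings.

```go
package main

import "fmt"

// Mempool is a hypothetical, simplified pool of pending transactions.
// It is NOT angine's mempool type; transactions are modeled as strings.
type Mempool struct {
	pending []string
}

// Update removes every pending transaction that appears in a committed
// block, so that it cannot be packed into a later block again.
func (m *Mempool) Update(committed []string) {
	inBlock := make(map[string]struct{}, len(committed))
	for _, tx := range committed {
		inBlock[tx] = struct{}{}
	}
	// Filter in place, keeping only transactions that were not committed.
	kept := m.pending[:0]
	for _, tx := range m.pending {
		if _, ok := inBlock[tx]; !ok {
			kept = append(kept, tx)
		}
	}
	m.pending = kept
}

func main() {
	m := &Mempool{pending: []string{"tx1", "tx2", "tx3"}}
	m.Update([]string{"tx1", "tx3"}) // the block contained tx1 and tx3
	fmt.Println(m.pending)           // prints [tx2]
}
```

The real implementation also has to lock the mempool while the state is committed, as the comment in the excerpt says; the sketch leaves locking out.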

So how does the block obtain its transactions?

Call flow of the block execution callback

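Keep one key function in mind: assembleStateMachine. It is called when the Angine is created and does a lot of important work, including registering the block execution callback shown above and creating the consensus routine, the mempool routine, and so on: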

func (ang *Angine) assembleStateMachine(stateM *state.State) {
   conf := ang.tune.Conf

   fastSync := fastSyncable(conf, ang.privValidator.GetAddress(), stateM.Validators)
   stateM.SetLogger(ang.logger)

   blockStore := blockchain.NewBlockStore(ang.dbs["blockstore"], ang.dbs["archive"])
   _, stateLastHeight, _ := stateM.GetLastBlockInfo()
   bcReactor := blockchain.NewBlockchainReactor(ang.logger, conf, stateLastHeight, blockStore, fastSync, ang.dataArchive)
   mem := mempool.NewMempool(ang.logger, conf)
   memReactor := mempool.NewMempoolReactor(ang.logger, conf, mem)

Here NewBlockchainReactor creates the BlockPool and the Reactor: /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/reactor.go

pool := NewBlockPool(
   logger,
   store.Height()+1,
   requestsCh,
   timeoutsCh,
)
bcR := &BlockchainReactor{
   config:     config,
   store:      store,
   pool:       pool,
   fastSync:   fastSync,
   requestsCh: requestsCh,
   timeoutsCh: timeoutsCh,
   archive:    arch,
   logger:     logger,

   closeArchive: make(chan struct{}, 1),
}
bcR.BaseReactor = *p2p.NewBaseReactor(logger, "BlockchainReactor", bcR)

The OnStart method in the same file then starts the pool:

func (bcR *BlockchainReactor) OnStart() error {

   bcR.BaseReactor.OnStart()
   if bcR.archive.Threshold > 0 && bcR.archive.Threshold < math.MaxInt64/agtypes.INT(time.Second) {
      go bcR.BlockArchive()
   } else {
      bcR.logger.Warn("invalid archive.Threshold", zap.Int64("archive_threshold", bcR.archive.Threshold))
   }

   if bcR.fastSync {
      _, err := bcR.pool.Start()
      if err != nil {
         return err
      }
      go bcR.poolRoutine()
   }
   return nil
}
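
One detail in this listing is worth isolating: the pool and poolRoutine are only started when fastSync is true. The sketch below reproduces just that gating, with a hypothetical Reactor type that is much simpler than BlockchainReactor. Whether this flag is the reason the callback did not run in our test is not established here; it is only one condition visible in the code.

```go
package main

import (
	"fmt"
	"sync"
)

// Reactor is a hypothetical, stripped-down stand-in for BlockchainReactor.
type Reactor struct {
	fastSync bool
	wg       sync.WaitGroup
	passes   int // number of times the pool routine ran
}

// OnStart launches the pool routine only in fast-sync mode,
// following the shape of the if bcR.fastSync branch above.
func (r *Reactor) OnStart() {
	if r.fastSync {
		r.wg.Add(1)
		go r.poolRoutine()
	}
}

// poolRoutine stands in for the real ticker-driven loop; it runs one pass.
func (r *Reactor) poolRoutine() {
	defer r.wg.Done()
	r.passes++
}

// Wait blocks until any launched routine has finished and reports the passes.
func (r *Reactor) Wait() int {
	r.wg.Wait()
	return r.passes
}

func main() {
	for _, fastSync := range []bool{false, true} {
		r := &Reactor{fastSync: fastSync}
		r.OnStart()
		fmt.Printf("fastSync=%v passes=%d\n", fastSync, r.Wait())
	}
}
```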

Where OnStart itself is called still has to be looked up in the code; for now we keep tracing downward:

// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
func (bcR *BlockchainReactor) poolRoutine() {

poolRoutine is an important function. Driven by a ticker, it calls the block execution callback we registered above, bcR.blockExecuter: /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/blockchain/reactor.go

case _ = <-trySyncTicker.C: // chan time
   // This loop can be slow as long as it's doing syncing work.
SYNC_LOOP:
   for i := 0; i < 10; i++ {
      // See if there are any blocks to sync.
      first, second := bcR.pool.PeekTwoBlocks()
      if first == nil || second == nil {
         // We need both to sync the first block.
         break SYNC_LOOP
      }
      firstParts := first.MakePartSet(bcR.config.GetInt64("block_part_size")) // TODO: put part size in parts header?
      firstPartsHeader := firstParts.Header()
      // Finally, verify the first block using the second's commit
      // NOTE: we can probably make this more efficient, but note that calling
      // first.Hash() doesn't verify the tx contents, so MakePartSet() is
      // currently necessary.
      if err := bcR.blockVerifier(pbtypes.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader}, first.Header.Height, second.CommitCache()); err != nil {
         bcR.logger.Error("error in validation", zap.String("error", err.Error()))
         bcR.pool.RedoRequest(first.Header.Height)
         break SYNC_LOOP
      } else {
         bcR.pool.PopRequest()
         if err := bcR.blockExecuter(first, firstParts, second.CommitCache()); err != nil {
            // TODO This is bad, are we zombie?
            PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Header.Height, first.Hash(), err))
         }
      }
   }

We set a breakpoint here to see whether execution reaches this point. In testing, the function first enters this branch once:

// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
if err := bcR.blockVerifier(pbtypes.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader}, first.Header.Height, second.CommitCache()); err != nil {
   bcR.logger.Error("error in validation", zap.String("error", err.Error()))
   bcR.pool.RedoRequest(first.Header.Height)
   break SYNC_LOOP
}

After that, the function keeps looping in the following code:

SYNC_LOOP:
   for i := 0; i < 10; i++ {
      // See if there are any blocks to sync.
      first, second := bcR.pool.PeekTwoBlocks()
      if first == nil || second == nil {
         // We need both to sync the first block.
         break SYNC_LOOP
      }

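In other words, the block execution callback is never executed. How to get it to run is something to look at in detail later; for now, let us sort out how the transactions are obtained.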

Obtaining the transactions

If execution does reach this branch:

if err := bcR.blockExecuter(first, firstParts, second.CommitCache()); err != nil {

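then the first argument contains the transactions, which can be read via block.GetData().ExTxs. The callback goes on to execute the last snippet shown in the transaction retrieval section, which updates the mempool and removes the transactions that were packed: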

// lock mempool, commit state, update mempoool
err = s.CommitStateUpdateMempool(eventSwitch, block, mempool, round)

That argument is obtained by the following code:

// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
if first == nil || second == nil {
   // We need both to sync the first block.
   break SYNC_LOOP
}

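It returns a *agtypes.BlockCache, which is a member of an entry in requesters, a map field of BlockPool. What we care about now is how the data in a requester gets there. The Requester is discussed in chapter 10.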
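
As a rough mental model of that map, the sketch below keeps one requester per height and lets PeekTwoBlocks read the entries at the current height and the next one. All names and fields here (BlockPool.height, requester.block, AddBlock) are assumptions made for illustration and are not taken from the angine source.

```go
package main

import "fmt"

// Block is a simplified stand-in for *agtypes.BlockCache (hypothetical).
type Block struct {
	Height int64
}

// requester is a hypothetical slot that waits for the block at one height.
type requester struct {
	block *Block // nil until the block has been received
}

// BlockPool is a hypothetical pool keyed by height, loosely modeled on the
// requesters map described above.
type BlockPool struct {
	height     int64 // lowest height that has not been synced yet
	requesters map[int64]*requester
}

// NewBlockPool creates empty requesters for the given number of heights.
func NewBlockPool(start int64, slots int) *BlockPool {
	p := &BlockPool{height: start, requesters: make(map[int64]*requester)}
	for i := 0; i < slots; i++ {
		p.requesters[start+int64(i)] = &requester{}
	}
	return p
}

// AddBlock fills the requester for the block's height, if one is waiting.
func (p *BlockPool) AddBlock(b *Block) {
	if r := p.requesters[b.Height]; r != nil && r.block == nil {
		r.block = b
	}
}

// PeekTwoBlocks returns the blocks at height and height+1, or nil for
// whichever has not arrived.
func (p *BlockPool) PeekTwoBlocks() (first, second *Block) {
	if r := p.requesters[p.height]; r != nil {
		first = r.block
	}
	if r := p.requesters[p.height+1]; r != nil {
		second = r.block
	}
	return first, second
}

func main() {
	pool := NewBlockPool(1, 2)

	first, second := pool.PeekTwoBlocks()
	fmt.Println(first != nil, second != nil) // false false

	pool.AddBlock(&Block{Height: 1})
	first, second = pool.PeekTwoBlocks()
	fmt.Println(first != nil, second != nil) // true false

	pool.AddBlock(&Block{Height: 2})
	first, second = pool.PeekTwoBlocks()
	fmt.Println(first != nil, second != nil) // true true
}
```

In this model the sync loop can only proceed once both requesters hold a block, so the question of who fills them is exactly the one left open above.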

Breakpoint test

We set breakpoints in the functions above, one after another, and observe the run:

pct@Chandler:~/go/src/github.com/Baptist-Publication/chorus/src/chain$ dlv debug main.go 
Type 'help' for list of commands.
(dlv) b run.go:48
Breakpoint 1 set at 0xed8e8c for github.com/Baptist-Publication/chorus/src/chain/cmd.glob..func5() ./cmd/run.go:48
(dlv) b node/node.go:57
Breakpoint 2 set at 0xebc3c4 for github.com/Baptist-Publication/chorus/src/chain/node.NewNode() ./node/node.go:57
(dlv) b angine.go:248
Breakpoint 3 set at 0xcfe674 for github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine.NewAngine() /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/angine.go:248
(dlv) b angine.go:265
Breakpoint 4 set at 0xd0e30e for github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine.NewAngine.func2() /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/angine.go:265
(dlv) b angine.go:300
Breakpoint 5 set at 0xcfeebd for github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine.(*Angine).assembleStateMachine() /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/angine.go:300
(dlv) b /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/state/execution.go:195
Breakpoint 6 set at 0xc5c59a for github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/state.(*State).ApplyBlock() /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/state/execution.go:195
(dlv) r run
Process restarted with PID 30782
(dlv) c
Using config file: /home/pct/.chorus.toml
> github.com/Baptist-Publication/chorus/src/chain/cmd.glob..func5() ./cmd/run.go:48 (hits goroutine(1):1 total:1) (PC: 0xed8e8c)
Warning: debugging optimized function
    43:                    os.Exit(1)
    44:                }
    45:            }
    46:            viper.Set("log_path", logpath)
    47:            logger := log.Initialize(env, path.Join(logpath, "node.output.log"), path.Join(logpath, "node.err.log"))
=>  48:            node.RunNode(logger, viper.GetViper())
    49:        },
    50:    }
    51:    
    52:    func init() {
    53:        RootCmd.AddCommand(runCmd)
(dlv) c
> github.com/Baptist-Publication/chorus/src/chain/node.NewNode() ./node/node.go:57 (hits goroutine(1):1 total:1) (PC: 0xebc3c4)
Warning: debugging optimized function
    52:        for k, v := range conf.AllSettings() {
    53:            aConf.Set(k, v)
    54:        }
    55:    
    56:        metropolis := NewMetropolis(logger, aConf)
=>  57:        metroAngine := angine.NewAngine(logger, &angine.Tunes{Conf: aConf})
    58:        tune := metroAngine.Tune
    59:        if err := metroAngine.ConnectApp(metropolis); err != nil {
    60:            cmn.PanicCrisis(err)
    61:        }
    62:    
(dlv) c
> github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine.NewAngine() /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/angine.go:248 (hits goroutine(1):1 total:1) (PC: 0xcfe674)
Warning: debugging optimized function
   243:    
   244:            logger: logger,
   245:        }
   246:    
   247:        if gotGenesis {
=> 248:            angine.assembleStateMachine(stateM)
   249:        } else if !angine.conf.GetBool("enable_incentive") {
   250:            p2psw.SetGenesisUnmarshal(func(b []byte) (err error) {
   251:                defer func() {
   252:                    if e := recover(); e != nil {
   253:                        err = errors.Errorf("%v", e)
(dlv) c
> github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine.(*Angine).assembleStateMachine() /home/pct/go/src/github.com/Baptist-Publication/chorus/vendor/github.com/Baptist-Publication/angine/angine.go:300 (hits goroutine(1):1 total:1) (PC: 0xcfeebd)
Warning: debugging optimized function
   295:        consensusState.BindReactor(consensusReactor)
   296:    
   297:        bcReactor.SetBlockVerifier(func(bID pbtypes.BlockID, h agtypes.INT, lc *agtypes.CommitCache) error {
   298:            return stateM.Validators.VerifyCommit(stateM.ChainID, bID, h, lc)
   299:        })
=> 300:        bcReactor.SetBlockExecuter(func(blk *agtypes.BlockCache, pst *agtypes.PartSet, c *agtypes.CommitCache) error {
   301:            blockStore.SaveBlock(blk, pst, c)
   302:            if err := stateM.ApplyBlock(*ang.eventSwitch, blk, pst.Header(), MockMempool{}, -1); err != nil {
   303:                return err
   304:            }
   305:            stateM.Save()
(dlv) c

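The breakpoint run shows that the final block execution function was not executed. It is an anonymous function that is passed as a value to the following setter: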

func (bcR *BlockchainReactor) SetBlockExecuter(x BlockExecuterFunc) {
   bcR.blockExecuter = x
}
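
The distinction between registering a callback and invoking it is easy to check with a small stand-alone sketch. The types here are hypothetical simplifications (the real callback takes a block, a part set and a commit), and syncOne merely stands in for whatever code path eventually calls the stored function.

```go
package main

import "fmt"

// Block is a simplified stand-in for *agtypes.BlockCache (hypothetical).
type Block struct {
	Height int64
}

// BlockExecuterFunc is a simplified version of the callback type; the real
// one takes more parameters.
type BlockExecuterFunc func(blk *Block) error

// Reactor is a hypothetical stand-in for BlockchainReactor.
type Reactor struct {
	blockExecuter BlockExecuterFunc
}

// SetBlockExecuter only stores the function; it does not call it.
func (r *Reactor) SetBlockExecuter(x BlockExecuterFunc) {
	r.blockExecuter = x
}

// syncOne stands in for the code path that eventually invokes the callback.
func (r *Reactor) syncOne(blk *Block) error {
	return r.blockExecuter(blk)
}

func main() {
	calls := 0
	r := &Reactor{}
	r.SetBlockExecuter(func(blk *Block) error {
		calls++
		return nil
	})
	fmt.Println("calls after registration:", calls) // 0

	_ = r.syncOne(&Block{Height: 1})
	fmt.Println("calls after sync step:", calls) // 1
}
```

A breakpoint placed inside the anonymous function therefore stays silent until some caller reaches the stored function, which matches the debugger session above.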

We need to find out when this callback is invoked and why it was not reached.
