[TOC]
共识模块功能分析
共识模块编号为7,功能描述如下:
consensus 模块负责维护节点的共识机制,包括区块打包、区块验证、共 识奖励计算、共识奖励分配、共识状态维护、交易手续费计算、作恶节点惩 罚功能。
区块打包
共识机制里讲了它的功能有区块打包,我们就先从这个功能看代码。 在文件 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/event/handler/BlockEventHandler.java 中,定义了一个BlockEventHandler,这个类创建了BlockManager和NetworkService两个实体,在收到区块事件时会通过参数BlockEvent来获取区块信息,然后进行验证,然后通过类BlockBatchDownloadUtils下载区块,并通过下面函数出块:
blockCacheManager.addBlock(block,false,fromId);
类BlockBatchDownloadUtils定义了一系列的实现体,
private static final BlockBatchDownloadUtils INSTANCE = new BlockBatchDownloadUtils();
private EventBroadcaster eventBroadcaster = NulsContext.getServiceBean(EventBroadcaster.class);
private QueueService<String> queueService = new QueueService<>();
private BlockManager blockManager = BlockManager.getInstance();
private BlockService blockService = NulsContext.getServiceBean(BlockService.class);
private ReceivedTxCacheManager receivedTxCacheManager = ReceivedTxCacheManager.getInstance();
private ConfirmingTxCacheManager confirmingTxCacheManager = ConfirmingTxCacheManager.getInstance();
后面留意一下使用位置。 addBlock函数首先是对参数进行了一系列的校验,然后执行下面代码:
bifurcateProcessor.addHeader(block.getHeader());
if (bifurcateProcessor.getChainSize() == 1) {
try {
this.appravalBlock(block);
context.setBestBlock(block);
this.lastAppravedHash = block.getHeader().getHash().getDigestHex();
checkNextblock(block.getHeader().getHash().getDigestHex());
} catch (Exception e) {
confrimingBlockCacheManager.removeBlock(block.getHeader().getHash().getDigestHex());
blockCacheBuffer.cacheBlock(block);
return;
}
} else {
this.rollbackAppraval(block);
}
从代码中看,addHeader似乎就是在本地链中添加一个块,如果链的长度为1,那么就将当前块设置为创世块。 addHeader中调用了add函数来添加块,此函数在文件 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/entity/block/BifurcateProcessor.java 中,实现如下:
private boolean add(BlockHeader header) {
for (int i = 0; i < this.chainList.size(); i++) {
BlockHeaderChain chain = chainList.get(i);
if (chain.contains(header)) {
return false;
}
}
for (int i = 0; i < this.chainList.size(); i++) {
BlockHeaderChain chain = chainList.get(i);
int index = chain.indexOf(header.getPreHash().getDigestHex(), header.getHeight() - 1);
if (index == chain.size() - 1) {
chain.addHeader(header);
return true;
} else if (index >= 0) {
BlockHeaderChain newChain = chain.getBifurcateChain(header);
chainList.add(newChain);
return true;
}
}
BlockHeaderChain chain = new BlockHeaderChain();
chain.addHeader(header);
chainList.add(chain);
return true;
}
此处代码看到的都是头,没有body的添加。并且看不到如何将这个块抛到网络中的部分。
消息通信
在/Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/event/handler下有多个Handler处理事务,其中一个NewTxEventHandler看名称是处理消息的。它在收到事件后会通过下面代码将事件进行分发:
cacheManager.putTx(tx);
eventBroadcaster.broadcastHashAndCacheAysn(event, false, fromId);
函数最终会调用下面代码发送消息:
public void offer(String name, Object obj) {
Disruptor<DisruptorEvent> disruptor = DISRUPTOR_MAP.get(name);
AssertUtil.canNotEmpty(disruptor, "the disruptor is not exist!name:" + name);
RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer();
//请求下一个事件序号;
long sequence = ringBuffer.next();
try {
//获取该序号对应的事件对象;
DisruptorEvent event = ringBuffer.get(sequence);
event.setData(obj);
} catch (Exception e) {
Log.error(e);
} finally {
//发布事件;
ringBuffer.publish(sequence);
}
}
ringBuffer是java库 /Users/chandler/.m2/repository/com/lmax/disruptor/3.4.1/disruptor-3.4.1.jar!/com/lmax/disruptor/RingBuffer.class 来实现的,这个库的介绍如下:
Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。
是一个开源的java实现的并发框架,可以对消息进行分发。此开源架构有一个golang版本的,不过已经近三年未更新。
区块结构
区块类定义:
public class Block extends BaseNulsData implements NulsCloneable {
private BlockHeader header;
private List<Transaction> txs;
public Block() {
initValidators();
}
header存放数据为:
public class BlockHeader extends BaseNulsData {
private NulsDigestData hash;
private NulsDigestData preHash;
private NulsDigestData merkleHash;
private long time;
private long height;
private long txCount;
private String packingAddress;
private P2PKHScriptSig scriptSign;
private byte[] extend;
private int size;
txs存放数据为:
public abstract class Transaction<T extends BaseNulsData> extends BaseNulsData {
protected NulsDigestData hash;
protected int type;
protected int index;
protected long time;
protected long blockHeight;
protected Na fee;
protected byte[] remark;
private byte[] scriptSig;
protected T txData;
protected TxStatusEnum status = TxStatusEnum.CACHED;
public static final int TRANSFER_RECEIVE = 1;
public static final int TRANSFER_SEND = 0;
// when localTx is true, should care transferType
protected int transferType;
protected int size;
txs是一个列表,应该是账单的实体,那么就是一系列账单和一个header组成一个区块。那么组成区块的关键就在这个txs账单是如何获取并且组织的了。 Block被类 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/entity/genesis/GenesisBlock.java 继承,生成一个通用的块。类图如下:
这个类在 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/manager/ConsensusManager.java 中被实体化,我们追踪这个实体就可以知道账单和区块是怎么创建的了。
private void loadConfigration() {
Block bestBlock = null;
Block genesisBlock = GenesisBlock.getInstance();
NulsContext.getInstance().setGenesisBlock(genesisBlock);
try {
bestBlock = blockStorageService.getBlock(blockStorageService.getBestHeight());
} catch (Exception e) {
Log.error(e);
}
ConsensusManager类
ConsensusManager类是继承的线程类,因此它是被实现成一个线程体的。在初始化的时候创建了我们上面分析的区块体:
private void loadConfigration() {
Block bestBlock = null;
Block genesisBlock = GenesisBlock.getInstance();
NulsContext.getInstance().setGenesisBlock(genesisBlock);
在set里将创建的这个实体赋值给了Nuls的底层数据:
public void setGenesisBlock(Block block) {
this.genesisBlock = block;
}
底层实现:
public class NulsContext {
/**
* cache the best block
*/
private Block bestBlock;
private Block genesisBlock;
private Long netBestBlockHeight = 0L;
底层是基类,可以容纳上层的子类。 ConsensusManager类的init实现了大部分机制,然后它自己就执行退出了:
public void init() {
loadConfigration();
accountService = NulsContext.getServiceBean(AccountService.class);
if (this.partakePacking) {
//todo
}
this.temporaryCacheManager = TemporaryCacheManager.getInstance();
this.temporaryCacheManager.init();
this.blockCacheBuffer = BlockCacheBuffer.getInstance();
this.blockCacheBuffer.init();
this.confrimingBlockCacheManager = ConfrimingBlockCacheManager.getInstance();
this.confrimingBlockCacheManager.init();
blockCacheManager = BlockManager.getInstance();
blockCacheManager.init();
consensusCacheManager = ConsensusCacheManager.getInstance();
consensusCacheManager.init();
confirmingTxCacheManager = ConfirmingTxCacheManager.getInstance();
confirmingTxCacheManager.init();
receivedTxCacheManager = ReceivedTxCacheManager.getInstance();
receivedTxCacheManager.init();
orphanTxCacheManager = OrphanTxCacheManager.getInstance();
orphanTxCacheManager.init();
TaskManager.createAndRunThread(NulsConstant.MODULE_ID_CONSENSUS, "consensus-status-manager", this);
}
其中confirmingTxCacheManager和receivedTxCacheManager是两个和交易相关的类,我们看看它们的初始化部分: confirmingTxCacheManager比较简单,基本就是对交易的添加和移除,为后续打包块做准备的应该。 receivedTxCacheManager似乎实现了相同的功能,并没有对交易的获取部分,这部分可能是通过事件通信完成的,所以还需要研究下系统中事件是如何通信的。