Edit me

[TOC]

共识模块功能分析

共识模块编号为7,功能描述如下:

consensus 模块负责维护节点的共识机制,包括区块打包、区块验证、共 识奖励计算、共识奖励分配、共识状态维护、交易手续费计算、作恶节点惩 罚功能。

区块打包

共识机制里讲了它的功能有区块打包,我们就先从这个功能看代码。 在文件 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/event/handler/BlockEventHandler.java 中,定义了一个BlockEventHandler,这个类创建了BlockManager和NetworkService两个实体,在收到区块事件时会通过参数BlockEvent来获取区块信息,然后进行验证,然后通过类BlockBatchDownloadUtils下载区块,并通过下面函数出块:

blockCacheManager.addBlock(block,false,fromId);

类BlockBatchDownloadUtils定义了一系列的实现体,

private static final BlockBatchDownloadUtils INSTANCE = new BlockBatchDownloadUtils();
private EventBroadcaster eventBroadcaster = NulsContext.getServiceBean(EventBroadcaster.class);
private QueueService<String> queueService = new QueueService<>();
private BlockManager blockManager = BlockManager.getInstance();
private BlockService blockService = NulsContext.getServiceBean(BlockService.class);


private ReceivedTxCacheManager receivedTxCacheManager = ReceivedTxCacheManager.getInstance();
private ConfirmingTxCacheManager confirmingTxCacheManager = ConfirmingTxCacheManager.getInstance();

后面留意一下使用位置。 addBlock函数首先是对参数进行了一系列的校验,然后执行下面代码:

bifurcateProcessor.addHeader(block.getHeader());
if (bifurcateProcessor.getChainSize() == 1) {
    try {
        this.appravalBlock(block);
        context.setBestBlock(block);
        this.lastAppravedHash = block.getHeader().getHash().getDigestHex();
        checkNextblock(block.getHeader().getHash().getDigestHex());
    } catch (Exception e) {
        confrimingBlockCacheManager.removeBlock(block.getHeader().getHash().getDigestHex());
        blockCacheBuffer.cacheBlock(block);
        return;
    }
} else {
    this.rollbackAppraval(block);
}

从代码中看,addHeader似乎就是在本地链中添加一个块,如果链的长度为1,那么就将当前块设置为创世块。 addHeader中调用了add函数来添加块,此函数在文件 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/entity/block/BifurcateProcessor.java 中,实现如下:

private boolean add(BlockHeader header) {
    for (int i = 0; i < this.chainList.size(); i++) {
        BlockHeaderChain chain = chainList.get(i);
        if (chain.contains(header)) {
            return false;
        }
    }
    for (int i = 0; i < this.chainList.size(); i++) {
        BlockHeaderChain chain = chainList.get(i);
        int index = chain.indexOf(header.getPreHash().getDigestHex(), header.getHeight() - 1);
        if (index == chain.size() - 1) {
            chain.addHeader(header);
            return true;
        } else if (index >= 0) {
            BlockHeaderChain newChain = chain.getBifurcateChain(header);
            chainList.add(newChain);
            return true;
        }
    }
    BlockHeaderChain chain = new BlockHeaderChain();
    chain.addHeader(header);
    chainList.add(chain);
    return true;
}

此处代码看到的都是头,没有body的添加。并且看不到如何将这个块抛到网络中的部分。

消息通信

在/Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/event/handler下有多个Handler处理事务,其中一个NewTxEventHandler看名称是处理消息的。它在收到事件后会通过下面代码将事件进行分发:

cacheManager.putTx(tx);
eventBroadcaster.broadcastHashAndCacheAysn(event, false, fromId);

函数最终会调用下面代码发送消息:

public void offer(String name, Object obj) {
    Disruptor<DisruptorEvent> disruptor = DISRUPTOR_MAP.get(name);
    AssertUtil.canNotEmpty(disruptor, "the disruptor is not exist!name:" + name);
    RingBuffer<DisruptorEvent> ringBuffer = disruptor.getRingBuffer();
    //请求下一个事件序号;
    long sequence = ringBuffer.next();
    try {
        //获取该序号对应的事件对象;
        DisruptorEvent event = ringBuffer.get(sequence);
        event.setData(obj);
    } catch (Exception e) {
        Log.error(e);
    } finally {
        //发布事件;
        ringBuffer.publish(sequence);
    }
}

ringBuffer是java库 /Users/chandler/.m2/repository/com/lmax/disruptor/3.4.1/disruptor-3.4.1.jar!/com/lmax/disruptor/RingBuffer.class 来实现的,这个库的介绍如下:

Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。

是一个开源的java实现的并发框架,可以对消息进行分发。此开源架构有一个golang版本的,不过已经近三年未更新。

区块结构

区块类定义:

public class Block extends BaseNulsData implements NulsCloneable {

    private BlockHeader header;

    private List<Transaction> txs;

    public Block() {
        initValidators();
    }

header存放数据为:

public class BlockHeader extends BaseNulsData {

    private NulsDigestData hash;
    private NulsDigestData preHash;
    private NulsDigestData merkleHash;

    private long time;

    private long height;

    private long txCount;

    private String packingAddress;

    private P2PKHScriptSig scriptSign;

    private byte[] extend;

    private int size;

txs存放数据为:

public abstract class Transaction<T extends BaseNulsData> extends BaseNulsData {

    protected NulsDigestData hash;

    protected int type;

    protected int index;

    protected long time;

    protected long blockHeight;

    protected Na fee;

    protected byte[] remark;

    private byte[] scriptSig;

    protected T txData;

    protected TxStatusEnum status = TxStatusEnum.CACHED;

    public static final int TRANSFER_RECEIVE = 1;
    public static final int TRANSFER_SEND = 0;
    // when localTx is true, should care transferType
    protected int transferType;

    protected int size;

txs是一个列表,应该是账单的实体,那么就是一系列账单和一个header组成一个区块。那么组成区块的关键就在这个txs账单是如何获取并且组织的了。 Block被类 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/entity/genesis/GenesisBlock.java 继承,生成一个通用的块。类图如下: 03-1

这个类在 /Users/chandler/WorkSpace/APA/Codes/nuls/consensus-impl/src/main/java/io/nuls/consensus/manager/ConsensusManager.java 中被实体化,我们追踪这个实体就可以知道账单和区块是怎么创建的了。

private void loadConfigration() {
    Block bestBlock = null;
    Block genesisBlock = GenesisBlock.getInstance();
    NulsContext.getInstance().setGenesisBlock(genesisBlock);
    try {
        bestBlock = blockStorageService.getBlock(blockStorageService.getBestHeight());
    } catch (Exception e) {
        Log.error(e);
    }

ConsensusManager类

ConsensusManager类是继承的线程类,因此它是被实现成一个线程体的。在初始化的时候创建了我们上面分析的区块体:

private void loadConfigration() {
    Block bestBlock = null;
    Block genesisBlock = GenesisBlock.getInstance();
    NulsContext.getInstance().setGenesisBlock(genesisBlock);

在set里将创建的这个实体赋值给了Nuls的底层数据:

public void setGenesisBlock(Block block) {
    this.genesisBlock = block;
}

底层实现:

public class NulsContext {
/**
 * cache the best block
 */
private Block bestBlock;
private Block genesisBlock;
private Long netBestBlockHeight = 0L;

底层是基类,可以容纳上层的子类。 ConsensusManager类的init实现了大部分机制,然后它自己就执行退出了:

public void init() {
    loadConfigration();
    accountService = NulsContext.getServiceBean(AccountService.class);
    if (this.partakePacking) {
        //todo
    }
    this.temporaryCacheManager = TemporaryCacheManager.getInstance();
    this.temporaryCacheManager.init();
    this.blockCacheBuffer = BlockCacheBuffer.getInstance();
    this.blockCacheBuffer.init();
    this.confrimingBlockCacheManager = ConfrimingBlockCacheManager.getInstance();
    this.confrimingBlockCacheManager.init();

    blockCacheManager = BlockManager.getInstance();
    blockCacheManager.init();

    consensusCacheManager = ConsensusCacheManager.getInstance();
    consensusCacheManager.init();
    confirmingTxCacheManager = ConfirmingTxCacheManager.getInstance();
    confirmingTxCacheManager.init();
    receivedTxCacheManager = ReceivedTxCacheManager.getInstance();
    receivedTxCacheManager.init();
    orphanTxCacheManager = OrphanTxCacheManager.getInstance();
    orphanTxCacheManager.init();
    TaskManager.createAndRunThread(NulsConstant.MODULE_ID_CONSENSUS, "consensus-status-manager", this);
}

其中confirmingTxCacheManager和receivedTxCacheManager是两个和交易相关的类,我们看看它们的初始化部分: confirmingTxCacheManager比较简单,基本就是对交易的添加和移除,为后续打包块做准备的应该。 receivedTxCacheManager似乎实现了相同的功能,并没有对交易的获取部分,这部分可能是通过事件通信完成的,所以还需要研究下系统中事件是如何通信的。

Tags: