
import

Picking up where the last post left off: in the post on Aleth startup and the eth service, we saw that the import function verifies new blocks and stores the ones that pass, to be written to the database later. That write path wasn't examined in depth there, so this post digs into how the database write actually happens.

At the end of import, once a block has passed verification, the following code runs:

cpp
            // If valid, append to blocks.
            LOG(m_loggerDetail) << "OK - ready for chain insertion.";
            // Enqueue the block on the unverified queue
            DEV_GUARDED(m_verification)
                m_unverified.enqueue(UnverifiedBlock { h, bi.parentHash(), _block.toBytes() });
            // Wake a thread waiting on the queue
            m_moreToVerify.notify_one();
            // Add the block's hash to the ready set
            m_readySet.insert(h);
            // Add the block's difficulty to the running total
            m_difficulty += bi.difficulty();
            // Record that this block is now ready
            noteReady_WITH_LOCK(h);

            return ImportResult::Success;
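The guarded enqueue followed by notify_one is the producer half of a classic condition-variable handoff: a verifier thread sleeps on m_moreToVerify until work arrives. A minimal self-contained sketch of the same pattern, with std types standing in for the dev:: wrappers (not Aleth's actual code):

cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

std::mutex verification;               // plays the role of m_verification
std::condition_variable moreToVerify;  // plays the role of m_moreToVerify
std::deque<std::string> unverified;    // plays the role of m_unverified

// Producer side, as in import(): enqueue under the lock, then notify.
void produce(std::string block)
{
    {
        std::lock_guard<std::mutex> guard(verification);
        unverified.push_back(std::move(block));
    }
    moreToVerify.notify_one();  // wake one sleeping verifier thread
}

// Consumer side (roughly what a verifier thread's loop does):
// sleep until the queue is non-empty, then take one item.
std::string consume()
{
    std::unique_lock<std::mutex> lock(verification);
    moreToVerify.wait(lock, [] { return !unverified.empty(); });
    std::string block = std::move(unverified.front());
    unverified.pop_front();
    return block;
}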

Let's look at the noteReady_WITH_LOCK function.

noteReady_WITH_LOCK

cpp
// Receives the hash of the newly arrived block
void BlockQueue::noteReady_WITH_LOCK(h256 const& _good)
{
    DEV_INVARIANT_CHECK;
    list<h256> goodQueue(1, _good);
    bool notify = false;
    while (!goodQueue.empty())
    {
        h256 const parent = goodQueue.front();
        // Pull _good's children out of the unknown set, then the children's
        // children, and so on... every block found this way ends up in m_readySet.
        vector<pair<h256, bytes>> const removed = m_unknown.removeByKeyEqual(parent);
        goodQueue.pop_front();
        for (auto& newReady: removed)
        {
            DEV_GUARDED(m_verification)
                m_unverified.enqueue(UnverifiedBlock { newReady.first, parent, move(newReady.second) });
            m_unknownSet.erase(newReady.first);
            m_readySet.insert(newReady.first);
            goodQueue.push_back(newReady.first);
            notify = true;
        }
    }
    if (notify)
        m_moreToVerify.notify_all();
}

This function iteratively pulls all of the _good block's descendants out of m_unknown, enqueues them for verification, and records them in m_readySet.

Finally it calls notify_all to wake the verifier threads and tell them new blocks are waiting to be verified.
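The traversal is a plain breadth-first search over a parent-to-children index, with the std::list doubling as the FIFO work queue. Here is a compact sketch of the same pattern, using std::multimap as a stand-in for m_unknown (all types here are simplifications, not Aleth's):

cpp
#include <list>
#include <map>
#include <string>
#include <vector>

using Hash = std::string;  // stand-in for h256

// Promote every descendant of `good` out of the parent -> child index,
// breadth-first, just as noteReady_WITH_LOCK does.
std::vector<Hash> promoteDescendants(std::multimap<Hash, Hash>& unknown, Hash const& good)
{
    std::vector<Hash> ready;
    std::list<Hash> goodQueue{good};  // FIFO work queue
    while (!goodQueue.empty())
    {
        Hash const parent = goodQueue.front();
        goodQueue.pop_front();
        auto children = unknown.equal_range(parent);  // blocks waiting on this parent
        for (auto it = children.first; it != children.second; ++it)
        {
            ready.push_back(it->second);      // child is now ready for verification
            goodQueue.push_back(it->second);  // and may unblock grandchildren in turn
        }
        unknown.erase(children.first, children.second);
    }
    return ready;
}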

Next, where does m_readySet get consumed? By this point a block has moved through the queue's stages: it sits in m_unknown while its parent is missing, moves to m_unverified (and m_readySet) once the parent is known, and lands in m_verified after a verifier thread checks it.

BlockQueue::drain, below, is where the verified blocks finally leave the queue.

cpp
// The output parameter VerifiedBlocks& o_out is the important bit here:
// it is passed by reference, so the verified blocks dequeued from m_verified
// end up in o_out for the caller to process
void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
{
    DEV_WRITE_GUARDED(m_lock)
    {
        DEV_INVARIANT_CHECK;
        if (m_drainingSet.empty())
        {
            m_drainingDifficulty = 0;
            DEV_GUARDED(m_verification)
                o_out = m_verified.dequeueMultiple(min<unsigned>(_max, m_verified.count()));

            for (auto const& bs: o_out)
            {
                // TODO: @optimise use map<h256, bytes> rather than vector<bytes> & set<h256>.
                auto h = bs.verified.info.hash();
                // Move the dequeued block into m_drainingSet: it has passed
                // verification and is being handed off for storage; doneDrain will clear it later
                m_drainingSet.insert(h);
                m_drainingDifficulty += bs.verified.info.difficulty();
                // Remove the block from m_readySet
                m_readySet.erase(h);
            }
        }
    }
    m_onBlocksDrained();
}

Which functions call BlockQueue::drain?

There are two:

cpp
tuple<ImportRoute, bool, unsigned> BlockChain::sync(
    BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max)
{
    //  _bq.tick(*this);

    VerifiedBlocks blocks;
    _bq.drain(blocks, _max);

    std::tuple<ImportRoute, h256s, unsigned> const importResult = sync(blocks, _stateDB);
    bool const moreBlocks = _bq.doneDrain(std::get<1>(importResult));
    return {std::get<0>(importResult), moreBlocks, std::get<2>(importResult)};
}

cpp
void Client::syncBlockQueue()
{
//  cdebug << "syncBlockQueue()";

    ImportRoute ir;
    h256s badBlockHashes;
    unsigned count;
    Timer t;

    // The verified blocks list needs to be a shared_ptr since we propagate them on the network
    // thread and import them into our local chain on the client thread.
    // Create an empty VerifiedBlocks object
    std::shared_ptr<VerifiedBlocks> verifiedBlocks = std::make_shared<VerifiedBlocks>();
    // Drain a batch of already-verified blocks from the block queue m_bq into verifiedBlocks
    // m_syncAmount = 50
    m_bq.drain(*verifiedBlocks, m_syncAmount);

    // Propagate new blocks to peers before importing them into the chain.
    auto h = m_host.lock();
    assert(h);  // capability is owned by Host and should be available for the duration of the
                // Client's lifetime
    // Broadcast the blocks in verifiedBlocks to our peers
    h->propagateNewBlocks(verifiedBlocks);

    // Import the blocks in verifiedBlocks into the local chain; hashes of any bad blocks come back in badBlockHashes
    std::tie(ir, badBlockHashes, count) = bc().sync(*verifiedBlocks, m_stateDB);
    // Remove all the drained blocks from m_bq, with the ones listed in badBlockHashes handled as bad
    m_syncBlockQueue = m_bq.doneDrain(badBlockHashes);

    // Measure how long the sync took
    double elapsed = t.elapsed();

    if (count)
    {
        LOG(m_logger) << count << " blocks imported in " << unsigned(elapsed * 1000) << " ms ("
                      << (count / elapsed) << " blocks/s) in #" << bc().number();
    }

    // Adapt the sync batch size to keep the import duration stable
    if (elapsed > c_targetDurationS * 1.1 && count > c_syncMinBlockCount)
        m_syncAmount = max(c_syncMinBlockCount, count * 9 / 10);
    else if (count == m_syncAmount && elapsed < c_targetDurationS * 0.9 && m_syncAmount < c_syncMaxBlockCount)
        m_syncAmount = min(c_syncMaxBlockCount, m_syncAmount * 11 / 10 + 1);
    // If no blocks became live on the chain, return early
    if (ir.liveBlocks.empty())
        return;
    // Broadcast the chain change to other components and apply the updates locally
    onChainChanged(ir);
}

Set the first function aside for now and look at the second one, void Client::syncBlockQueue().

This function runs over and over from doWork, the eth service's main loop mentioned earlier.

The key lines are these:

cpp
// The verified blocks list needs to be a shared_ptr since we propagate them on the network
// thread and import them into our local chain on the client thread.
// Create an empty VerifiedBlocks object
std::shared_ptr<VerifiedBlocks> verifiedBlocks = std::make_shared<VerifiedBlocks>();
// Drain a batch of already-verified blocks from the block queue m_bq into verifiedBlocks
// m_syncAmount = 50
m_bq.drain(*verifiedBlocks, m_syncAmount);

// Propagate new blocks to peers before importing them into the chain.
auto h = m_host.lock();
assert(h);  // capability is owned by Host and should be available for the duration of the
            // Client's lifetime
// Broadcast the blocks in verifiedBlocks to our peers
h->propagateNewBlocks(verifiedBlocks);

// Import the blocks in verifiedBlocks into the local chain; hashes of any bad blocks come back in badBlockHashes
std::tie(ir, badBlockHashes, count) = bc().sync(*verifiedBlocks, m_stateDB);
// Remove all the drained blocks from m_bq, with the ones listed in badBlockHashes handled as bad
m_syncBlockQueue = m_bq.doneDrain(badBlockHashes);

First an empty VerifiedBlocks object is created; then m_bq.drain(*verifiedBlocks, m_syncAmount); fills it with blocks that have already passed verification.

Before these blocks are written to the chain, they are broadcast to peers.

Then this line writes them into the local chain: std::tie(ir, badBlockHashes, count) = bc().sync(*verifiedBlocks, m_stateDB);

If some blocks fail to import, their hashes come back in badBlockHashes, and doneDrain(badBlockHashes) is called so the queue can deal with those bad blocks.
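One more detail from the full listing: the tail of syncBlockQueue adapts m_syncAmount so that each drain/import round takes roughly c_targetDurationS. If a round ran more than 10% over target, the next batch shrinks to 90% of what was actually imported; if a full batch finished more than 10% under target, the batch grows by about 10%. A standalone sketch of that feedback rule (the constant values below are assumed for illustration, not taken from Aleth):

cpp
#include <algorithm>
#include <iostream>

// Assumed values, for illustration only.
constexpr unsigned c_syncMinBlockCount = 1;
constexpr unsigned c_syncMaxBlockCount = 1000;
constexpr double c_targetDurationS = 1.0;

// Mirrors the feedback rule at the end of Client::syncBlockQueue.
unsigned adjustSyncAmount(unsigned syncAmount, unsigned count, double elapsed)
{
    if (elapsed > c_targetDurationS * 1.1 && count > c_syncMinBlockCount)
        return std::max(c_syncMinBlockCount, count * 9 / 10);  // ran long: back off
    if (count == syncAmount && elapsed < c_targetDurationS * 0.9 && syncAmount < c_syncMaxBlockCount)
        return std::min(c_syncMaxBlockCount, syncAmount * 11 / 10 + 1);  // fast full batch: ramp up
    return syncAmount;
}

int main()
{
    std::cout << adjustSyncAmount(50, 50, 1.3) << "\n";  // 45: took too long
    std::cout << adjustSyncAmount(50, 50, 0.5) << "\n";  // 56: full batch, fast
}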

So the function to focus on is bc().sync(*verifiedBlocks, m_stateDB);.

bc().sync(*verifiedBlocks, m_stateDB)

cpp
tuple<ImportRoute, h256s, unsigned> BlockChain::sync(
    VerifiedBlocks const& _blocks, OverlayDB const& _stateDB)
{
    h256s fresh;
    h256s dead;
    Transactions goodTransactions;
    unsigned count = 0;
    h256s badBlockHashes;
    for (VerifiedBlock const& block : _blocks)
    {
        do {
            try
            {
                // Nonce & uncle nonces already verified in verification thread at this point.
                ImportRoute r;
                DEV_TIMED_ABOVE("Block import " + toString(block.verified.info.number()), 500)
                  // 🌟 import the block into the chain
                    r = import(block.verified, _stateDB, (ImportRequirements::Everything & ~ImportRequirements::ValidSeal & ~ImportRequirements::CheckUncles) != 0);
                fresh += r.liveBlocks;
                dead += r.deadBlocks;
                goodTransactions.reserve(goodTransactions.size() + r.goodTransactions.size());
                std::move(std::begin(r.goodTransactions), std::end(r.goodTransactions), std::back_inserter(goodTransactions));
                ++count;
            }
            catch (dev::eth::AlreadyHaveBlock const&)
            {
                cwarn << "ODD: Import queue contains already imported block";
                continue;
            }
            catch (dev::eth::UnknownParent const&)
            {
                cwarn << "ODD: Import queue contains block with unknown parent.";// << LogTag::Error << boost::current_exception_diagnostic_information();
                // NOTE: don't reimport since the queue should guarantee everything in the right order.
                // Can't continue - chain bad.
                badBlockHashes.push_back(block.verified.info.hash());
            }
            catch (dev::eth::FutureTime const&)
            {
                cwarn << "ODD: Import queue contains a block with future time.";
                this_thread::sleep_for(chrono::seconds(1));
                continue;
            }
            catch (dev::eth::TransientError const&)
            {
                this_thread::sleep_for(chrono::milliseconds(100));
                continue;
            }
            catch (Exception& ex)
            {
                // cnote << "Exception while importing block. Someone (Jeff? That you?) seems to be giving us dodgy blocks!";// << LogTag::Error << diagnostic_information(ex);
                if (m_onBad)
                    m_onBad(ex);
                // NOTE: don't reimport since the queue should guarantee everything in the right order.
                // Can't continue - chain  bad.
                badBlockHashes.push_back(block.verified.info.hash());
            }
        } while (false);
    }
    return {ImportRoute{dead, fresh, goodTransactions}, badBlockHashes, count};
}

This function walks the queued blocks one by one in a for loop, with each iteration's body wrapped in do { ... } while (false). One C++ subtlety is worth spelling out: a continue inside do { ... } while (false) jumps to the loop condition, which is false, so every continue in the catch handlers above abandons the current block and the outer for advances to the next one.
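A tiny self-contained demonstration of that control flow:

cpp
#include <iostream>

int main()
{
    for (int block = 0; block < 3; ++block)
    {
        do
        {
            if (block == 1)
                continue;  // jumps to `while (false)`, exiting the do-while,
                           // so the outer for simply advances to the next block
            std::cout << "imported block " << block << "\n";
        } while (false);
    }
    // prints: imported block 0, imported block 2
}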

The call that actually imports a block into the chain is:

r = import(block.verified, _stateDB, (ImportRequirements::Everything & ~ImportRequirements::ValidSeal & ~ImportRequirements::CheckUncles) != 0);

Note the third argument: the flag-mask expression compared with != 0 simply evaluates to true, which lands in import's bool _mustBeNew parameter. Let's see what this function does.

cpp
ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const& _db, bool _mustBeNew)
{
    //@tidy This is a behemoth of a method - could do to be split into a few smaller ones.

    ImportPerformanceLogger performanceLogger;

    // Check block doesn't already exist first!
    // First check that a block with this hash isn't already on the chain; throws if it is
    if (_mustBeNew)
        checkBlockIsNew(_block);

    // Work out its number as the parent's number + 1
    // Check that the parent block is already known; throws if it isn't
    if (!isKnown(_block.info.parentHash(), false))  // doesn't have to be current.
    {
        LOG(m_logger) << _block.info.hash() << " : Unknown parent " << _block.info.parentHash();
        // We don't know the parent (yet) - discard for now. It'll get resent to us if we find out about its ancestry later on.
        BOOST_THROW_EXCEPTION(UnknownParent() << errinfo_hash256(_block.info.parentHash()));
    }

    // Fetch the parent block's details
    auto pd = details(_block.info.parentHash());
    if (!pd)
    {
        auto pdata = pd.rlp();
        LOG(m_loggerError) << "Details is returning false despite block known: " << RLP(pdata);
        auto parentBlock = block(_block.info.parentHash());
        LOG(m_loggerError) << "isKnown: " << isKnown(_block.info.parentHash());
        LOG(m_loggerError) << "last/number: " << m_lastBlockNumber << " " << m_lastBlockHash << " "
                           << _block.info.number();
        LOG(m_loggerError) << "Block: " << BlockHeader(&parentBlock);
        LOG(m_loggerError) << "RLP: " << RLP(parentBlock);
        LOG(m_loggerError) << "DATABASE CORRUPTION: CRITICAL FAILURE";
        exit(-1);
    }

    // Throw if the block's timestamp is in the future and future blocks aren't allowed
    checkBlockTimestamp(_block.info);

    // Verify parent-critical parts
    verifyBlock(_block.block, m_onBad, ImportRequirements::InOrderChecks);

    LOG(m_loggerDetail) << "Attempting import of block " << _block.info.hash() << " (#"
                        << _block.info.number() << ") ...";

    performanceLogger.onStageFinished("preliminaryChecks");

    BlockReceipts br;
    u256 td;
    try
    {
        // Check transactions are valid and that they result in a state equivalent to our state_root.
        // Get total difficulty increase and update state, checking it.
        Block s(*this, _db);

        // enactOn does three things: it validates the block's transactions,
        // executes each one against the underlying state trie,
        // and checks that the resulting state matches the header's state_root.
        auto tdIncrease = s.enactOn(_block, *this);

        for (unsigned i = 0; i < s.pending().size(); ++i)
            br.receipts.push_back(s.receipt(i));

        s.cleanup();

        td = pd.totalDifficulty + tdIncrease;

        performanceLogger.onStageFinished("enactment");
    }
    catch (BadRoot& ex)
    {
        cwarn << "*** BadRoot error! Trying to import" << _block.info.hash() << "needed root"
              << *boost::get_error_info<errinfo_hash256>(ex);
        cwarn << _block.info;
        // Attempt in import later.
        BOOST_THROW_EXCEPTION(TransientError());
    }
    catch (Exception& ex)
    {
        addBlockInfo(ex, _block.info, _block.block.toBytes());
        throw;
    }

    // All ok - insert into DB
    // RLP-encode the receipts, then store the block and its extras
    bytes const receipts = br.rlp();
    return insertBlockAndExtras(_block, ref(receipts), td, performanceLogger);
}

The function insertBlockAndExtras(_block, ref(receipts), td, performanceLogger) is what actually writes the block into the databases.
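It leans on a batched-write pattern: every key/value insert is first staged in an in-memory batch, and nothing reaches the database until the batch is committed in one shot. A minimal sketch of that idea against a hypothetical key-value interface (the real types are db::WriteBatchFace and the databases behind m_blocksDB/m_extrasDB):

cpp
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-ins for db::WriteBatchFace and the database behind it.
struct WriteBatch
{
    std::map<std::string, std::string> staged;  // key -> value, not yet persisted
    void insert(std::string key, std::string value) { staged[std::move(key)] = std::move(value); }
};

struct Database
{
    std::map<std::string, std::string> data;
    std::unique_ptr<WriteBatch> createWriteBatch() { return std::make_unique<WriteBatch>(); }
    // All staged writes land together, or not at all.
    void commit(std::unique_ptr<WriteBatch> batch)
    {
        for (auto const& kv : batch->staged)
            data[kv.first] = kv.second;
    }
};

int main()
{
    Database blocksDB, extrasDB;
    auto blocksBatch = blocksDB.createWriteBatch();
    auto extrasBatch = extrasDB.createWriteBatch();

    // Stage everything first, as insertBlockAndExtras does...
    blocksBatch->insert("hash", "block RLP");
    extrasBatch->insert("hash+ExtraDetails", "details RLP");
    extrasBatch->insert("hash+ExtraReceipts", "receipts RLP");

    // ...and commit only once all the staging succeeded.
    blocksDB.commit(std::move(blocksBatch));
    extrasDB.commit(std::move(extrasBatch));
}

With that pattern in mind, here is the full function: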

cpp
ImportRoute BlockChain::insertBlockAndExtras(VerifiedBlockRef const& _block, bytesConstRef _receipts, u256 const& _totalDifficulty, ImportPerformanceLogger& _performanceLogger)
{
    std::unique_ptr<db::WriteBatchFace> blocksWriteBatch = m_blocksDB->createWriteBatch();
    std::unique_ptr<db::WriteBatchFace> extrasWriteBatch = m_extrasDB->createWriteBatch();
    h256 newLastBlockHash = currentHash();
    unsigned newLastBlockNumber = number();

    try
    {
        // ensure parent is cached for later addition.
        // TODO: this is a bit horrible would be better refactored into an enveloping UpgradableGuard
        // together with an "ensureCachedWithUpdatableLock(l)" method.
        // This is safe in practice since the caches don't get flushed nearly often enough to be
        // done here.
        details(_block.info.parentHash());
        DEV_WRITE_GUARDED(x_details)
            m_details[_block.info.parentHash()].childHashes.push_back(_block.info.hash());

        _performanceLogger.onStageFinished("collation");

        blocksWriteBatch->insert(toSlice(_block.info.hash()), db::Slice(_block.block));
        DEV_READ_GUARDED(x_details)
        extrasWriteBatch->insert(toSlice(_block.info.parentHash(), ExtraDetails),
            (db::Slice)dev::ref(m_details[_block.info.parentHash()].rlp()));

        BlockDetails const details{static_cast<unsigned>(_block.info.number()), _totalDifficulty,
            _block.info.parentHash(), {} /* children */, _block.block.size()};
        extrasWriteBatch->insert(
            toSlice(_block.info.hash(), ExtraDetails), (db::Slice)dev::ref(details.rlp()));

        BlockLogBlooms blb;
        for (auto i: RLP(_receipts))
            blb.blooms.push_back(TransactionReceipt(i.data()).bloom());
        extrasWriteBatch->insert(
            toSlice(_block.info.hash(), ExtraLogBlooms), (db::Slice)dev::ref(blb.rlp()));

        extrasWriteBatch->insert(toSlice(_block.info.hash(), ExtraReceipts), (db::Slice)_receipts);

        _performanceLogger.onStageFinished("writing");
    }
    catch (Exception& ex)
    {
        addBlockInfo(ex, _block.info, _block.block.toBytes());
        throw;
    }

    h256s route;
    h256 common;
    bool isImportedAndBest = false;
    // This might be the new best block...
    h256 last = currentHash();
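    // The block becomes the new head if its total difficulty beats the current
    // head's, or (when tieBreakingGas is set) ties it while using more gas.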
    if (_totalDifficulty > details(last).totalDifficulty || (m_sealEngine->chainParams().tieBreakingGas && 
        _totalDifficulty == details(last).totalDifficulty && _block.info.gasUsed() > info(last).gasUsed()))
    {
        // don't include bi.hash() in treeRoute, since it's not yet in details DB...
        // just tack it on afterwards.
        unsigned commonIndex;
        tie(route, common, commonIndex) = treeRoute(last, _block.info.parentHash());
        route.push_back(_block.info.hash());

        // Most of the time these two will be equal - only when we're doing a chain revert will they not be
        if (common != last)
            DEV_READ_GUARDED(x_lastBlockHash)
                clearCachesDuringChainReversion(number(common) + 1);

        // Go through ret backwards (i.e. from new head to common) until hash != last.parent and
        // update m_transactionAddresses, m_blockHashes
        for (auto i = route.rbegin(); i != route.rend() && *i != common; ++i)
        {
            BlockHeader tbi;
            if (*i == _block.info.hash())
                tbi = _block.info;
            else
                tbi = BlockHeader(block(*i));

            // Collate logs into blooms.
            h256s alteredBlooms;
            {
                LogBloom blockBloom = tbi.logBloom();
                blockBloom.shiftBloom<3>(sha3(tbi.author().ref()));

                // Pre-memoize everything we need before locking x_blocksBlooms
                for (unsigned level = 0, index = (unsigned)tbi.number(); level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
                    blocksBlooms(chunkId(level, index / c_bloomIndexSize));

                WriteGuard l(x_blocksBlooms);
                for (unsigned level = 0, index = (unsigned)tbi.number(); level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
                {
                    unsigned i = index / c_bloomIndexSize;
                    unsigned o = index % c_bloomIndexSize;
                    alteredBlooms.push_back(chunkId(level, i));
                    m_blocksBlooms[alteredBlooms.back()].blooms[o] |= blockBloom;
                }
            }
            // Collate transaction hashes and remember who they were.
            //h256s newTransactionAddresses;
            {
                bytes blockBytes;
                RLP blockRLP(*i == _block.info.hash() ? _block.block : &(blockBytes = block(*i)));
                TransactionAddress ta;
                ta.blockHash = tbi.hash();
                for (ta.index = 0; ta.index < blockRLP[1].itemCount(); ++ta.index)
                    extrasWriteBatch->insert(
                        toSlice(sha3(blockRLP[1][ta.index].data()), ExtraTransactionAddress),
                        (db::Slice)dev::ref(ta.rlp()));
            }

            // Update database with them.
            ReadGuard l1(x_blocksBlooms);
            for (auto const& h: alteredBlooms)
                extrasWriteBatch->insert(
                    toSlice(h, ExtraBlocksBlooms), (db::Slice)dev::ref(m_blocksBlooms[h].rlp()));
            extrasWriteBatch->insert(toSlice(h256(tbi.number()), ExtraBlockHash),
                (db::Slice)dev::ref(BlockHash(tbi.hash()).rlp()));
        }

        // FINALLY! change our best hash.
        {
            newLastBlockHash = _block.info.hash();
            newLastBlockNumber = (unsigned)_block.info.number();
            isImportedAndBest = true;
        }

        LOG(m_logger) << "   Imported and best " << _totalDifficulty << " (#"
                      << _block.info.number() << "). Has "
                      << (details(_block.info.parentHash()).childHashes.size() - 1)
                      << " siblings. Route: " << route;
    }
    else
    {
        LOG(m_loggerDetail) << "   Imported but not best (oTD: " << details(last).totalDifficulty
                            << " > TD: " << _totalDifficulty << "; " << details(last).number << ".."
                            << _block.info.number() << ")";
    }

    try
    {
        m_blocksDB->commit(std::move(blocksWriteBatch));
    }
    catch (boost::exception& ex)
    {
        cwarn << "Error writing to blockchain database: " << boost::diagnostic_information(ex);
        cwarn << "Fail writing to blockchain database. Bombing out.";
        exit(-1);
    }

    try
    {
        m_extrasDB->commit(std::move(extrasWriteBatch));
    }
    catch (boost::exception& ex)
    {
        cwarn << "Error writing to extras database: " << boost::diagnostic_information(ex);
        cwarn << "Fail writing to extras database. Bombing out.";
        exit(-1);
    }
    if (m_lastBlockHash != newLastBlockHash)
        DEV_WRITE_GUARDED(x_lastBlockHash)
        {
            m_lastBlockHash = newLastBlockHash;
            m_lastBlockNumber = newLastBlockNumber;
            try
            {
                m_extrasDB->insert(db::Slice("best"), db::Slice((char const*)&m_lastBlockHash, 32));
            }
            catch (boost::exception const& ex)
            {
                cwarn << "Error writing to extras database: " << boost::diagnostic_information(ex);
                cout << "Put" << toHex(bytesConstRef(db::Slice("best"))) << "=>"
                     << toHex(bytesConstRef(db::Slice((char const*)&m_lastBlockHash, 32)));
                cwarn << "Fail writing to extras database. Bombing out.";
                exit(-1);
            }
        }

    _performanceLogger.onStageFinished("checkBest");

    unsigned const gasPerSecond = static_cast<double>(_block.info.gasUsed()) / _performanceLogger.stageDuration("enactment");
    _performanceLogger.onFinished({
        {"blockHash", "\""+ _block.info.hash().abridged() + "\""},
        {"blockNumber", toString(_block.info.number())},
        {"gasPerSecond", toString(gasPerSecond)},
        {"transactions", toString(_block.transactions.size())},
        {"gasUsed", toString(_block.info.gasUsed())}
    });

    if (!route.empty())
        noteCanonChanged();

    if (isImportedAndBest && m_onBlockImport)
        m_onBlockImport(_block.info);

    h256s fresh;
    h256s dead;
    bool isOld = true;
    for (auto const& h: route)
        if (h == common)
            isOld = false;
        else if (isOld)
            dead.push_back(h);
        else
            fresh.push_back(h);
    return ImportRoute{dead, fresh, _block.transactions};
}

This function appends the new block to the chain and writes the block's extra metadata to the databases. It proceeds in these steps:

  1. Create two db::WriteBatchFace instances, one for block data and one for the extras.
  2. Make sure the parent block's details are cached, then append the new block's hash to the parent's child list.
  3. Insert the new block's data into the blocks database.
  4. Insert the parent's updated details into the extras database.
  5. Insert the new block's details into the extras database.
  6. Insert the new block's log blooms into the extras database.
  7. Insert the new block's transaction receipts into the extras database.
  8. If the new block's total difficulty beats the current head's (ties optionally broken by gas used), make it the new best block and compute the reorg route.
  9. Along that route, update the transaction-address and block-hash indexes in the extras batch.
  10. Commit both write batches, then persist the new best block hash under the "best" key.
  11. Split the route into dead (old-branch) and fresh (new-branch) hashes and return them as the ImportRoute; a worked example follows below.
Note: dev::WriteGuard and the other guards in the steps above are Aleth-specific utilities and types.
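To make step 11 concrete: as the comment in the function puts it, the loop walks the route backwards "from new head to common", so in forward order the route holds the old-branch blocks first, then the common ancestor, then the new-branch blocks with the imported block tacked on the end. The final loop files everything before the common ancestor under dead and everything after it under fresh. A worked example of just that split (my reading of the code above, not Aleth's own):

cpp
#include <iostream>
#include <string>
#include <vector>

int main()
{
    // Example route for a small reorg: two blocks leave the best chain,
    // one block plus the newly imported one join it.
    std::vector<std::string> route{"oldA", "oldB", "common", "newA", "imported"};
    std::string const common = "common";

    std::vector<std::string> dead, fresh;
    bool isOld = true;
    for (auto const& h : route)
        if (h == common)
            isOld = false;       // everything after this is on the new branch
        else if (isOld)
            dead.push_back(h);   // left the canonical chain
        else
            fresh.push_back(h);  // newly canonical

    for (auto const& h : dead)  std::cout << "dead:  " << h << "\n";
    for (auto const& h : fresh) std::cout << "fresh: " << h << "\n";
    // dead: oldA, oldB; fresh: newA, imported
}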