import
书接上回,在Aleth启动-eth服务中讲到import函数将新块进行验证并将通过验证的块存起来以供后续写入数据库。后续写入数据库的部分并未深入读,现在研究一下写入数据库这部分是怎么做的。
在import最后,区块通过验证之后,其执行了如下代码:
// If valid, append to blocks.
LOG(m_loggerDetail) << "OK - ready for chain insertion.";
// 将其放入未验证块队列中
DEV_GUARDED(m_verification)
m_unverified.enqueue(UnverifiedBlock { h, bi.parentHash(), _block.toBytes() });
// 通知等待该队列的任何线程
m_moreToVerify.notify_one();
// 将块的哈希值添加到已准备好的块集合中
m_readySet.insert(h);
// 增加总块难度
m_difficulty += bi.difficulty();
// 记录该块已准备好的信息
noteReady_WITH_LOCK(h);
return ImportResult::Success;
看下noteReady_WITH_LOCK这个函数。
noteReady_WITH_LOCK
// 接收到新来块的hash
void BlockQueue::noteReady_WITH_LOCK(h256 const& _good)
{
DEV_INVARIANT_CHECK;
list<h256> goodQueue(1, _good);
bool notify = false;
while (!goodQueue.empty())
{
h256 const parent = goodQueue.front();
// 从未知块中拿到_good块的子块,再通过子块拿孙块,一直拿下去。。。
// 拿到的块都放在了m_readySet中。
vector<pair<h256, bytes>> const removed = m_unknown.removeByKeyEqual(parent);
goodQueue.pop_front();
for (auto& newReady: removed)
{
DEV_GUARDED(m_verification)
m_unverified.enqueue(UnverifiedBlock { newReady.first, parent, move(newReady.second) });
m_unknownSet.erase(newReady.first);
m_readySet.insert(newReady.first);
goodQueue.push_back(newReady.first);
notify = true;
}
}
if (notify)
m_moreToVerify.notify_all();
}
这个函数从m_unknown中循环拿到了_good块的所有子孙块,并将这些块存入了m_readySet中。
最后调用notify_all
方法通知线程池中的线程,告诉它们有新的待验证块可用。
下面看下哪里对m_readySet进行了处理。
下面BlockQueue::drain这个函数对m_readySet进行了处理。
// 这里输入的参数VerifiedBlocks& o_out很重要
// 因为是个指针,所以从m_verified取出的已验证的块就存在了o_out里面,供上层函数处理
void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
{
DEV_WRITE_GUARDED(m_lock)
{
DEV_INVARIANT_CHECK;
if (m_drainingSet.empty())
{
m_drainingDifficulty = 0;
DEV_GUARDED(m_verification)
o_out = m_verified.dequeueMultiple(min<unsigned>(_max, m_verified.count()));
for (auto const& bs: o_out)
{
// TODO: @optimise use map<h256, bytes> rather than vector<bytes> & set<h256>.
auto h = bs.verified.info.hash();
// 将取出的块放入m_drainingSet中,代表这个块已经通过验证,可以存储了,所以放入drainingSet中用于清除
m_drainingSet.insert(h);
m_drainingDifficulty += bs.verified.info.difficulty();
// 从m_readySet中删除块
m_readySet.erase(h);
}
}
}
m_onBlocksDrained();
}
看下有哪个函数调用了BlockQueue::drain
分别是如下两个函数
tuple<ImportRoute, bool, unsigned> BlockChain::sync(
BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max)
{
// _bq.tick(*this);
VerifiedBlocks blocks;
_bq.drain(blocks, _max);
std::tuple<ImportRoute, h256s, unsigned> const importResult = sync(blocks, _stateDB);
bool const moreBlocks = _bq.doneDrain(std::get<1>(importResult));
return {std::get<0>(importResult), moreBlocks, std::get<2>(importResult)};
}
和
void Client::syncBlockQueue()
{
// cdebug << "syncBlockQueue()";
ImportRoute ir;
h256s badBlockHashes;
unsigned count;
Timer t;
// The verified blocks list needs to be a shared_ptr since we propagate them on the network
// thread and import them into our local chain on the client thread.
// 创建了一个空 VerifiedBlocks 对象
std::shared_ptr<VerifiedBlocks> verifiedBlocks = std::make_shared<VerifiedBlocks>();
// 从块队列 m_bq 中获取一定数量的块,然后将这些块的共识和状态验证后,将它们存储在一个 VerifiedBlocks 类型的共享指针 verifiedBlocks 中。
// m_syncAmount = 50
m_bq.drain(*verifiedBlocks, m_syncAmount);
// Propagate new blocks to peers before importing them into the chain.
auto h = m_host.lock();
assert(h); // capability is owned by Host and should be available for the duration of the
// Client's lifetime
// 将这些 verifiedBlocks 广播出去
h->propagateNewBlocks(verifiedBlocks);
// 将verifiedBlocks中的块同步到本地区块链中,如果发现有错误的块,则返回这些错误的块的哈希值列表。
std::tie(ir, badBlockHashes, count) = bc().sync(*verifiedBlocks, m_stateDB);
// 从 m_bq 中获取的所有已验证的块从队列中移除,并将队列中包含在 badBlockHashes 中的块排除在外。
m_syncBlockQueue = m_bq.doneDrain(badBlockHashes);
// 计算区块同步时间
double elapsed = t.elapsed();
if (count)
{
LOG(m_logger) << count << " blocks imported in " << unsigned(elapsed * 1000) << " ms ("
<< (count / elapsed) << " blocks/s) in #" << bc().number();
}
// 调整同步区块的数量,以保持同步速度的稳定。
if (elapsed > c_targetDurationS * 1.1 && count > c_syncMinBlockCount)
m_syncAmount = max(c_syncMinBlockCount, count * 9 / 10);
else if (count == m_syncAmount && elapsed < c_targetDurationS * 0.9 && m_syncAmount < c_syncMaxBlockCount)
m_syncAmount = min(c_syncMaxBlockCount, m_syncAmount * 11 / 10 + 1);
// 如果在同步过程中没有活动块,则直接返回。
if (ir.liveBlocks.empty())
return;
// 向其他组件广播区块链变化的消息,并且将变化的数据更新到本地存储中。
onChainChanged(ir);
}
第一个函数先不管,先看第二个函数:void Client::syncBlockQueue()
这个函数在之前提到的eth服务的死循环的doWork函数中会一直重复执行。
主要是这几句:
// The verified blocks list needs to be a shared_ptr since we propagate them on the network
// thread and import them into our local chain on the client thread.
// 创建了一个空 VerifiedBlocks 对象
std::shared_ptr<VerifiedBlocks> verifiedBlocks = std::make_shared<VerifiedBlocks>();
// 从块队列 m_bq 中获取一定数量的块,然后将这些块的共识和状态验证后,将它们存储在一个 VerifiedBlocks 类型的共享指针 verifiedBlocks 中。
// m_syncAmount = 50
m_bq.drain(*verifiedBlocks, m_syncAmount);
// Propagate new blocks to peers before importing them into the chain.
auto h = m_host.lock();
assert(h); // capability is owned by Host and should be available for the duration of the
// Client's lifetime
// 将这些 verifiedBlocks 广播出去
h->propagateNewBlocks(verifiedBlocks);
// 将verifiedBlocks中的块同步到本地区块链中,如果发现有错误的块,则返回这些错误的块的哈希值列表。
std::tie(ir, badBlockHashes, count) = bc().sync(*verifiedBlocks, m_stateDB);
// 从 m_bq 中获取的所有已验证的块从队列中移除,并将队列中包含在 badBlockHashes 中的块排除在外。
m_syncBlockQueue = m_bq.doneDrain(badBlockHashes);
先创建了一个空 VerifiedBlocks 对象,然后调用m_bq.drain(*verifiedBlocks, m_syncAmount);将拿到的已通过验证的块存入verifiedBlocks中。
在将这些块存入链之前,要先广播这些块。
然后这句话将其存入到本地区块链:std::tie(ir, badBlockHashes, count) = bc().sync(*verifiedBlocks, m_stateDB);
如果有些块存储失败,拿到这些块的哈希之后,然后调用doneDrain(badBlockHashes),对这些坏块进行进一步的处理。
所以需要重点看下bc().sync(*verifiedBlocks, m_stateDB);
bc().sync(*verifiedBlocks, m_stateDB)
tuple<ImportRoute, h256s, unsigned> BlockChain::sync(
VerifiedBlocks const& _blocks, OverlayDB const& _stateDB)
{
h256s fresh;
h256s dead;
Transactions goodTransactions;
unsigned count = 0;
h256s badBlockHashes;
for (VerifiedBlock const& block : _blocks)
{
do {
try
{
// Nonce & uncle nonces already verified in verification thread at this point.
ImportRoute r;
DEV_TIMED_ABOVE("Block import " + toString(block.verified.info.number()), 500)
// 🌟导入区块链
r = import(block.verified, _stateDB, (ImportRequirements::Everything & ~ImportRequirements::ValidSeal & ~ImportRequirements::CheckUncles) != 0);
fresh += r.liveBlocks;
dead += r.deadBlocks;
goodTransactions.reserve(goodTransactions.size() + r.goodTransactions.size());
std::move(std::begin(r.goodTransactions), std::end(r.goodTransactions), std::back_inserter(goodTransactions));
++count;
}
catch (dev::eth::AlreadyHaveBlock const&)
{
cwarn << "ODD: Import queue contains already imported block";
continue;
}
catch (dev::eth::UnknownParent const&)
{
cwarn << "ODD: Import queue contains block with unknown parent.";// << LogTag::Error << boost::current_exception_diagnostic_information();
// NOTE: don't reimport since the queue should guarantee everything in the right order.
// Can't continue - chain bad.
badBlockHashes.push_back(block.verified.info.hash());
}
catch (dev::eth::FutureTime const&)
{
cwarn << "ODD: Import queue contains a block with future time.";
this_thread::sleep_for(chrono::seconds(1));
continue;
}
catch (dev::eth::TransientError const&)
{
this_thread::sleep_for(chrono::milliseconds(100));
continue;
}
catch (Exception& ex)
{
// cnote << "Exception while importing block. Someone (Jeff? That you?) seems to be giving us dodgy blocks!";// << LogTag::Error << diagnostic_information(ex);
if (m_onBad)
m_onBad(ex);
// NOTE: don't reimport since the queue should guarantee everything in the right order.
// Can't continue - chain bad.
badBlockHashes.push_back(block.verified.info.hash());
}
} while (false);
}
return {ImportRoute{dead, fresh, goodTransactions}, badBlockHashes, count};
}
这个函数用for循环依次对每个块进行处理。
将区块导入链的函数调用为:
r = import(block.verified, _stateDB, (ImportRequirements::Everything & ~ImportRequirements::ValidSeal & ~ImportRequirements::CheckUncles) != 0);
看下这个函数干什么了。
ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const& _db, bool _mustBeNew)
{
//@tidy This is a behemoth of a method - could do to be split into a few smaller ones.
ImportPerformanceLogger performanceLogger;
// Check block doesn't already exist first!
// 检查链上是否已经有此哈希的区块,已经有的话会抛出异常
if (_mustBeNew)
checkBlockIsNew(_block);
// Work out its number as the parent's number + 1
// 检查父区块是否已经有了,没有的话会抛出异常
if (!isKnown(_block.info.parentHash(), false)) // doesn't have to be current.
{
LOG(m_logger) << _block.info.hash() << " : Unknown parent " << _block.info.parentHash();
// We don't know the parent (yet) - discard for now. It'll get resent to us if we find out about its ancestry later on.
BOOST_THROW_EXCEPTION(UnknownParent() << errinfo_hash256(_block.info.parentHash()));
}
// 拿到父区块的信息
auto pd = details(_block.info.parentHash());
if (!pd)
{
auto pdata = pd.rlp();
LOG(m_loggerError) << "Details is returning false despite block known: " << RLP(pdata);
auto parentBlock = block(_block.info.parentHash());
LOG(m_loggerError) << "isKnown: " << isKnown(_block.info.parentHash());
LOG(m_loggerError) << "last/number: " << m_lastBlockNumber << " " << m_lastBlockHash << " "
<< _block.info.number();
LOG(m_loggerError) << "Block: " << BlockHeader(&parentBlock);
LOG(m_loggerError) << "RLP: " << RLP(parentBlock);
LOG(m_loggerError) << "DATABASE CORRUPTION: CRITICAL FAILURE";
exit(-1);
}
// 检查区块的时间戳是否是未来,如果是、且系统不允许未来块,则抛出异常
checkBlockTimestamp(_block.info);
// Verify parent-critical parts
// “父级关键部分验证”
verifyBlock(_block.block, m_onBad, ImportRequirements::InOrderChecks);
LOG(m_loggerDetail) << "Attempting import of block " << _block.info.hash() << " (#"
<< _block.info.number() << ") ...";
performanceLogger.onStageFinished("preliminaryChecks");
BlockReceipts br;
u256 td;
try
{
// Check transactions are valid and that they result in a state equivalent to our state_root.
// Get total difficulty increase and update state, checking it.
Block s(*this, _db);
// enactOn做了三件事:对交易进行验证,检查交易是否合法。
// 调用底层的 TrieDB 对象,执行每个交易。
// 检查执行完交易后的状态是否符合预期,即与区块头中的 state_root 值相等。
auto tdIncrease = s.enactOn(_block, *this);
for (unsigned i = 0; i < s.pending().size(); ++i)
br.receipts.push_back(s.receipt(i));
s.cleanup();
td = pd.totalDifficulty + tdIncrease;
performanceLogger.onStageFinished("enactment");
}
catch (BadRoot& ex)
{
cwarn << "*** BadRoot error! Trying to import" << _block.info.hash() << "needed root"
<< *boost::get_error_info<errinfo_hash256>(ex);
cwarn << _block.info;
// Attempt in import later.
BOOST_THROW_EXCEPTION(TransientError());
}
catch (Exception& ex)
{
addBlockInfo(ex, _block.info, _block.block.toBytes());
throw;
}
// All ok - insert into DB
// 通过RLP编码存储区块
bytes const receipts = br.rlp();
return insertBlockAndExtras(_block, ref(receipts), td, performanceLogger);
}
insertBlockAndExtras(_block, ref(receipts), td, performanceLogger);函数将区块写入数据库:
ImportRoute BlockChain::insertBlockAndExtras(VerifiedBlockRef const& _block, bytesConstRef _receipts, u256 const& _totalDifficulty, ImportPerformanceLogger& _performanceLogger)
{
std::unique_ptr<db::WriteBatchFace> blocksWriteBatch = m_blocksDB->createWriteBatch();
std::unique_ptr<db::WriteBatchFace> extrasWriteBatch = m_extrasDB->createWriteBatch();
h256 newLastBlockHash = currentHash();
unsigned newLastBlockNumber = number();
try
{
// ensure parent is cached for later addition.
// TODO: this is a bit horrible would be better refactored into an enveloping UpgradableGuard
// together with an "ensureCachedWithUpdatableLock(l)" method.
// This is safe in practice since the caches don't get flushed nearly often enough to be
// done here.
details(_block.info.parentHash());
DEV_WRITE_GUARDED(x_details)
m_details[_block.info.parentHash()].childHashes.push_back(_block.info.hash());
_performanceLogger.onStageFinished("collation");
blocksWriteBatch->insert(toSlice(_block.info.hash()), db::Slice(_block.block));
DEV_READ_GUARDED(x_details)
extrasWriteBatch->insert(toSlice(_block.info.parentHash(), ExtraDetails),
(db::Slice)dev::ref(m_details[_block.info.parentHash()].rlp()));
BlockDetails const details{static_cast<unsigned>(_block.info.number()), _totalDifficulty,
_block.info.parentHash(), {} /* children */, _block.block.size()};
extrasWriteBatch->insert(
toSlice(_block.info.hash(), ExtraDetails), (db::Slice)dev::ref(details.rlp()));
BlockLogBlooms blb;
for (auto i: RLP(_receipts))
blb.blooms.push_back(TransactionReceipt(i.data()).bloom());
extrasWriteBatch->insert(
toSlice(_block.info.hash(), ExtraLogBlooms), (db::Slice)dev::ref(blb.rlp()));
extrasWriteBatch->insert(toSlice(_block.info.hash(), ExtraReceipts), (db::Slice)_receipts);
_performanceLogger.onStageFinished("writing");
}
catch (Exception& ex)
{
addBlockInfo(ex, _block.info, _block.block.toBytes());
throw;
}
h256s route;
h256 common;
bool isImportedAndBest = false;
// This might be the new best block...
h256 last = currentHash();
if (_totalDifficulty > details(last).totalDifficulty || (m_sealEngine->chainParams().tieBreakingGas &&
_totalDifficulty == details(last).totalDifficulty && _block.info.gasUsed() > info(last).gasUsed()))
{
// don't include bi.hash() in treeRoute, since it's not yet in details DB...
// just tack it on afterwards.
unsigned commonIndex;
tie(route, common, commonIndex) = treeRoute(last, _block.info.parentHash());
route.push_back(_block.info.hash());
// Most of the time these two will be equal - only when we're doing a chain revert will they not be
if (common != last)
DEV_READ_GUARDED(x_lastBlockHash)
clearCachesDuringChainReversion(number(common) + 1);
// Go through ret backwards (i.e. from new head to common) until hash != last.parent and
// update m_transactionAddresses, m_blockHashes
for (auto i = route.rbegin(); i != route.rend() && *i != common; ++i)
{
BlockHeader tbi;
if (*i == _block.info.hash())
tbi = _block.info;
else
tbi = BlockHeader(block(*i));
// Collate logs into blooms.
h256s alteredBlooms;
{
LogBloom blockBloom = tbi.logBloom();
blockBloom.shiftBloom<3>(sha3(tbi.author().ref()));
// Pre-memoize everything we need before locking x_blocksBlooms
for (unsigned level = 0, index = (unsigned)tbi.number(); level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
blocksBlooms(chunkId(level, index / c_bloomIndexSize));
WriteGuard l(x_blocksBlooms);
for (unsigned level = 0, index = (unsigned)tbi.number(); level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
{
unsigned i = index / c_bloomIndexSize;
unsigned o = index % c_bloomIndexSize;
alteredBlooms.push_back(chunkId(level, i));
m_blocksBlooms[alteredBlooms.back()].blooms[o] |= blockBloom;
}
}
// Collate transaction hashes and remember who they were.
//h256s newTransactionAddresses;
{
bytes blockBytes;
RLP blockRLP(*i == _block.info.hash() ? _block.block : &(blockBytes = block(*i)));
TransactionAddress ta;
ta.blockHash = tbi.hash();
for (ta.index = 0; ta.index < blockRLP[1].itemCount(); ++ta.index)
extrasWriteBatch->insert(
toSlice(sha3(blockRLP[1][ta.index].data()), ExtraTransactionAddress),
(db::Slice)dev::ref(ta.rlp()));
}
// Update database with them.
ReadGuard l1(x_blocksBlooms);
for (auto const& h: alteredBlooms)
extrasWriteBatch->insert(
toSlice(h, ExtraBlocksBlooms), (db::Slice)dev::ref(m_blocksBlooms[h].rlp()));
extrasWriteBatch->insert(toSlice(h256(tbi.number()), ExtraBlockHash),
(db::Slice)dev::ref(BlockHash(tbi.hash()).rlp()));
}
// FINALLY! change our best hash.
{
newLastBlockHash = _block.info.hash();
newLastBlockNumber = (unsigned)_block.info.number();
isImportedAndBest = true;
}
LOG(m_logger) << " Imported and best " << _totalDifficulty << " (#"
<< _block.info.number() << "). Has "
<< (details(_block.info.parentHash()).childHashes.size() - 1)
<< " siblings. Route: " << route;
}
else
{
LOG(m_loggerDetail) << " Imported but not best (oTD: " << details(last).totalDifficulty
<< " > TD: " << _totalDifficulty << "; " << details(last).number << ".."
<< _block.info.number() << ")";
}
try
{
m_blocksDB->commit(std::move(blocksWriteBatch));
}
catch (boost::exception& ex)
{
cwarn << "Error writing to blockchain database: " << boost::diagnostic_information(ex);
cwarn << "Fail writing to blockchain database. Bombing out.";
exit(-1);
}
try
{
m_extrasDB->commit(std::move(extrasWriteBatch));
}
catch (boost::exception& ex)
{
cwarn << "Error writing to extras database: " << boost::diagnostic_information(ex);
cwarn << "Fail writing to extras database. Bombing out.";
exit(-1);
}
if (m_lastBlockHash != newLastBlockHash)
DEV_WRITE_GUARDED(x_lastBlockHash)
{
m_lastBlockHash = newLastBlockHash;
m_lastBlockNumber = newLastBlockNumber;
try
{
m_extrasDB->insert(db::Slice("best"), db::Slice((char const*)&m_lastBlockHash, 32));
}
catch (boost::exception const& ex)
{
cwarn << "Error writing to extras database: " << boost::diagnostic_information(ex);
cout << "Put" << toHex(bytesConstRef(db::Slice("best"))) << "=>"
<< toHex(bytesConstRef(db::Slice((char const*)&m_lastBlockHash, 32)));
cwarn << "Fail writing to extras database. Bombing out.";
exit(-1);
}
}
_performanceLogger.onStageFinished("checkBest");
unsigned const gasPerSecond = static_cast<double>(_block.info.gasUsed()) / _performanceLogger.stageDuration("enactment");
_performanceLogger.onFinished({
{"blockHash", "\""+ _block.info.hash().abridged() + "\""},
{"blockNumber", toString(_block.info.number())},
{"gasPerSecond", toString(gasPerSecond)},
{"transactions", toString(_block.transactions.size())},
{"gasUsed", toString(_block.info.gasUsed())}
});
if (!route.empty())
noteCanonChanged();
if (isImportedAndBest && m_onBlockImport)
m_onBlockImport(_block.info);
h256s fresh;
h256s dead;
bool isOld = true;
for (auto const& h: route)
if (h == common)
isOld = false;
else if (isOld)
dead.push_back(h);
else
fresh.push_back(h);
return ImportRoute{dead, fresh, _block.transactions};
}
它用于将新的块添加到以太坊区块链上,同时在数据库中添加新块的附加信息。该函数包含以下步骤:
- 创建用于块和附加信息写入的两个
db::WriteBatchFace
实例。 - 确保父块已经缓存,以便稍后添加。同时将新块添加到父块的子列表中。
- 将新块的块数据插入块数据库中。
- 将父块的块详细信息插入附加数据库中。
- 将新块的详细信息插入附加数据库中。
- 将新块的日志 Bloom 过滤器插入附加数据库中。
- 将新块的交易收据插入附加数据库中。
- 如果该新块的难度比当前最后一个块的难度更高,则更新块链并记录路由信息。
- 更新数据库中的交易地址信息和块哈希信息。
注:在上述步骤中,dev::
和 WriteGuard
等是以太坊客户端特定的工具和类型。