main()
aleth启动的主函数位于aleth/aleth/main.cpp
中,。
main函数中下面这行代码启动了eth服务:
dev::WebThreeDirect web3(WebThreeDirect::composeClientVersion("aleth"), db::databasePath(),
snapshotPath, chainParams, withExisting, netPrefs, &nodesState, testingMode);
// 在WebThreeDirect中,传入的参数db::databasePath(), snapshotPath, chainParams, withExisting和eth服务有关,用于生成在WebThreeDirect中的成员变量eth::Client
// dev::WebThreeDirect的成员变量eth::Client就是eth服务的主体,eth服务会同步区块,接收来自外部的新块、处理本地挖出的新块和交易,处理数据库相关的内容
这里是生命了web3对象,调用了其构造函数,具体看下构造函数的内容:
WebThreeDirect::WebThreeDirect(std::string const& _clientVersion,
boost::filesystem::path const& _dbPath, boost::filesystem::path const& _snapshotPath,
eth::ChainParams const& _params, WithExisting _we, NetworkConfig const& _n,
bytesConstRef _network, bool _testing)
: m_clientVersion(_clientVersion), m_net(_clientVersion, _n, _network)
{
// m_ethereum是WebThreeDirect的私有变量,是eth::Client类型
if (_testing)
m_ethereum.reset(new eth::ClientTest(
_params, (int)_params.networkID, m_net, shared_ptr<GasPricer>(), _dbPath, _we));
else // 这里new Client的时候初始化了很多东西🌟
m_ethereum.reset(new eth::Client(_params, (int)_params.networkID, m_net,
shared_ptr<GasPricer>(), _dbPath, _snapshotPath, _we));
// Client继承了Worker的startWorking 🌟🌟
m_ethereum->startWorking();
const auto* buildinfo = aleth_get_buildinfo();
m_ethereum->setExtraData(rlpList(0, string{buildinfo->project_version}.substr(0, 5)
+ "++"
+ string{buildinfo->git_commit_hash}.substr(0, 4)
+ string{buildinfo->build_type}.substr(0, 1)
+ string{buildinfo->system_name}.substr(0, 5)
+ string{buildinfo->compiler_id}.substr(0, 3)));
// setExtraData内容例子:1.8.0++dlinuxgnu
}
这里主要看两个地方:
- 🌟:new eth::Client初始化了很多东西
- 🌟🌟:m_ethereum->startWorking(); 会启动新线程一直处理eth服务
🌟 new eth::Client
在深入看new eth::Client
之前,先看下Client中的一些成员变量:
BlockChain m_bc; ///< Maintains block database and owns the seal engine.
BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
std::shared_ptr<GasPricer> m_gp; ///< The gas pricer.
OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it.
mutable SharedMutex x_preSeal; ///< Lock on m_preSeal.
Block m_preSeal; ///< The present state of the client.
mutable SharedMutex x_postSeal; ///< Lock on m_postSeal.
Block m_postSeal; ///< The state of the client which we're sealing (i.e. it'll have all the rewards added).
mutable SharedMutex x_working; ///< Lock on m_working.
Block m_working; ///< The state of the client which we're sealing (i.e. it'll have all the rewards added), while we're actually working on it.
BlockHeader m_sealingInfo; ///< The header we're attempting to seal on (derived from m_postSeal).
std::atomic<bool> m_remoteWorking = { false }; ///< Has the remote worker recently been reset?
std::atomic<bool> m_needStateReset = { false }; ///< Need reset working state to premin on next sync
std::chrono::system_clock::time_point m_lastGetWork; ///< Is there an active and valid remote worker?
std::weak_ptr<EthereumCapability> m_host;
std::weak_ptr<WarpCapability> m_warpHost;
std::condition_variable m_signalled;
Mutex x_signalled;
Handler<> m_tqReady;
Handler<h256 const&> m_tqReplaced;
Handler<> m_bqReady;
std::atomic<bool> m_wouldSeal = { false }; ///< True if we /should/ be sealing.
bool m_wouldButShouldnot = false; ///< True if the last time we called rejigSealing wouldSeal() was true but sealer's shouldSeal() was false.
mutable std::chrono::system_clock::time_point m_lastGarbageCollection;
///< When did we last both doing GC on the watches?
mutable std::chrono::system_clock::time_point m_lastTick = std::chrono::system_clock::now();
///< When did we last tick()?
unsigned m_syncAmount = 50; ///< Number of blocks to sync in each go.
ActivityReport m_report;
SharedMutex x_functionQueue;
std::queue<std::function<void()>> m_functionQueue; ///< Functions waiting to be executed in the main thread.
std::atomic<bool> m_syncTransactionQueue = {false};
std::atomic<bool> m_syncBlockQueue = {false};
bytes m_extraData;
Signal<bytes const&> m_onBlockSealed; ///< Called if we have sealed a new block
/// Called when blockchain was changed
Signal<h256s const&, h256s const&> m_onChainChanged;
Logger m_logger{createLogger(VerbosityInfo, "client")};
Logger m_loggerDetail{createLogger(VerbosityDebug, "client")};
上面变量中有如下几个变量可以拿出来说一下:
- m_bc:区块数据库和共识引擎
- m_bq:区块链队列,存储一些incoming但是还没验证写入链上的块
- m_tq:交易队列,存储一些incoming但是还没验证写入链上的交易
- m_stateDB:状态数据库的实例句柄
- m_host: 这是个EthereumCapability类,用于和节点同步,发送新交易和新区块
- m_warpHost:WarpCapability类,用于实现Warp同步协议的相关功能,Warp是一种用于快速同步以太坊区块链的协议
Client 的构造函数初始化了一些参数,在深入看init(_host, _dbPath, _snapshotPath, _forceAction, _networkID);
之前,
// Client 继承了 Worker("eth", 0)
// 初始化了m_bc、m_tq等几个成员变量
Client::Client(ChainParams const& _params, int _networkID, p2p::Host& _host,
std::shared_ptr<GasPricer> _gpForAdoption, fs::path const& _dbPath,
fs::path const& _snapshotPath, WithExisting _forceAction, TransactionQueue::Limits const& _l)
: Worker("eth", 0),
m_bc(_params, _dbPath, _forceAction,
[](unsigned d, unsigned t) {
std::cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r";
}),
m_tq(_l),
m_gp(_gpForAdoption ? _gpForAdoption : make_shared<TrivialGasPricer>()),
m_preSeal(chainParams().accountStartNonce),
m_postSeal(chainParams().accountStartNonce),
m_working(chainParams().accountStartNonce)
{
init(_host, _dbPath, _snapshotPath, _forceAction, _networkID);
}
初始化一些变量之后,其调用了init(_host, _dbPath, _snapshotPath, _forceAction, _networkID);
进行更多的初始化。
下面看下这个init干啥了。
init
每行做了什么可以看注释:
void Client::init(p2p::Host& _extNet, fs::path const& _dbPath,
fs::path const& _snapshotDownloadPath, WithExisting _forceAction, u256 _networkId)
{
DEV_TIMED_FUNCTION_ABOVE(500);
// Cannot be opened until after blockchain is open, since BlockChain may upgrade the database.
// TODO: consider returning the upgrade mechanism here. will delaying the opening of the blockchain database
// until after the construction.
// 通过调用 State::openDB 函数打开指定路径的数据库,并指定 Genesis 块的哈希值和 _forceAction 参数,从而获取状态数据库的句柄并将其存储在 m_stateDB 变量中
m_stateDB = State::openDB(_dbPath, bc().genesisHash(), _forceAction);
// LAZY. TODO: move genesis state construction/commiting to stateDB openning and have this just take the root from the genesis block.
// 这段代码是根据m_stateDB中的状态,在区块链中生成创世区块。m_preSeal在这里指代生成的创世区块。
m_preSeal = bc().genesisBlock(m_stateDB);
m_postSeal = m_preSeal;
// 将当前区块链对象(bc())作为参数传递给BlockQueue类的一个成员函数setChain()。
// BlockQueue类是一个用于处理交易池和挖掘块的类,setChain()函数是用于设置BlockQueue类对象中当前区块链对象的函数。
m_bq.setChain(bc());
// 这个变量用于记录最后一次向矿池请求work的时间,以便在一段时间内避免过于频繁地请求。
// 当前时间减去30秒钟,即表示30秒前的时间点。
m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30);
// lambda表达式,定义了一个onReady回调函数,当交易队列就绪时被调用。
// 其中,m_tq.onReady函数返回一个待执行的事件句柄,被赋值给m_tqReady变量。这个事件句柄可以在适当的时候被取消或再次执行。
m_tqReady = m_tq.onReady([=]() {
this->onTransactionQueueReady();
}); // TODO: should read m_tq->onReady(thisThread, syncTransactionQueue);
m_tqReplaced = m_tq.onReplaced([=](h256 const&) { m_needStateReset = true; });
m_bqReady = m_bq.onReady([=]() {
this->onBlockQueueReady();
}); // TODO: should read m_bq->onReady(thisThread, syncBlockQueue);
m_bq.setOnBad([=](Exception& ex) { this->onBadBlock(ex); });
bc().setOnBad([=](Exception& ex) { this->onBadBlock(ex); });
bc().setOnBlockImport([=](BlockHeader const& _info) {
if (auto h = m_host.lock())
h->onBlockImported(_info);
});
if (_forceAction == WithExisting::Rescue)
bc().rescue(m_stateDB);
// update 函数的作用是更新存储一些关于 Ethereum 区块链的参数,例如 gas limit,gas price 等。
// 函数的参数 bc() 是当前区块链的 BlockChain 对象。
m_gp->update(bc());
// create Ethereum capability only if we're not downloading the snapshot
if (_snapshotDownloadPath.empty())
{
auto ethCapability = make_shared<EthereumCapability>(
_extNet.capabilityHost(), bc(), m_stateDB, m_tq, m_bq, _networkId);
_extNet.registerCapability(ethCapability);
m_host = ethCapability;
}
// create Warp capability if we either download snapshot or can give out snapshot
auto const importedSnapshot = importedSnapshotPath(_dbPath, bc().genesisHash());
bool const importedSnapshotExists = fs::exists(importedSnapshot);
if (!_snapshotDownloadPath.empty() || importedSnapshotExists)
{
std::shared_ptr<SnapshotStorageFace> snapshotStorage(
importedSnapshotExists ? createSnapshotStorage(importedSnapshot) : nullptr);
auto warpCapability = make_shared<WarpCapability>(
_extNet.capabilityHost(), bc(), _networkId, _snapshotDownloadPath, snapshotStorage);
_extNet.registerCapability(warpCapability);
m_warpHost = warpCapability;
}
doWork(false);
}
此函数做了如下工作:
- 打开状态数据库获得数据库句柄
- 从数据库中拿创世块,如果没有创世块就直接生成一个
- 将blockchain对象放进BlockQueue中,后续BlockQueue可以直接调用他来操作链数据库
- 剩下是做一些触发器,发生了什么事件就做相应的操作
- 更新gas limit,gas price 等数值
- doWork(false); 这句下面细讲
doWork(false)
void Client::doWork(bool _doWait)
{
bool t = true;
// 比较 m_syncBlockQueue 的值和 t 的值。
// 如果相等,就将 m_syncBlockQueue 的值设为 false,并返回 true;
// 否则不修改 m_syncBlockQueue 的值,并返回 false。
// 如果 compare_exchange_strong 返回 true,说明 m_syncBlockQueue 的值原来为 true,现在已经被成功设为 false 了,并且需要调用 syncBlockQueue 函数来处理待同步的区块。
if (m_syncBlockQueue.compare_exchange_strong(t, false))
syncBlockQueue();
// 重置区块链的状态
// 在某些情况下,比如当新的区块链版本推出,或者是由于错误导致区块链状态不一致时,需要对区块链进行重置。
// m_needStateReset 是一个标志位,用于指示是否需要重置区块链状态。
if (m_needStateReset)
{
resetState();
m_needStateReset = false;
}
t = true;
bool isSealed = false;
// 宏定义设置读锁
DEV_READ_GUARDED(x_working)
// m_working 是当前正在工作的区块,isSealed 表示当前工作区块是否已经被封存。
// 当一个区块被封存后,它的数据和计算结果就不能被更改了。
isSealed = m_working.isSealed();
// 这段代码表示如果当前正在打包区块(isSealed为true)或者正在进行主要同步(isMajorSyncing()为true),
// 或者当前的工作不是在本地进行(m_remoteWorking为true),那么就不会调用 syncTransactionQueue() 来同步交易队列。
// 当这三个条件都不成立时,会尝试去将 m_syncTransactionQueue 的值从 true 修改为 false,如果修改成功,
// 则会调用 syncTransactionQueue() 同步交易队列。
// syncTransactionQueue() 的作用是尽可能地将新的交易加入到待打包交易列表中,以便于打包新的区块。
if (!isSealed && !isMajorSyncing() && !m_remoteWorking && m_syncTransactionQueue.compare_exchange_strong(t, false))
syncTransactionQueue();
tick();
rejigSealing();
callQueuedFunctions();
DEV_READ_GUARDED(x_working)
isSealed = m_working.isSealed();
// If the block is sealed, we have to wait for it to tickle through the block queue
// (which only signals as wanting to be synced if it is ready).
if (!m_syncBlockQueue && !m_syncTransactionQueue && (_doWait || isSealed) && isWorking())
{
std::unique_lock<std::mutex> l(x_signalled);
m_signalled.wait_for(l, chrono::seconds(1));
}
}
这段函数是一个循环中的主要调度程序,其中包括:
检查是否需要同步块队列,如果需要则同步。
检查是否需要重置状态,如果需要则重置。
检查是否可以同步交易队列,如果可以则同步。
执行tick()函数。
重新调整sealing。
调用排队的函数。
等待信号量。
其中,tick()函数用于处理所有需要在工作中完成的事情,例如处理挖掘任务、处理新块、处理未完成的交易等。rejigSealing()函数用于检查是否需要重新调整sealing,以保持在最新的工作状态。最后,程序等待信号量以避免占用太多CPU资源。
tick()函数
void Client::tick()
{
if (chrono::system_clock::now() - m_lastTick > chrono::seconds(1))
{
m_report.ticks++;
checkWatchGarbage();
m_bq.tick();
m_lastTick = chrono::system_clock::now();
if (m_report.ticks == 15)
LOG(m_loggerDetail) << activityReport();
}
}
这个函数主要是在 Client 类中执行,用于定时调用一些功能,包括:
- 检查是否需要清理过期的事务数据(checkWatchGarbage)
- 触发 BlockQueue 的 tick 函数
- 更新 m_lastTick 的值,以及在特定情况下输出活动日志
这个函数会在 doWork 函数中被调用。其中,if 语句的判断条件是当前时间与上一次调用该函数的时间相差 1 秒以上,因此该函数的作用是定时执行一些任务,例如检查是否需要回收垃圾、更新块队列等。同时,该函数也会更新客户端的 m_lastTick
成员变量,用于下一次判断是否需要执行。
m_bq.tick()
m_bq.tick()做了一些区块同步的事情,需要重点读一下。
void BlockQueue::tick()
{
vector<pair<h256, bytes>> todo;
{
UpgradableGuard l(m_lock);
// m_future 存的是还未验证的区块
// 下面会往 m_future 中存储新块(如果有),此函数会被循环执行,所以这次添加,下次就会处理
if (m_future.isEmpty())
return;
LOG(m_logger) << "Checking past-future blocks...";
// 如果 m_future 里面最近的块来自未来,直接return
time_t t = utcTime();
if (t < m_future.firstKey())
return;
LOG(m_logger) << "Past-future blocks ready.";
{
UpgradeGuard l2(l);
DEV_INVARIANT_CHECK;
// 从 m_future 中取出所有的过去的区块(时刻t之前的区块)
todo = m_future.removeByKeyNotGreater(t);
for (auto const& hash : todo)
m_futureSet.erase(hash.first);
}
}
LOG(m_logger) << "Importing " << todo.size() << " past-future blocks.";
// 执行import
for (auto const& b: todo)
import(&b.second);
}
简单来说,m_future存储了广播过来的、我方还未验证的区块。这个函数取出了m_future 列表中在当前时刻之前产生的区块。然后将这些块从 m_future 中删去后,针对每一个区块执行import(&b.second);
import
ImportResult BlockQueue::import(bytesConstRef _block, bool _isOurs)
{
// Check if we already know this block.
h256 h = BlockHeader::headerHashFromBlock(_block);
LOG(m_loggerDetail) << "Queuing block " << h << " for import...";
UpgradableGuard l(m_lock);
if (contains(m_readySet, h) || contains(m_drainingSet, h) || contains(m_unknownSet, h) ||
contains(m_knownBad, h) || contains(m_futureSet, h))
{
// Already know about this one.
LOG(m_loggerDetail) << "Already known.";
return ImportResult::AlreadyKnown;
}
// 尝试验证区块,检查它是否符合区块链的规则和内部一致性
// 验证不通过的话,报错:区块格式不正确
// 如果验证通过,会将验证结果保存到变量 bi 中,包括该区块的头部信息和交易列表。
BlockHeader bi;
try
{
// TODO: quick verification of seal - will require BlockQueue to be templated on SealEngine
// VERIFY: populates from the block and checks the block is internally coherent.
bi = m_bc->verifyBlock(_block, m_onBad, ImportRequirements::PostGenesis).info;
}
catch (Exception const& _e)
{
cwarn << "Ignoring malformed block: " << diagnostic_information(_e);
return ImportResult::Malformed;
}
LOG(m_loggerDetail) << "Block " << h << " is " << bi.number() << " parent is " << bi.parentHash();
// 验证通过,需要将其写入本地链
// 检查该区块是否已经存在于区块链中
// 如果该区块已经存在于区块链中,那么就不需要再次导入该区块
// Check block doesn't already exist first!
if (m_bc->isKnown(h))
{
LOG(m_logger) << "Already known in chain.";
return ImportResult::AlreadyInChain;
}
UpgradeGuard ul(l);
DEV_INVARIANT_CHECK;
// 这段代码是在判断待导入的区块是否为未来时间的区块。
// 如果是未来时间的区块,那么它会被添加到一个 future 集合中,等待未来时间到来后再导入。
// 具体来说,它会将该区块的时间戳作为键,区块的哈希和字节表示作为值插入到一个按时间戳排序的 map 容器 m_future 中,
// 并将该区块的哈希添加到 m_futureSet 集合中。同时,它还会计算该区块的难度并将其添加到当前难度中,以便后续使用。
// 最后,它还会检查该区块的父区块是否已经在 readySet、drainingSet、futureSet 或者 blockchain 中,
// 如果都不在,则返回 ImportResult::FutureTimeUnknown,表示该区块的父区块是未知的。
// 如果已经存在,则返回 ImportResult::FutureTimeKnown,表示该区块的父区块已知。
// Check it's not in the future
if (bi.timestamp() > utcTime() && !_isOurs)
{
m_future.insert(static_cast<time_t>(bi.timestamp()), h, _block.toBytes());
m_futureSet.insert(h);
char buf[24];
time_t bit = static_cast<time_t>(bi.timestamp());
if (strftime(buf, 24, "%X", localtime(&bit)) == 0)
buf[0] = '\0'; // empty if case strftime fails
LOG(m_loggerDetail) << "OK - queued for future [" << bi.timestamp() << " vs " << utcTime()
<< "] - will wait until " << buf;
m_difficulty += bi.difficulty();
h256 const parentHash = bi.parentHash();
bool const unknown = !contains(m_readySet, parentHash) &&
!contains(m_drainingSet, parentHash) &&
!contains(m_futureSet, parentHash) && !m_bc->isKnown(parentHash);
return unknown ? ImportResult::FutureTimeUnknown : ImportResult::FutureTimeKnown;
}
else
{
// We now know it.
// 如果父区块是坏块,则这个块也是坏块,不导入
if (m_knownBad.count(bi.parentHash()))
{
m_knownBad.insert(bi.hash());
updateBad_WITH_LOCK(bi.hash());
// bad parent; this is bad too, note it as such
return ImportResult::BadChain;
}
else if (!m_readySet.count(bi.parentHash()) && !m_drainingSet.count(bi.parentHash()) && !m_bc->isKnown(bi.parentHash()))
{
// We don't know the parent (yet) - queue it up for later. It'll get resent to us if we find out about its ancestry later on.
LOG(m_loggerDetail) << "OK - queued as unknown parent: " << bi.parentHash();
m_unknown.insert(bi.parentHash(), h, _block.toBytes());
m_unknownSet.insert(h);
m_difficulty += bi.difficulty();
return ImportResult::UnknownParent;
}
else
{
// If valid, append to blocks.
LOG(m_loggerDetail) << "OK - ready for chain insertion.";
// 将其放入未验证块队列中
DEV_GUARDED(m_verification)
m_unverified.enqueue(UnverifiedBlock { h, bi.parentHash(), _block.toBytes() });
// 通知等待该队列的任何线程
m_moreToVerify.notify_one();
// 将块的哈希值添加到已准备好的块集合中
m_readySet.insert(h);
// 增加总块难度
m_difficulty += bi.difficulty();
// 记录该块已准备好的信息
noteReady_WITH_LOCK(h);
return ImportResult::Success;
}
}
}
这个函数验证了如下东西:
- 这个块是否符合区块链的规则和内部一致性,不符合则返回块格式错误
- 这个块的时间戳是否是未来的,是的话检查父区块是否已知,返回父区块存在情况
- 检查这个块的父区块是否合法,不合法则抛弃
- 检查这个块的父区块是否已知,不已知则先存起来,等其父区块;如果已知,则通过检查
如果通过检查,则验证通过,将其存入链上:
- 将其放入未验证块队列中
- 通知等待该队列的任何线程
- 将块的哈希值添加到已准备好的块集合中
- 增加总块难度
- 记录该块已准备好的信息
后续对块写入链的操作,这篇文章就先不看了,更深入的内容可以看另一篇文章Aleth-区块验证与存储。
rejigSealing()函数
// "rejig" 是一个英国俚语,意思是重新安排或调整。
// 这个函数是用于重新调整客户端的 sealing 策略的。
void Client::rejigSealing()
{
// 检查当前客户端是否满足启动 sealing 的条件,即 wouldSeal() || remoteActive()
// 如果满足条件且当前不在进行主同步(!isMajorSyncing()),则会调用 sealEngine 的 shouldSeal 函数来判断是否应该启动 sealing。
if ((wouldSeal() || remoteActive()) && !isMajorSyncing())
{
if (sealEngine()->shouldSeal(this))
{
m_wouldButShouldnot = false;
LOG(m_loggerDetail) << "Rejigging seal engine...";
DEV_WRITE_GUARDED(x_working)
{
if (m_working.isSealed())
{
LOG(m_logger) << "Tried to seal sealed block...";
return;
}
// TODO is that needed? we have "Generating seal on" below
LOG(m_loggerDetail) << "Starting to seal block #" << m_working.info().number();
m_working.commitToSeal(bc(), m_extraData);
}
DEV_READ_GUARDED(x_working)
{
DEV_WRITE_GUARDED(x_postSeal)
m_postSeal = m_working;
m_sealingInfo = m_working.info();
}
// 如果应该启动,那么它会提交一个生成 seal 的回调函数,然后开始生成 seal,
// 等 seal 生成后,会将该 block 提交到网络中,并调用注册的回调函数 m_onBlockSealed,
// 告知外部该 block 已经被 sealing 成功。
if (wouldSeal())
{
// 回调函数
sealEngine()->onSealGenerated([=](bytes const& _header) {
LOG(m_logger) << "Block sealed #" << BlockHeader(_header, HeaderData).number();
// seal 生成后,会将该 block 提交到网络中,并调用注册的回调函数 m_onBlockSealed,告知外部该 block 已经被 sealing 成功。
if (this->submitSealed(_header))
m_onBlockSealed(_header);
else
LOG(m_logger) << "Submitting block failed...";
});
ctrace << "Generating seal on " << m_sealingInfo.hash(WithoutSeal) << " #" << m_sealingInfo.number();
// 开始生成 seal
sealEngine()->generateSeal(m_sealingInfo);
}
}
else // 不应该启动 sealing,则会将 m_wouldButShouldnot 标记为 true
m_wouldButShouldnot = true;
}
// 如果当前客户端不会 sealing(!m_wouldSeal),则取消正在进行的 seal 生成过程。
if (!m_wouldSeal)
sealEngine()->cancelGeneration();
}
这个函数是用于重新调整客户端的 sealing 策略的。首先它会检查当前客户端是否满足启动 sealing 的条件,即 wouldSeal() || remoteActive()
,如果满足条件且当前不在进行主同步(!isMajorSyncing()
),则会调用 sealEngine 的 shouldSeal
函数来判断是否应该启动 sealing。如果应该启动,那么它会提交一个生成 seal 的回调函数,然后开始生成 seal,等 seal 生成后,会将该 block 提交到网络中,并调用注册的回调函数 m_onBlockSealed
,告知外部该 block 已经被 sealing 成功。
如果不应该启动 sealing,则会将 m_wouldButShouldnot
标记为 true。如果当前客户端不会 sealing(!m_wouldSeal
),则取消正在进行的 seal 生成过程。
callQueuedFunctions()函数
void Client::callQueuedFunctions()
{
while (true)
{
function<void()> f;
DEV_WRITE_GUARDED(x_functionQueue)
if (!m_functionQueue.empty())
{
f = m_functionQueue.front();
m_functionQueue.pop();
}
if (f)
f();
else
break;
}
}
该函数是用来从客户端的函数队列中依次取出一个函数并执行,直到队列为空为止。该函数使用了读写锁 x_functionQueue
来确保并发安全。它会从队列的前端取出函数 f
,然后判断 f
是否为空,如果不为空,则执行函数 f
。如果 f
为空,则退出循环。这个函数通常在主循环中被调用,用于处理一些需要在主线程中运行的函数,以确保它们不会被并发执行。
🌟🌟 m_ethereum->startWorking()
在new Client时其构造函数调用了初始化函数做了一些工作,就是m_ethereum->startWorking()
启动了最核心的线程部分,如下
void Worker::startWorking()
{
// cnote << "startWorking for thread" << m_name;
std::unique_lock<std::mutex> l(x_work);
if (m_work)
{
WorkerState ex = WorkerState::Stopped;
m_state.compare_exchange_strong(ex, WorkerState::Starting);
m_state_notifier.notify_all();
}
else
{
m_state = WorkerState::Starting;
m_state_notifier.notify_all();
// 创建线程
m_work.reset(new thread([&]() {
setThreadName(m_name.c_str());
cnote << "New thread begins: " << m_name.c_str();
while (m_state != WorkerState::Killing)
{
WorkerState ex = WorkerState::Starting;
{
// the condition variable-related lock
unique_lock<mutex> l(x_work);
m_state = WorkerState::Started;
}
// cnote << "Trying to set Started: Thread was" << (unsigned)ex << "; " <<
//ok;
m_state_notifier.notify_all();
try
{
// ETH线程会调用Client的
// P2P会调用Host的
startedWorking();
workLoop();
doneWorking();
}
catch (std::exception const& _e)
{
cwarn << "Exception thrown in Worker thread: " << _e.what();
}
// ex = WorkerState::Stopping;
// m_state.compare_exchange_strong(ex, WorkerState::Stopped);
{
// the condition variable-related lock
unique_lock<mutex> l(x_work);
ex = m_state.exchange(WorkerState::Stopped);
// cnote << "State: Stopped: Thread was" << (unsigned)ex;
if (ex == WorkerState::Killing || ex == WorkerState::Starting)
m_state.exchange(ex);
}
m_state_notifier.notify_all();
// cnote << "Waiting until not Stopped...";
{
unique_lock<mutex> l(x_work);
DEV_TIMED_ABOVE("Worker stopping", 100)
while (m_state == WorkerState::Stopped)
m_state_notifier.wait(l);
}
}
}));
// cnote << "Spawning" << m_name;
}
DEV_TIMED_ABOVE("Start worker", 100)
while (m_state == WorkerState::Starting)
m_state_notifier.wait(l);
}
可以看到,startWorking()这个函数再次被调用(在P2P启动时,这个函数也被调用),其核心部分依然是启动了新线程,然后在新线程中执行了最重要的三个函数
{
startedWorking();
workLoop();
doneWorking();
}
这三个函数分别是开始前的准备,工作的循环流程(死循环),线程结束之后的收尾
不难推断出,aleth中的多线程方式就是通过此函数以三步走的方式提供服务。
而不同的类会实现不同的startedWorking()、workLoop()、doneWorking()函数,以保证正确性。aleth的eth服务的类是Client,就是客户端的意思,提供客户端应有的区块同步,数据库等功能。而p2p服务的类是Host,Host更多代表着网络上的主机。可以看出老外对Client/Host的认识还是比较清楚的。而Client/Host都继承了Worker,可以重写/调用Worker的startedWorking()、workLoop()、doneWorking()函数,Worker就是“工人”,很形象。
在这里,启动的线程名字为eth
(启动P2P服务的线程名为p2p
)。
startedWorking()
void Client::startedWorking()
{
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
LOG(m_loggerDetail) << "startedWorking()";
DEV_WRITE_GUARDED(x_preSeal)
m_preSeal.sync(bc());
DEV_READ_GUARDED(x_preSeal)
{
DEV_WRITE_GUARDED(x_working)
m_working = m_preSeal;
DEV_WRITE_GUARDED(x_postSeal)
m_postSeal = m_preSeal;
}
}
这个函数的主要作用是在客户端开始工作时,将客户端的状态与区块链的头同步。
具体来说,它将 m_preSeal 对象(包含客户端对区块链中所有块的密钥)与区块链的头同步,并将同步后的状态存储在 m_working 和 m_postSeal 对象中。
其中,m_preSeal、m_working 和 m_postSeal 都是 Client 类的成员变量,它们分别表示工作前、工作中和工作后的客户端状态。
函数中使用了 DEV_READ_GUARDED 和 DEV_WRITE_GUARDED 宏来保证多线程安全。
workLoop()
void Worker::workLoop()
{
while (m_state == WorkerState::Started)
{
if (m_idleWaitMs)
this_thread::sleep_for(chrono::milliseconds(m_idleWaitMs));
doWork();
}
}
这个函数很简单,就是在死循环中一直doWork()
。
Client类的doWork()
函数在上面Client初始化时已经运行过一次,具体dowork的内容直接看上面的就行。总结来说doWork()
处理区块同步、交易同步、挖矿任务、接收新块、与数据库交互之类的。
doneWorking()
void Client::doneWorking()
{
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
DEV_WRITE_GUARDED(x_preSeal)
m_preSeal.sync(bc());
DEV_READ_GUARDED(x_preSeal)
{
DEV_WRITE_GUARDED(x_working)
m_working = m_preSeal;
DEV_WRITE_GUARDED(x_postSeal)
m_postSeal = m_preSeal;
}
}
doneWorking()
和startedWorking()
很像。
doneWorking()
函数的作用是完成工作状态。该函数会根据当前区块链的最新状态来同步客户端的状态,并将客户端的三个状态 m_preSeal
, m_working
, m_postSeal
更新为同步后的最新状态。
具体地,该函数首先使用m_preSeal
进行区块链状态同步,然后将同步后的状态写入m_working
和m_postSeal
。这三个状态分别代表着客户端的预封装状态、正在封装状态和封装后状态。更新这三个状态的目的是为了确保客户端的状态与区块链的状态保持一致。