title: Aleth Startup - P2P
description: A walkthrough of the code the P2P service runs during Aleth startup
author: 付铭
version: 1.0.0
date: 2023-02-28 19:00:00
First, a picture; the overall flow is as follows:

main()
The aleth entry point, main(), lives in aleth/aleth/main.cpp.
The following line in main() starts the eth service:
dev::WebThreeDirect web3(WebThreeDirect::composeClientVersion("aleth"), db::databasePath(),
snapshotPath, chainParams, withExisting, netPrefs, &nodesState, testingMode);
// Of the arguments passed to WebThreeDirect, composeClientVersion("aleth"), netPrefs and &nodesState
// relate to the P2P network; they are used to build WebThreeDirect's member variable m_net
// m_net is the eth service's P2P service: it runs in the background, notifies the eth service
// when a new block is received, and also lets eth broadcast new blocks
Further down, this is where the P2P network service is started:
if (bootstrap || !remoteHost.empty() || !disableDiscovery || listenSet || !preferredNodes.empty())
{
    // this starts the P2P service
web3.startNetwork();
cout << "Node ID: " << web3.enode() << "\n";
}
Further down, preset nodes are added:
// After the P2P service starts, this adds the user-defined nodes and the bootnodes to the Kademlia buckets
if (web3.isNetworkStarted())
{
for (auto const& p: preferredNodes)
if (p.second.second)
web3.requirePeer(p.first, p.second.first);
else
web3.addNode(p.first, p.second.first);
    // add the default bootnodes
    // bootstrap is false if test, no-discovery or no-bootstrap was passed at startup
if (bootstrap)
for (auto const& i : defaultBootNodes())
web3.addNode(i.first, i.second);
if (!remoteHost.empty())
web3.addNode(p2p::NodeID(), remoteHost + ":" + toString(remotePort));
}
That wraps up the P2P startup portion of main(). The most important call is web3.startNetwork(); let's see what it does.
web3.startNetwork()
Stepping into it, we find it directly calls this function:
void Host::start()
{
DEV_TIMED_FUNCTION_ABOVE(500);
if (m_nodeTable)
BOOST_THROW_EXCEPTION(NetworkRestartNotSupported());
startWorking(); // ⭐️⭐️⭐️
while (isWorking() && !haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
// network start failed!
if (isWorking())
return;
cwarn << "Network start failed!";
doneWorking();
}
The key call is startWorking(); digging in:
void Worker::startWorking()
{
// cnote << "startWorking for thread" << m_name;
std::unique_lock<std::mutex> l(x_work);
if (m_work)
{
WorkerState ex = WorkerState::Stopped;
m_state.compare_exchange_strong(ex, WorkerState::Starting);
m_state_notifier.notify_all();
}
else
{
m_state = WorkerState::Starting;
m_state_notifier.notify_all();
        // create the thread ⭐️⭐️⭐️
m_work.reset(new thread([&]() {
setThreadName(m_name.c_str());
cnote << "New thread begins: " << m_name.c_str();
while (m_state != WorkerState::Killing)
{
WorkerState ex = WorkerState::Starting;
{
// the condition variable-related lock
unique_lock<mutex> l(x_work);
m_state = WorkerState::Started;
}
//cnote << "Trying to set Started: Thread was" << (unsigned)ex << "; " <<
//ok;
m_state_notifier.notify_all();
try
{
startedWorking();
workLoop();
doneWorking();
}
catch (std::exception const& _e)
{
cwarn << "Exception thrown in Worker thread: " << _e.what();
}
// ex = WorkerState::Stopping;
// m_state.compare_exchange_strong(ex, WorkerState::Stopped);
{
// the condition variable-related lock
unique_lock<mutex> l(x_work);
ex = m_state.exchange(WorkerState::Stopped);
// cnote << "State: Stopped: Thread was" << (unsigned)ex;
if (ex == WorkerState::Killing || ex == WorkerState::Starting)
m_state.exchange(ex);
}
m_state_notifier.notify_all();
// cnote << "Waiting until not Stopped...";
{
unique_lock<mutex> l(x_work);
DEV_TIMED_ABOVE("Worker stopping", 100)
while (m_state == WorkerState::Stopped)
m_state_notifier.wait(l);
}
}
}));
// cnote << "Spawning" << m_name;
}
DEV_TIMED_ABOVE("Start worker", 100)
while (m_state == WorkerState::Starting)
m_state_notifier.wait(l);
}
The crucial part is the thread creation marked ⭐️⭐️⭐️ above, followed by
try
{
startedWorking();
workLoop();
doneWorking();
}
which starts the main services inside the thread.
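This start/stop pattern (an atomic state machine plus a condition variable, so the caller can block until the thread has really started) is easy to lose among the details. Here is a minimal self-contained sketch of the same idea; MiniWorker and everything in it are illustrative names, not Aleth's actual Worker:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Minimal sketch of the Worker start/stop pattern (illustrative, not Aleth code).
enum class State { Stopped, Starting, Started, Killing };

class MiniWorker
{
public:
    void start()
    {
        std::unique_lock<std::mutex> l(m_lock);
        m_state = State::Starting;
        m_thread = std::thread([this] {
            {
                std::unique_lock<std::mutex> l2(m_lock);
                m_state = State::Started;
            }
            m_cv.notify_all(); // wake start(), which is waiting below
            while (m_state != State::Killing)
                doWork();
        });
        // Block until the thread has actually reported Started (like the
        // "while (m_state == WorkerState::Starting)" wait in startWorking()).
        m_cv.wait(l, [this] { return m_state != State::Starting; });
    }

    void stop()
    {
        m_state = State::Killing;
        if (m_thread.joinable())
            m_thread.join();
    }

private:
    void doWork() { std::this_thread::sleep_for(std::chrono::milliseconds(10)); }

    std::atomic<State> m_state{State::Stopped};
    std::mutex m_lock;
    std::condition_variable m_cv;
    std::thread m_thread;
};

int main()
{
    MiniWorker w;
    w.start();
    std::cout << "worker running\n";
    w.stop();
}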
Next, look at startedWorking():
// Called after thread has been started to perform additional class-specific state
// initialization (e.g. start capability threads, start TCP listener, and kick off timers)
void Host::startedWorking()
{
if (haveCapabilities())
{
startCapabilities();
// try to open acceptor (todo: ipv6)
        // note that m_tcp4Acceptor is passed into tcp4Listen here
int port = Network::tcp4Listen(m_tcp4Acceptor, m_netConfig);
if (port > 0)
{
m_listenPort = port;
runAcceptor(); // ⭐️
}
else
LOG(m_logger) << "p2p.start.notice id: " << id()
<< " TCP Listen port is invalid or unavailable.";
}
else
m_listenPort = m_netConfig.listenPort;
m_tcpPublic = determinePublic();
ENR const enr = updateENR(m_restoredENR, m_tcpPublic, listenPort());
    // create the Kademlia NodeTable ⭐️⭐️
    // new NodeTable invokes the constructor, which binds the local port and starts the discovery service
auto nodeTable = make_shared<NodeTable>(m_ioContext, m_alias,
NodeIPEndpoint(bi::make_address(listenAddress()), listenPort(), listenPort()), enr,
m_netConfig.discovery, m_netConfig.allowLocalDiscovery);
// Don't set an event handler if we don't have capabilities, because no capabilities
// means there's no host state to update in response to node table events
if (haveCapabilities())
nodeTable->setEventHandler(new HostNodeTableHandler(*this));
DEV_GUARDED(x_nodeTable)
m_nodeTable = nodeTable;
m_run = true;
    // load the cached P2P network information ⭐️⭐️⭐️
restoreNetwork(&m_restoreNetwork);
if (haveCapabilities())
{
LOG(m_logger) << "devp2p started. Node id: " << id();
// ⭐️⭐️⭐️⭐️
run(boost::system::error_code());
}
else
LOG(m_logger) << "No registered capabilities, devp2p not started.";
}
This function has four points of interest:
- ⭐️: runAcceptor()
- ⭐️⭐️: the initialization of NodeTable, which invokes its constructor
- ⭐️⭐️⭐️: restoreNetwork, which reloads cached P2P network information
- ⭐️⭐️⭐️⭐️: run(boost::system::error_code());
Let's look at runAcceptor(); the comments explain most of it:
void Host::runAcceptor()
{
assert(m_listenPort > 0);
if (m_tcp4Acceptor.is_open() && !m_accepting)
{
cnetdetails << "Listening on local port " << m_listenPort;
m_accepting = true;
        // This only registers the async_accept handler on m_tcp4Acceptor; nothing happens yet
        // the acceptor only really starts once m_ioContext.run() executes
        // in Host's constructor, m_tcp4Acceptor is declared as m_tcp4Acceptor(m_ioContext)
        // this is boost asynchronous network programming
        // in the lambda passed to async_accept below, runAcceptor() is called again in places:
        // the lambda handles exactly one connection and finishes when that connection is done
        // to let it handle further connections, the lambda must be passed to async_accept again
m_tcp4Acceptor.async_accept([this](boost::system::error_code _ec, bi::tcp::socket _socket) {
std::cout << "void Host::runAcceptor()" << std::endl;
m_accepting = false;
if (_ec || !m_tcp4Acceptor.is_open())
return;
auto socket = make_shared<RLPXSocket>(std::move(_socket));
if (peerCount() > peerSlots(Ingress))
{
cnetdetails << "Dropping incoming connect due to maximum peer count (" << Ingress
<< " * ideal peer count): " << socket->remoteEndpoint();
socket->close();
if (_ec.value() < 1)
runAcceptor();
return;
}
        // Here we attempt an RLPXHandshake with the peer
        // the handshake object is stored in m_connecting
        // and then the handshake starts
        // the handshake flow itself is covered at the end of this post
bool success = false;
try
{
// incoming connection; we don't yet know nodeid
auto handshake = make_shared<RLPXHandshake>(this, socket);
m_connecting.push_back(handshake);
handshake->start();
success = true;
}
catch (Exception const& _e)
{
cwarn << "ERROR: " << diagnostic_information(_e);
}
catch (exception const& _e)
{
cwarn << "ERROR: " << _e.what();
}
if (!success)
socket->ref().close();
runAcceptor();
});
}
}
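Before moving on: the re-arming described in the comments (each completion handler serves exactly one connection and must call async_accept again to keep listening) can be seen in isolation in the following standalone sketch. It is my own example, not Aleth code, and port 30303 is an arbitrary choice:

#include <boost/asio.hpp>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;

// Minimal sketch of the self-re-arming acceptor pattern in Host::runAcceptor().
void acceptLoop(tcp::acceptor& acceptor)
{
    acceptor.async_accept([&acceptor](boost::system::error_code ec, tcp::socket socket) {
        if (!ec)
            std::cout << "accepted " << socket.remote_endpoint() << "\n";
        // The handler runs once per connection; call acceptLoop() again so the
        // acceptor keeps listening for the next one.
        acceptLoop(acceptor);
    });
}

int main()
{
    ba::io_context io;
    tcp::acceptor acceptor{io, tcp::endpoint{tcp::v4(), 30303}};
    acceptLoop(acceptor);
    io.run(); // the async_accept only becomes active once run() executes
}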
The four most important steps in the NodeTable constructor:
try
{
    m_socket->connect(); // binds the UDP port and handles incoming UDP discovery traffic; NodeTable::onPacketReceived processes Ping, Pong, ENRRequest and so on
    doDiscovery(); // node discovery; judging by the code, it picks a random node id and looks it up
    doHandleTimeouts(); // drops a node from its k-bucket when it fails to answer our ping in time, promoting a replacement node
    doEndpointTracking(); // housekeeping: purges stale entries from a few tracking lists
}
Next, loading the network from cache (⭐️⭐️⭐️ restoreNetwork): the network cache was already read from disk into m_net when the eth service initialized it, but was never parsed. restoreNetwork parses the cached nodes out of that variable and reconnects them (it adds them to the Kademlia NodeTable, and the NodeTable connects to each such node automatically).
Then comes ⭐️⭐️⭐️⭐️ run(boost::system::error_code()); let's see what it does:
// run the P2P service
void Host::run(boost::system::error_code const& _ec)
{
if (!m_run || _ec)
return;
// This again requires x_nodeTable, which is why an additional variable nodeTable is used.
    // This drives the NodeTable event listener: when a node joins the NodeTable, for example, the host automatically connects to it
if (auto nodeTable = this->nodeTable())
nodeTable->processEvents();
// cleanup zombies
DEV_GUARDED(x_connecting)
m_connecting.remove_if([](weak_ptr<RLPXHandshake> h) { return h.expired(); });
keepAlivePeers();
logActivePeers();
// At this time peers will be disconnected based on natural TCP timeout.
// disconnectLatePeers needs to be updated for the assumption that Session
// is always live and to ensure reputation and fallback timers are properly
// updated. // disconnectLatePeers();
// todo: update peerSlotsAvailable()
list<shared_ptr<Peer>> toConnect;
unsigned reqConn = 0;
{
RecursiveGuard l(x_sessions);
auto p = m_peers.cbegin();
while (p != m_peers.cend())
{
bool peerRemoved = false;
bool haveSession = havePeerSession(p->second->id);
bool required = p->second->peerType == PeerType::Required;
if (haveSession && required)
reqConn++;
else if (!haveSession)
{
if (p->second->isUseless())
{
peerRemoved = true;
p = m_peers.erase(p);
}
else if (p->second->shouldReconnect() && (!m_netConfig.pin || required))
toConnect.push_back(p->second);
}
if (!peerRemoved)
p++;
}
}
for (auto p : toConnect)
if (p->peerType == PeerType::Required && reqConn++ < m_idealPeerCount)
connect(p);
if (!m_netConfig.pin)
{
unsigned const maxSlots = m_idealPeerCount + reqConn;
unsigned occupiedSlots = peerCount() + m_pendingPeerConns.size();
for (auto peerToConnect = toConnect.cbegin();
occupiedSlots <= maxSlots && peerToConnect != toConnect.cend(); ++peerToConnect)
{
if ((*peerToConnect)->peerType == PeerType::Optional)
{
connect(*peerToConnect);
++occupiedSlots;
}
}
}
if (!m_run)
return;
auto runcb = [this](boost::system::error_code const& error) { run(error); };
    // run() is called again every c_runTimerInterval ms
m_runTimer.expires_after(c_runTimerInterval);
m_runTimer.async_wait(runcb);
}
This function executes every c_runTimerInterval milliseconds. Concretely, it checks the connection state of the peers we know about, reconnects those that should be reconnected, and removes peers that have become useless.
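The self-rescheduling at the bottom (arm m_runTimer and let its handler call run() again) is the standard asio idiom for periodic work. A minimal standalone sketch of the same idiom, with an assumed 100 ms interval and illustrative names:

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

// Minimal sketch of the periodic-run pattern in Host::run().
void runOnce(boost::asio::steady_timer& timer)
{
    std::cout << "periodic maintenance tick\n";
    // Re-arm the timer; without this the periodic loop stops.
    timer.expires_after(std::chrono::milliseconds(100));
    timer.async_wait([&timer](boost::system::error_code const& ec) {
        if (!ec)
            runOnce(timer);
    });
}

int main()
{
    boost::asio::io_context io;
    boost::asio::steady_timer timer{io};
    runOnce(timer);
    io.run(); // blocks, since there is always a pending timer wait
}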
Back to the earlier snippet:
try
{
startedWorking();
    // workLoop() contains an endless loop
    // so workLoop() blocks here and keeps running
workLoop();
    // doneWorking() runs after workLoop() ends; it shuts down the listener and closes connections to other nodes
doneWorking();
}
With startedWorking() covered, let's see what workLoop() does.
void Worker::workLoop()
{
while (m_state == WorkerState::Started)
{
        // m_idleWaitMs is 0
if (m_idleWaitMs)
this_thread::sleep_for(chrono::milliseconds(m_idleWaitMs));
doWork();
}
}
Simple enough: while m_state == WorkerState::Started, doWork() executes again and again without pause. Straight to doWork():
void Host::doWork()
{
try
{
if (m_run)
            m_ioContext.run(); // this call blocks while it runs
}
catch (exception const& _e)
{
cwarn << "Exception in Network Thread: " << _e.what();
cwarn << "Network Restart is Recommended.";
}
}
m_ioContext.run() activates the acceptor set up in runAcceptor(), which then receives incoming connection requests from other nodes.
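One detail worth spelling out: io_context::run() returns as soon as it has no outstanding work, which is why workLoop() calls doWork() in a loop. In Aleth the pending accepts and timers keep run() busy; in a standalone program the same effect can be had with a work guard. A small sketch of the asio API, not what Aleth does:

#include <boost/asio.hpp>
#include <iostream>

int main()
{
    boost::asio::io_context io;
    // The guard counts as outstanding work, so run() will not return early.
    auto guard = boost::asio::make_work_guard(io);
    boost::asio::post(io, [&guard] {
        std::cout << "handler ran; releasing the work guard\n";
        guard.reset(); // run() may now return once the remaining work drains
    });
    io.run(); // without the guard or a pending handler, this would return immediately
}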
RLPXHandshake
In runAcceptor() above, this fragment
{
// incoming connection; we don't yet know nodeid
auto handshake = make_shared<RLPXHandshake>(this, socket);
m_connecting.push_back(handshake);
handshake->start();
success = true;
}
performs the handshake; as the comment says, this is an incoming connection whose nodeid we do not yet know.
First, the handshake states defined in the code:
/// Sequential states of handshake
enum State
{
Error = -1,
New,
AckAuth,
AckAuthEIP8,
WriteHello,
ReadHello,
StartSession
};
/// Clearly, once the handshake completes, a Session is opened
Now for handshake->start(). The code is long, so I've written the main flow directly into the comments below. The handshake here was initiated by the remote side:
// One RLPXHandshake object corresponds to the handshake with one remote node,
// so this transition function is executed multiple times
// m_nextState drives the state machine, so however many times it runs, it always ends by succeeding or failing
void RLPXHandshake::transition(boost::system::error_code _ech)
{
// reset timeout
m_idleTimer.cancel();
    // check for errors
if (_ech || m_nextState == Error || m_cancel)
{
if (_ech)
m_failureReason = HandshakeFailureReason::TCPError;
return error(_ech);
}
auto self(shared_from_this());
    // assert is only active in Debug builds; it is compiled out in Release builds
assert(m_nextState != StartSession);
    // start a timer
m_idleTimer.expires_after(c_timeout);
    // if the timer expires, the function below runs:
    // it sets m_nextState to Error and calls transition(), which then bails out at the error check above
m_idleTimer.async_wait([this, self](boost::system::error_code const& _ec)
{
if (!_ec)
{
LOG(m_logger) << "Disconnecting (Handshake Timeout) from";
m_failureReason = HandshakeFailureReason::Timeout;
m_nextState = Error;
transition();
}
});
    // the initial value of m_nextState is New
if (m_nextState == New)
{
m_nextState = AckAuth;
        // if we initiated the connection, m_originated == true, and our side sends the auth message first
if (m_originated)
            // writeAuth() sends our public key and other auth data; the send yields an error code ec, which is 0 on success
            // internally, writeAuth() calls transition(ec) to continue the flow
writeAuth();
else
readAuth();
        // readAuth() receives the peer's auth message; on a read error it calls transition(ec) directly
        // on success it calls RLPXHandshake::setAuthValues to store the auth data
        // and then sets m_nextState to AckAuth
        // if the message is EIP-8 auth data, m_nextState is set to AckAuthEIP8 after reading
        // readAuth() always ends by calling transition(ec) to continue the flow
        // note that readAuth is asynchronous inside: it registers what to do once the read completes, then returns immediately
        // the flow only moves forward once the data has actually been read
}
    // In the previous step the initiator sent its auth data, the receiver read it, and both sides set m_nextState to AckAuth
    // now the receiver runs writeAck() while the initiator runs readAck() to read the receiver's ack
else if (m_nextState == AckAuth)
{
m_nextState = WriteHello;
if (m_originated)
            // readAck(), like readAuth(), reads the auth data sent by the peer (again allowing for the EIP-8 variant)
            // and then calls transition(ec) to continue the flow
            // note that readAck is asynchronous inside: it registers what to do once the read completes, then returns immediately
            // the flow only moves forward once the data has actually been read
            // if the peer's auth data verifies, both sides have authenticated and our side may send hello
readAck();
else
writeAck();
        // writeAck() sends the receiver's public key and other auth data to the initiator, then calls transition(ec) to continue
        // at this point both sides are authenticated (assuming the auth data we sent was valid), and the receiver can send hello right away without waiting
        // one might ask: what if the receiver sends hello before our side has started reading? TCP buffers the bytes, so the later read simply finds them
}
    else if (m_nextState == AckAuthEIP8) // same as the cases above, so not repeated here
{
m_nextState = WriteHello;
if (m_originated)
readAck();
else
writeAckEIP8();
}
    else if (m_nextState == WriteHello) // both sides send Hello to their peer
{
// Send the p2p capability Hello frame
LOG(m_logger) << p2pPacketTypeToString(HelloPacket) << " to";
m_nextState = ReadHello;
/// This pointer will be freed if there is an error otherwise
/// it will be passed to Host which will take ownership.
m_io.reset(new RLPXFrameCoder(*this));
        // the RLPStream carries our protocol version, client version, caps, listening port and nodeid
RLPStream s;
s.append((unsigned)HelloPacket).appendList(5)
<< dev::p2p::c_protocolVersion
<< m_host->m_clientVersion
<< m_host->caps()
<< m_host->listenPort()
<< m_host->id();
        // the stream is converted into a byte packet and sent
bytes packet;
s.swapOut(packet);
m_io->writeSingleFramePacket(&packet, m_handshakeOutBuffer);
ba::async_write(m_socket->ref(), ba::buffer(m_handshakeOutBuffer),
[this, self](boost::system::error_code ec, std::size_t) {
transition(ec);
});
}
    else if (m_nextState == ReadHello) // receive the peer's Hello packet; once it verifies, start the session
{
// Authenticate and decrypt initial hello frame with initial RLPXFrameCoder
// and request m_host to start session.
m_nextState = StartSession;
// read frame header
constexpr size_t handshakeSizeBytes = 32;
m_handshakeInBuffer.resize(handshakeSizeBytes);
ba::async_read(m_socket->ref(),
boost::asio::buffer(m_handshakeInBuffer, handshakeSizeBytes),
[this, self](boost::system::error_code ec, std::size_t) {
if (ec)
transition(ec);
else
{
if (!m_io)
{
LOG(m_errorLogger)
<< "Internal error in handshake: RLPXFrameCoder disappeared ("
<< m_remote << ")";
m_failureReason = HandshakeFailureReason::InternalError;
m_nextState = Error;
transition();
return;
}
LOG(m_logger) << "Frame header from";
/// authenticate and decrypt header
if (!m_io->authAndDecryptHeader(
bytesRef(m_handshakeInBuffer.data(), m_handshakeInBuffer.size())))
{
m_failureReason = HandshakeFailureReason::FrameDecryptionFailure;
m_nextState = Error;
transition();
return;
}
LOG(m_logger) << "Successfully decrypted frame header, validating contents...";
/// check frame size
bytes const& header = m_handshakeInBuffer;
uint32_t const frameSize = (uint32_t)(header[2]) | (uint32_t)(header[1]) << 8 |
(uint32_t)(header[0]) << 16;
constexpr size_t expectedFrameSizeBytes = 1024;
if (frameSize > expectedFrameSizeBytes)
{
// all future frames: 16777216
LOG(m_logger)
<< "Frame is too large! Expected size: " << expectedFrameSizeBytes
<< " bytes, actual size: " << frameSize << " bytes";
m_failureReason = HandshakeFailureReason::ProtocolError;
m_nextState = Error;
transition();
return;
}
/// rlp of header has protocol-type, sequence-id[, total-packet-size]
bytes headerRLP(header.size() - 3 - h128::size); // this is always 32 - 3 - 16
// = 13. wtf?
bytesConstRef(&header).cropped(3).copyTo(&headerRLP);
/// read padded frame and mac
constexpr size_t byteBoundary = 16;
m_handshakeInBuffer.resize(
frameSize + ((byteBoundary - (frameSize % byteBoundary)) % byteBoundary) +
h128::size);
LOG(m_logger) << "Frame header contents validated";
ba::async_read(m_socket->ref(),
boost::asio::buffer(m_handshakeInBuffer, m_handshakeInBuffer.size()),
[this, self, headerRLP](boost::system::error_code ec, std::size_t) {
m_idleTimer.cancel();
if (ec)
transition(ec);
else
{
if (!m_io)
{
LOG(m_errorLogger) << "Internal error in handshake: "
"RLPXFrameCoder disappeared";
m_failureReason = HandshakeFailureReason::InternalError;
m_nextState = Error;
transition();
return;
}
LOG(m_logger) << "Frame body from";
bytesRef frame(&m_handshakeInBuffer);
if (!m_io->authAndDecryptFrame(frame))
{
LOG(m_logger) << "Frame body decrypt failed";
m_failureReason =
HandshakeFailureReason::FrameDecryptionFailure;
m_nextState = Error;
transition();
return;
}
// 0x80 = special case for 0
P2pPacketType packetType =
frame[0] == 0x80 ? HelloPacket : static_cast<P2pPacketType>(frame[0]);
if (packetType != HelloPacket)
{
LOG(m_logger)
<< "Invalid packet type. Expected: "
<< p2pPacketTypeToString(HelloPacket)
<< ", received: " << p2pPacketTypeToString(packetType);
m_failureReason = HandshakeFailureReason::ProtocolError;
if (packetType == DisconnectPacket)
{
try
{
// Explicitly avoid RLP::FailIfTooLarge exception if RLP
// data is smaller than bytes buffer since msg data is
// significantly smaller than buffer size
RLP rlp{frame.cropped(1),
RLP::ThrowOnFail | RLP::FailIfTooSmall};
if (rlp)
{
auto const reason = static_cast<DisconnectReason>(
rlp[0].toInt<int>());
LOG(m_logger)
<< "Disconnect reason: " << reasonOf(reason);
// We set a failure reason of DisconnectRequested
// since it's not a violation of the protocol (a
// disconnect packet is allowed at any moment) so we
// will try to reconnect later.
m_failureReason =
HandshakeFailureReason::DisconnectRequested;
}
}
catch (std::exception const& _e)
{
LOG(m_errorLogger)
<< "Exception occurred while decoding RLP msg data "
"in "
<< p2pPacketTypeToString(DisconnectPacket) << ": "
<< _e.what();
}
}
m_nextState = Error;
transition();
return;
}
LOG(m_logger) << p2pPacketTypeToString(HelloPacket)
<< " verified. Starting session with";
try
{
RLP rlp(
frame.cropped(1), RLP::ThrowOnFail | RLP::FailIfTooSmall);
m_host->startPeerSession(m_remote, rlp, move(m_io), m_socket);
}
catch (std::exception const& _e)
{
LOG(m_errorLogger)
<< "Handshake causing an exception: " << _e.what();
m_failureReason = HandshakeFailureReason::UnknownFailure;
m_nextState = Error;
transition();
}
}
});
}
});
}
}
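As an aside, the frameSize computation in the ReadHello branch reads the first three header bytes as a 24-bit big-endian length. A tiny worked example with hypothetical header bytes:

#include <cstdint>
#include <iostream>

int main()
{
    // First three bytes of a hypothetical RLPx frame header.
    uint8_t header[3] = {0x00, 0x00, 0x56};
    // Same expression as in RLPXHandshake::transition():
    uint32_t frameSize =
        uint32_t(header[2]) | uint32_t(header[1]) << 8 | uint32_t(header[0]) << 16;
    std::cout << frameSize << "\n"; // prints 86
}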
The flow is shown in the diagram below:

Number of connected peers
Ethereum's Kademlia layer maintains a list of usable nodes, but Ethereum does not connect to every node in that list. The limits are defined as follows:
unsigned peerSlots(PeerSlotType _type) { return _type == Egress ? m_idealPeerCount : m_idealPeerCount * m_stretchPeers; }
unsigned m_idealPeerCount = 11; ///< Ideal number of peers to be connected to.
unsigned m_stretchPeers = 7; ///< Accepted connection multiplier (max peers = ideal*stretch).
This means up to 77 peers may be connected inbound (Ingress slots), while our side actively establishes at most 11 outbound connections (Egress slots).
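Concretely, plugging the defaults into peerSlots() gives the two limits (a small sketch reusing the values above):

#include <iostream>

enum PeerSlotType { Egress, Ingress };

// Same arithmetic as Host::peerSlots(), with the default member values inlined.
unsigned peerSlots(PeerSlotType _type, unsigned ideal = 11, unsigned stretch = 7)
{
    return _type == Egress ? ideal : ideal * stretch;
}

int main()
{
    std::cout << "outbound (Egress) slots: " << peerSlots(Egress) << "\n";  // 11
    std::cout << "inbound (Ingress) slots: " << peerSlots(Ingress) << "\n"; // 77
}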
The NodeTable constructor
The constructor body looks like this:
{
    /// initialize the buckets
    /// bucket i holds the nodes at distance i
for (unsigned i = 0; i < s_bins; i++)
m_buckets[i].distance = i;
if (!_enabled)
{
cwarn << "\"_enabled\" parameter is false, discovery is disabled";
return;
}
try
{
m_socket->connect();
doDiscovery();
doHandleTimeouts();
doEndpointTracking();
}
catch (exception const& _e)
{
cwarn << "Exception connecting NodeTable socket: " << _e.what();
cwarn << "Discovery disabled.";
}
}
Focus on the try block:
m_socket->connect(); // bind the UDP port
doDiscovery(); // start the discovery service
doHandleTimeouts(); // drop peers that fail to answer our pings in time
doEndpointTracking(); // clean up stale endpoint-tracking entries
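The bucket initialization at the top ("bucket i holds the nodes at distance i") reflects Kademlia's XOR metric: the distance between two IDs is their XOR, and the bucket index comes from the position of its highest set bit. A minimal sketch of that mapping; the 32-byte IDs and bucketIndex() are illustrative, not Aleth's exact implementation:

#include <array>
#include <cstdint>
#include <iostream>

// Kademlia-style bucket index: position of the highest set bit of (a XOR b).
int bucketIndex(std::array<uint8_t, 32> const& a, std::array<uint8_t, 32> const& b)
{
    for (size_t i = 0; i < a.size(); ++i)
    {
        uint8_t x = a[i] ^ b[i]; // first differing byte decides the bucket
        if (x)
        {
            int bit = 7;
            while (!(x >> bit & 1))
                --bit;
            return int((a.size() - 1 - i) * 8) + bit; // 0..255
        }
    }
    return -1; // identical IDs
}

int main()
{
    std::array<uint8_t, 32> self{}, other{};
    other[31] = 0x05; // differs from self only in the lowest byte
    std::cout << bucketIndex(self, other) << "\n"; // prints 2
}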
doDiscovery()
void NodeTable::doDiscovery()
{
    // arm a timer for c_bucketRefreshMs (7200 ms)
m_discoveryTimer->expires_after(c_bucketRefreshMs);
auto discoveryTimer{m_discoveryTimer};
    // every time the timer fires, run the following function to discover nodes
m_discoveryTimer->async_wait([this, discoveryTimer](boost::system::error_code const& _ec) {
// We can't use m_logger if an error occurred because captured this might be already
// destroyed
if (_ec.value() == boost::asio::error::operation_aborted ||
discoveryTimer->expiry() == c_steadyClockMin)
{
clog(VerbosityDebug, "discov") << "Discovery timer was cancelled";
return;
}
else if (_ec)
{
clog(VerbosityDebug, "discov")
<< "Discovery timer error detected: " << _ec.value() << " " << _ec.message();
return;
}
        // pick a random nodeid and run discovery against it
NodeID randNodeId;
crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(0, h256::size));
crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(h256::size, h256::size));
LOG(m_logger) << "Starting discovery algorithm run for random node id: " << randNodeId;
doDiscoveryRound(randNodeId, 0 /* round */, make_shared<set<shared_ptr<NodeEntry>>>());
});
}
Initiating connections ourselves
The ⭐️⭐️⭐️⭐️ run(boost::system::error_code()) function mentioned above drives the NodeTable event listener; as you can see, there are only two event types, NodeEntryAdded and NodeEntryDropped:
void Host::onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e)
{
    // a node was added to a bucket
if (_e == NodeEntryAdded)
{
LOG(m_logger) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
if (Node n = nodeFromNodeTable(_n))
{
shared_ptr<Peer> p;
DEV_RECURSIVE_GUARDED(x_sessions)
{
                // if m_peers already contains this node, just take it from m_peers
if (m_peers.count(_n))
{
p = m_peers[_n];
p->endpoint = n.endpoint;
}
else
                { // if m_peers does not contain this node, store it in m_peers
p = make_shared<Peer>(n);
m_peers[_n] = p;
LOG(m_logger) << "p2p.host.peers.events.peerAdded " << _n << " " << p->endpoint;
}
}
            // if we can still take new connections, connect to it
if (peerSlotsAvailable(Egress))
connect(p);
}
}
    // a node was removed from its bucket
else if (_e == NodeEntryDropped)
{
LOG(m_logger) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
RecursiveGuard l(x_sessions);
if (m_peers.count(_n) && m_peers[_n]->peerType == PeerType::Optional)
m_peers.erase(_n);
}
}
Active, outbound connections go through this function; as you can see, it again ends in an RLPX handshake:
void Host::connect(shared_ptr<Peer> const& _p)
{
if (!m_run)
{
cwarn << "Network not running so cannot connect to peer " << _p->id << "@" << _p->address();
return;
}
if (!haveCapabilities())
{
cwarn << "No capabilities registered so cannot connect to peer " << _p->id << "@" << _p->address();
return;
}
if (isHandshaking(_p->id))
{
cwarn << "Aborted connection. RLPX handshake with peer already in progress: " << _p->id
<< "@" << _p->endpoint;
return;
}
if (havePeerSession(_p->id))
{
cnetdetails << "Aborted connection. Peer already connected: " << _p->id << "@"
<< _p->endpoint;
return;
}
if (!nodeTableHasNode(_p->id) && _p->peerType == PeerType::Optional)
return;
// prevent concurrently connecting to a node
Peer *nptr = _p.get();
if (m_pendingPeerConns.count(nptr))
return;
m_pendingPeerConns.insert(nptr);
_p->m_lastAttempted = chrono::system_clock::now();
bi::tcp::endpoint ep(_p->endpoint);
cnetdetails << "Attempting connection to " << _p->id << "@" << ep << " from " << id();
auto socket = make_shared<RLPXSocket>(bi::tcp::socket{m_ioContext});
socket->ref().async_connect(ep, [=](boost::system::error_code const& ec)
{
_p->m_lastAttempted = chrono::system_clock::now();
_p->m_failedAttempts++;
if (ec)
{
cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " ("
<< ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
}
else
{
cnetdetails << "Starting RLPX handshake with " << _p->id << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
{
Guard l(x_connecting);
m_connecting.push_back(handshake);
}
handshake->start();
}
m_pendingPeerConns.erase(nptr);
});
}
Finally, let's look at some of the Host class's private member variables:
std::set<Peer*> m_pendingPeerConns; /// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptr<Peer>const&).
bi::tcp::endpoint m_tcpPublic; ///< Our public listening endpoint.
/// Alias for network communication.
KeyPair m_alias;
/// Host's Ethereum Node Record restored from network.rlp
ENR const m_restoredENR;
std::shared_ptr<NodeTable> m_nodeTable; ///< Node table (uses kademlia-like discovery).
mutable std::mutex x_nodeTable;
std::shared_ptr<NodeTable> nodeTable() const { Guard l(x_nodeTable); return m_nodeTable; }
/// Shared storage of Peer objects. Peers are created or destroyed on demand by the Host. Active sessions maintain a shared_ptr to a Peer;
std::unordered_map<NodeID, std::shared_ptr<Peer>> m_peers;
/// Peers we try to connect regardless of p2p network.
std::set<NodeID> m_requiredPeers;
mutable Mutex x_requiredPeers;
/// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run())
/// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::unordered_map<NodeID, std::weak_ptr<SessionFace>> m_sessions;
mutable RecursiveMutex x_sessions;
/// Pending connections. Completed handshakes are garbage-collected in run() (a handshake is
/// complete when there are no more shared_ptrs in handlers)
std::list<std::weak_ptr<RLPXHandshake>> m_connecting;
mutable Mutex x_connecting; ///< Mutex for m_connecting.
unsigned m_idealPeerCount = 11; ///< Ideal number of peers to be connected to.
unsigned m_stretchPeers = 7; ///< Accepted connection multiplier (max peers = ideal*stretch).
/// Each of the capabilities we support. CapabilityRuntime is used to run a capability's
/// background work loop
std::map<CapDesc, CapabilityRuntime> m_capabilities;
std::chrono::steady_clock::time_point m_lastPing; ///< Time we sent the last ping to all peers.
A few of the node-storing members deserve closer analysis.
- std::set<Peer*> m_pendingPeerConns: nodes we are about to connect to but have not yet finished connecting; used to avoid issuing duplicate connection attempts to the same node.
- std::shared_ptr<NodeTable> m_nodeTable: the node table (uses kademlia-like discovery).
- std::unordered_map<NodeID, std::shared_ptr<Peer>> m_peers: the nodes we consider connected; a Peer carries its session. Some entries here may have dropped offline, and reconnection is attempted for them.
- std::set<NodeID> m_requiredPeers: nodes we always try to connect to regardless of the state of the p2p network; these are the nodes the user added manually.
- mutable std::unordered_map<NodeID, std::weak_ptr<SessionFace>> m_sessions: sessions of successfully connected nodes. This overlaps with what m_peers stores, but every session kept here is live; as soon as a session disconnects, it is removed.
- std::list<std::weak_ptr<RLPXHandshake>> m_connecting: nodes currently being connected; a node is written into this list before its handshake begins.
BootNodes
The blog post 以太坊的P2P节点入网 - 小木槌 lists Geth's BootNodes.
Aleth's BootNodes live in the file libp2p/Common.cpp.
