Skip to content

title: Aleth启动-P2P description: 分析了Aleth启动过程中P2P服务的代码运行细节 author: 付铭 version: 1.0.0 date: 2023-02-28 19:00:00

先上图,整体的流程如下:

1

main()

aleth启动的主函数位于aleth/aleth/main.cpp中,。

main函数中下面这行代码启动了eth服务:

cpp
dev::WebThreeDirect web3(WebThreeDirect::composeClientVersion("aleth"), db::databasePath(),
        snapshotPath, chainParams, withExisting, netPrefs, &nodesState, testingMode);

// 在WebThreeDirect中,传入的参数WebThreeDirect::composeClientVersion("aleth"), netPrefs, &nodesState和P2P网络有关,用于生成在WebThreeDirect中的成员变量m_net
// dev::WebThreeDirect的成员变量m_net就是eth服务的P2P服务,m_net会在后台运行并在接收到新块时向eth服务发送消息,并且也允许eth广播新块

再往下,这里启动了P2P网络服务:

cpp
if (bootstrap || !remoteHost.empty() || !disableDiscovery || listenSet || !preferredNodes.empty())
    {
        // 这里启动了P2P服务
        web3.startNetwork();
        cout << "Node ID: " << web3.enode() << "\n";
    }

再往下,这里添加了预设节点:

cpp
// 这里主要是在p2p服务启动后,向Kad协议桶中加入用户预定义的节点和bootnodes
    if (web3.isNetworkStarted())
    {
        for (auto const& p: preferredNodes)
            if (p.second.second)
                web3.requirePeer(p.first, p.second.first);
            else
                web3.addNode(p.first, p.second.first);

        // 添加默认bootnodes
      	// 如果启动时声明了test、no-discovery或no-bootstrap,则bootstrap=false
        if (bootstrap)
            for (auto const& i : defaultBootNodes())
                web3.addNode(i.first, i.second);
        if (!remoteHost.empty())
            web3.addNode(p2p::NodeID(), remoteHost + ":" + toString(remotePort));
    }

就此,main函数关于P2P启动部分就差不多了,最重要的就是web3.startNetwork();,看下他做了什么工作。

web3.startNetwork()

步进之后,可以看到其直接调用了此函数:

cpp
void Host::start()
{
    DEV_TIMED_FUNCTION_ABOVE(500);
    if (m_nodeTable)
        BOOST_THROW_EXCEPTION(NetworkRestartNotSupported());

    startWorking(); // ⭐️⭐️⭐️ 
    while (isWorking() && !haveNetwork())
        this_thread::sleep_for(chrono::milliseconds(10));

    // network start failed!
    if (isWorking())
        return;

    cwarn << "Network start failed!";
    doneWorking();
}

最重要的部分就是startWorking();,再深入查看:

cpp
void Worker::startWorking()
{
    //	cnote << "startWorking for thread" << m_name;
    std::unique_lock<std::mutex> l(x_work);
    if (m_work)
    {
        WorkerState ex = WorkerState::Stopped;
        m_state.compare_exchange_strong(ex, WorkerState::Starting);
        m_state_notifier.notify_all();
    }
    else
    {
        m_state = WorkerState::Starting;
        m_state_notifier.notify_all();
        // 创建线程⭐️⭐️⭐️ 
        m_work.reset(new thread([&]() {
            setThreadName(m_name.c_str());
            cnote << "New thread begins: " << m_name.c_str();
            while (m_state != WorkerState::Killing)
            {
                WorkerState ex = WorkerState::Starting;
                {
                    // the condition variable-related lock
                    unique_lock<mutex> l(x_work);
                    m_state = WorkerState::Started;
                }
                //cnote << "Trying to set Started: Thread was" << (unsigned)ex << "; " <<
                //ok;
                m_state_notifier.notify_all();

                try
                {
                    startedWorking();
                    workLoop();
                    doneWorking();
                }
                catch (std::exception const& _e)
                {
                    cwarn << "Exception thrown in Worker thread: " << _e.what();
                }

                //				ex = WorkerState::Stopping;
                //				m_state.compare_exchange_strong(ex, WorkerState::Stopped);

                {
                    // the condition variable-related lock
                    unique_lock<mutex> l(x_work);
                    ex = m_state.exchange(WorkerState::Stopped);
                    //					cnote << "State: Stopped: Thread was" << (unsigned)ex;
                    if (ex == WorkerState::Killing || ex == WorkerState::Starting)
                        m_state.exchange(ex);
                }
                m_state_notifier.notify_all();
                //				cnote << "Waiting until not Stopped...";

                {
                    unique_lock<mutex> l(x_work);
                    DEV_TIMED_ABOVE("Worker stopping", 100)
                    while (m_state == WorkerState::Stopped)
                        m_state_notifier.wait(l);
                }
            }
        }));
        //		cnote << "Spawning" << m_name;
    }

    DEV_TIMED_ABOVE("Start worker", 100)
    while (m_state == WorkerState::Starting)
        m_state_notifier.wait(l);
}

最重要的就是上面标⭐️⭐️⭐️的部分创建了线程,然后便是下面的

cpp
                try
                {
                    startedWorking();
                    workLoop();
                    doneWorking();
                }

在线程中启动了主要的服务。

再往下看startedWorking();

cpp
// Called after thread has been started to perform additional class-specific state
// initialization (e.g. start capability threads, start TCP listener, and kick off timers)
void Host::startedWorking()
{
    if (haveCapabilities())
    {
        startCapabilities();

        // try to open acceptor (todo: ipv6)
        // 注意这里传了一个m_tcp4Acceptor进去到tcp4Listen
        int port = Network::tcp4Listen(m_tcp4Acceptor, m_netConfig);
        if (port > 0)
        {
            m_listenPort = port;
            runAcceptor(); // ⭐️
        }
        else
            LOG(m_logger) << "p2p.start.notice id: " << id()
                          << " TCP Listen port is invalid or unavailable.";
    }
    else
        m_listenPort = m_netConfig.listenPort;

    m_tcpPublic = determinePublic();
    ENR const enr = updateENR(m_restoredENR, m_tcpPublic, listenPort());

    // 创建Kad协议NodeTable ⭐️⭐️
    // New NodeTable会调用其构造函数,构造函数绑定了本地端口,启动了发现服务,
    auto nodeTable = make_shared<NodeTable>(m_ioContext, m_alias,
        NodeIPEndpoint(bi::make_address(listenAddress()), listenPort(), listenPort()), enr,
        m_netConfig.discovery, m_netConfig.allowLocalDiscovery);

    // Don't set an event handler if we don't have capabilities, because no capabilities
    // means there's no host state to update in response to node table events
    if (haveCapabilities())
        nodeTable->setEventHandler(new HostNodeTableHandler(*this));
    DEV_GUARDED(x_nodeTable)
    m_nodeTable = nodeTable;

    m_run = true;
  	// 加载P2P网络的相关信息 ⭐️⭐️⭐️
    restoreNetwork(&m_restoreNetwork);
    if (haveCapabilities())
    {
        LOG(m_logger) << "devp2p started. Node id: " << id();
      	// ⭐️⭐️⭐️⭐️
        run(boost::system::error_code()); 
    }
    else
        LOG(m_logger) << "No registered capabilities, devp2p not started.";
}

这函数主要看三处:

  • ⭐️:runAcceptor()
  • ⭐️⭐️:NodeTable的初始化调用了其构造函数
  • ⭐️⭐️⭐️:restoreNetwork从缓存中重新加载P2P相关网络信息
  • ⭐️⭐️⭐️⭐️:run(boost::system::error_code());

看下runAcceptor(),直接看注释:

cpp
void Host::runAcceptor()
{
    assert(m_listenPort > 0);

    if (m_tcp4Acceptor.is_open() && !m_accepting)
    {
        cnetdetails << "Listening on local port " << m_listenPort;
        m_accepting = true;

        // 这里只是对m_tcp4Acceptor的async_accept函数进行声明,并没有启动
    		// 真正的启动要等到m_ioContext.run()才启动了m_tcp4Acceptor
    		// 在Host的构造函数中对m_tcp4Acceptor的声明是这样的:m_tcp4Acceptor(m_ioContext)
    		// 这里是boost的异步网络编程
    		// 在下面传入async_accept的匿名函数中,可以看到其有些地方重新调用了runAcceptor这个函数
    		// 这是因为传入async_accept的匿名函数只能处理一次连接,连接关闭后函数结束
    		// 如果希望传入async_accept的匿名函数能够重复处理连接,则再次将匿名函数传入async_accept即可
        m_tcp4Acceptor.async_accept([this](boost::system::error_code _ec, bi::tcp::socket _socket) {
            std::cout << "void Host::runAcceptor()" << std::endl;
            m_accepting = false;
            if (_ec || !m_tcp4Acceptor.is_open())
                return;

            auto socket = make_shared<RLPXSocket>(std::move(_socket));
            if (peerCount() > peerSlots(Ingress))
            {
                cnetdetails << "Dropping incoming connect due to maximum peer count (" << Ingress
                            << " * ideal peer count): " << socket->remoteEndpoint();
                socket->close();
                if (_ec.value() < 1)
                    runAcceptor();
                return;
            }

            // 这里会尝试与对方进行RLPXHandshake
            // 会把handshake存储在m_connecting中
            // 然后开始handshake
          	// 握手具体流程放在最后讲
            bool success = false;
            try
            {
                // incoming connection; we don't yet know nodeid
                auto handshake = make_shared<RLPXHandshake>(this, socket);
                m_connecting.push_back(handshake);
                handshake->start();
                success = true;
            }
            catch (Exception const& _e)
            {
                cwarn << "ERROR: " << diagnostic_information(_e);
            }
            catch (exception const& _e)
            {
                cwarn << "ERROR: " << _e.what();
            }

            if (!success)
                socket->ref().close();
            runAcceptor();
        });
    }
}

NodeTable构造函数中最重要的三步:

cpp
try
{
    m_socket->connect(); // 绑定了UDP端口并接收处理UDP的ENR消息,主要在void NodeTable::onPacketReceived处理Ping、Pong、ENRRequest等等
    doDiscovery(); // 这里进行发现节点,根据代码,好像是随机一个nodeid然后去查
    doHandleTimeouts(); // 如果其他节点未能及时回复我们的ping,则将其从k桶中删去,并从替补节点中找一个补上
    doEndpointTracking(); // 做了一些回收工作,处理某些列表中的无用信息
}

然后是从缓存中加载网络⭐️⭐️⭐️restoreNetwork:network缓存在初始化eth服务时对m_net的初始化时已经从磁盘读出并写入m_net中,只是一直未解析。restoreNetwork则从变量中解析缓存的节点并重新连接他们(将节点加入到Kad NodeTable中,NodeTable会自动去连接次节点)。

然后是⭐️⭐️⭐️⭐️:run(boost::system::error_code()),看下干啥了:

cpp
// 运行P2P服务
void Host::run(boost::system::error_code const& _ec)
{
    if (!m_run || _ec)
        return;

    // This again requires x_nodeTable, which is why an additional variable nodeTable is used.
  	// 这里启动了NodeTable的事件监听器,比如节点加入NodeTable,NodeTable会自动连接该节点
    if (auto nodeTable = this->nodeTable())
        nodeTable->processEvents();

    // cleanup zombies
    DEV_GUARDED(x_connecting)
    m_connecting.remove_if([](weak_ptr<RLPXHandshake> h) { return h.expired(); });

    keepAlivePeers();
    logActivePeers();

    // At this time peers will be disconnected based on natural TCP timeout.
    // disconnectLatePeers needs to be updated for the assumption that Session
    // is always live and to ensure reputation and fallback timers are properly
    // updated. // disconnectLatePeers();

    // todo: update peerSlotsAvailable()

    list<shared_ptr<Peer>> toConnect;
    unsigned reqConn = 0;
    {
        RecursiveGuard l(x_sessions);
        auto p = m_peers.cbegin();
        while (p != m_peers.cend())
        {
            bool peerRemoved = false;
            bool haveSession = havePeerSession(p->second->id);
            bool required = p->second->peerType == PeerType::Required;
            if (haveSession && required)
                reqConn++;
            else if (!haveSession)
            {
                if (p->second->isUseless())
                {
                    peerRemoved = true;
                    p = m_peers.erase(p);
                }
                else if (p->second->shouldReconnect() && (!m_netConfig.pin || required))
                    toConnect.push_back(p->second);
            }
            if (!peerRemoved)
                p++;
        }
    }

    for (auto p : toConnect)
        if (p->peerType == PeerType::Required && reqConn++ < m_idealPeerCount)
            connect(p);

    if (!m_netConfig.pin)
    {
        unsigned const maxSlots = m_idealPeerCount + reqConn;
        unsigned occupiedSlots = peerCount() + m_pendingPeerConns.size();
        for (auto peerToConnect = toConnect.cbegin();
             occupiedSlots <= maxSlots && peerToConnect != toConnect.cend(); ++peerToConnect)
        {
            if ((*peerToConnect)->peerType == PeerType::Optional)
            {
                connect(*peerToConnect);
                ++occupiedSlots;
            }
        }
    }

    if (!m_run)
        return;

    auto runcb = [this](boost::system::error_code const& error) { run(error); };
    // 每c_runTimerInterval ms 调用一次run().
    m_runTimer.expires_after(c_runTimerInterval);
    m_runTimer.async_wait(runcb);
}

这个函数会每c_runTimerInterval毫秒执行一次,具体做的就是针对已经连接的节点们,检查连接状态,如果需要重新连接就重新连接,或者将节点删掉。

再回到前面这里:

cpp
								try
                {
                    startedWorking();
                  	// workLoop()这里面有一个死循环
                  	// 所以workLoop()会一直阻塞运行
                    workLoop();
                  	// doneWorking()会在workLoop()结束后关闭监听,停止与其他节点的连接
                    doneWorking();
                }

startedWorking()看完之后,看下workLoop()干啥了。

cpp
void Worker::workLoop()
{
    while (m_state == WorkerState::Started)
    {
      	// m_idleWaitMs 是0
        if (m_idleWaitMs)
            this_thread::sleep_for(chrono::milliseconds(m_idleWaitMs));
        doWork();
    }
}

很简单,doWork();在m_state == WorkerState::Started的前提下一直在不间断的执行。直接看doWork();

cpp
void Host::doWork()
{
    try
    {
        if (m_run)
            m_ioContext.run(); // 这个函数运行时会阻塞
    }
    catch (exception const& _e)
    {
        cwarn << "Exception in Network Thread: " << _e.what();
        cwarn << "Network Restart is Recommended.";
    }
}

通过m_ioContext.run()启动了runAcceptor()中的接收器,接收来发来的节点连接请求。

RLPXHandshake

上文runAcceptor()中

cpp
						{
                // incoming connection; we don't yet know nodeid
                auto handshake = make_shared<RLPXHandshake>(this, socket);
                m_connecting.push_back(handshake);
                handshake->start();
                success = true;
            }

进行了握手,也就是注释所说的“对方发过来的连接,我们还不知道其nodeid”。

这里先说一下代码中定义的握手状态:

cpp
    /// Sequential states of handshake
    enum State
    {
        Error = -1,
        New,
        AckAuth,
        AckAuthEIP8,
        WriteHello,
        ReadHello,
        StartSession
    };
		/// 很明显,握手完成后就要开启会话(Session)

看下handshake->start();干嘛了,这段代码太长,主要流程我直接写到了下面代码注释中。这里的handshake是对方主动发起的连接请求:

cpp
// 一个RLPXHandshake对象对应着与一个远程节点的握手过程
// 所以这个transition函数会被执行多次
// 因为有m_nextState这个变量在控制,所以即使执行多次,也总是会成功或失败而退出
void RLPXHandshake::transition(boost::system::error_code _ech)
{
    // reset timeout
    m_idleTimer.cancel();

    // 判断是否出错
    if (_ech || m_nextState == Error || m_cancel)
    {
        if (_ech)
            m_failureReason = HandshakeFailureReason::TCPError;
        return error(_ech);
    }

    auto self(shared_from_this());
    // assert只有在 Debug 版本中才有效,如果编译为 Release 版本则被忽略。
    assert(m_nextState != StartSession);
    // 开启一个计时器
    m_idleTimer.expires_after(c_timeout);
    // 如果计时器到时,则会执行如下函数
    // 将m_nextState设置为Error后再transition(),transition()则会在前面那里判断处跳出函数
    m_idleTimer.async_wait([this, self](boost::system::error_code const& _ec)
    {
        if (!_ec)
        {
            LOG(m_logger) << "Disconnecting (Handshake Timeout) from";
            m_failureReason = HandshakeFailureReason::Timeout;
            m_nextState = Error;
            transition();
        }
    });

    // m_nextState 初始值即为 New
    if (m_nextState == New)
    {
        m_nextState = AckAuth;
        // 如果我们是连接发起方,则m_originated=ture,需要我方先发送认证消息
        if (m_originated)
            // writeAuth()将我方的公钥等认证信息发过去,发送之后得到一个错误码ec,没有错误的话ec就是0
            // writeAuth()内部会调用transition(ec)再次进行后续流程
            writeAuth();
        else
            readAuth();
            // 对于readAuth(),接收方拿到对方发来的认证信息,接收出错直接调用transition(ec)
            // 如果接收没出错,则会调用RLPXHandshake::setAuthValues将认证信息存起来
            // 然后将m_nextState设置为AckAuth
            // 如果消息是EIP8认证信息,读取之后将m_nextState设置为AckAuthEIP8
            // readAuth()最终总是会调用transition(ec)再次进行后续流程
            // 注意,这里readAuth内部是异步的,声明完read到之后要进行的步骤后就马上退出了
            // 只有read到之后才会向后进行
    }
    // 在上一步中,发起方发送了认证信息,接收方读取了认证信息,双方都将m_nextState设置为AckAuth
    // 然后接收方开始执行writeAck(),发起方执行readAck()读取接收方发来的ack
    else if (m_nextState == AckAuth)
    {
        m_nextState = WriteHello;
        if (m_originated)
            // readAck()和readAuth()一样,读取了对方发来的认证信息,同时也包含了EIP8认证信息的可能性
            // 然后调用transition(ec)再次进行后续流程
            // 注意,这里readAck内部是异步的,声明完read到之后要进行的步骤后就马上退出了
            // 只有read到之后才会向后进行
            // 如果对方的认证信息验证通过,则双方已经完成身份认证,我方可以发送hello
            readAck();
        else
            writeAck();
            // writeAck()是接收方将自己的公钥等认证信息发给了发起方,发送后调用transition(ec)再次进行后续流程
            // 至此,双方已完成身份认证(如果我方发出的认证信息无误),接收方可以无需等待直接发送hello
            // 这里有个问题,就是接收方发了hello,但是我方还没有开始接收怎么办?
    }
    else if (m_nextState == AckAuthEIP8) // 这个不再细说,和前面的都一样
    {
        m_nextState = WriteHello;
        if (m_originated)
            readAck();
        else
            writeAckEIP8();
    }
    else if (m_nextState == WriteHello) // 双方均向对端发送WriteHello
    {
        // Send the p2p capability Hello frame
        LOG(m_logger) << p2pPacketTypeToString(HelloPacket) << " to";

        m_nextState = ReadHello;

        /// This pointer will be freed if there is an error otherwise
        /// it will be passed to Host which will take ownership.
        m_io.reset(new RLPXFrameCoder(*this));

        // 在RLPStream中带上了自己的协议版本,客户端版本,caps,监听的端口和nodeid
        RLPStream s;
        s.append((unsigned)HelloPacket).appendList(5)
            << dev::p2p::c_protocolVersion
            << m_host->m_clientVersion
            << m_host->caps()
            << m_host->listenPort()
            << m_host->id();

        // 将stream转为了字节包发了出去
        bytes packet;
        s.swapOut(packet);
        m_io->writeSingleFramePacket(&packet, m_handshakeOutBuffer);
        ba::async_write(m_socket->ref(), ba::buffer(m_handshakeOutBuffer),
            [this, self](boost::system::error_code ec, std::size_t) {
                transition(ec);
            });
    }
    else if (m_nextState == ReadHello) // 接收对方发来的Hello包,验证通过后启动会话
    {
        // Authenticate and decrypt initial hello frame with initial RLPXFrameCoder
        // and request m_host to start session.
        m_nextState = StartSession;

        // read frame header
        constexpr size_t handshakeSizeBytes = 32;
        m_handshakeInBuffer.resize(handshakeSizeBytes);
        ba::async_read(m_socket->ref(),
            boost::asio::buffer(m_handshakeInBuffer, handshakeSizeBytes),
            [this, self](boost::system::error_code ec, std::size_t) {
                if (ec)
                    transition(ec);
                else
                {
                    if (!m_io)
                    {
                        LOG(m_errorLogger)
                            << "Internal error in handshake: RLPXFrameCoder disappeared ("
                            << m_remote << ")";
                        m_failureReason = HandshakeFailureReason::InternalError;
                        m_nextState = Error;
                        transition();
                        return;
                    }

                    LOG(m_logger) << "Frame header from";

                    /// authenticate and decrypt header
                    if (!m_io->authAndDecryptHeader(
                            bytesRef(m_handshakeInBuffer.data(), m_handshakeInBuffer.size())))
                    {
                        m_failureReason = HandshakeFailureReason::FrameDecryptionFailure;
                        m_nextState = Error;
                        transition();
                        return;
                    }

                    LOG(m_logger) << "Successfully decrypted frame header, validating contents...";

                    /// check frame size
                    bytes const& header = m_handshakeInBuffer;
                    uint32_t const frameSize = (uint32_t)(header[2]) | (uint32_t)(header[1]) << 8 |
                                               (uint32_t)(header[0]) << 16;
                    constexpr size_t expectedFrameSizeBytes = 1024;
                    if (frameSize > expectedFrameSizeBytes)
                    {
                        // all future frames: 16777216
                        LOG(m_logger)
                            << "Frame is too large! Expected size: " << expectedFrameSizeBytes
                            << " bytes, actual size: " << frameSize << " bytes";
                        m_failureReason = HandshakeFailureReason::ProtocolError;
                        m_nextState = Error;
                        transition();
                        return;
                    }

                    /// rlp of header has protocol-type, sequence-id[, total-packet-size]
                    bytes headerRLP(header.size() - 3 - h128::size);  // this is always 32 - 3 - 16
                                                                      // = 13. wtf?
                    bytesConstRef(&header).cropped(3).copyTo(&headerRLP);

                    /// read padded frame and mac
                    constexpr size_t byteBoundary = 16;
                    m_handshakeInBuffer.resize(
                        frameSize + ((byteBoundary - (frameSize % byteBoundary)) % byteBoundary) +
                        h128::size);

                    LOG(m_logger) << "Frame header contents validated";

                    ba::async_read(m_socket->ref(),
                        boost::asio::buffer(m_handshakeInBuffer, m_handshakeInBuffer.size()),
                        [this, self, headerRLP](boost::system::error_code ec, std::size_t) {
                            m_idleTimer.cancel();

                            if (ec)
                                transition(ec);
                            else
                            {
                                if (!m_io)
                                {
                                    LOG(m_errorLogger) << "Internal error in handshake: "
                                                          "RLPXFrameCoder disappeared";
                                    m_failureReason = HandshakeFailureReason::InternalError;
                                    m_nextState = Error;
                                    transition();
                                    return;
                                }
                                LOG(m_logger) << "Frame body from";
                                bytesRef frame(&m_handshakeInBuffer);
                                if (!m_io->authAndDecryptFrame(frame))
                                {
                                    LOG(m_logger) << "Frame body decrypt failed";
                                    m_failureReason =
                                        HandshakeFailureReason::FrameDecryptionFailure;
                                    m_nextState = Error;
                                    transition();
                                    return;
                                }

                                // 0x80 = special case for 0
                                P2pPacketType packetType =
                                    frame[0] == 0x80 ? HelloPacket : static_cast<P2pPacketType>(frame[0]);
                                if (packetType != HelloPacket)
                                {
                                    LOG(m_logger)
                                        << "Invalid packet type. Expected: "
                                        << p2pPacketTypeToString(HelloPacket)
                                        << ", received: " << p2pPacketTypeToString(packetType);
                                    m_failureReason = HandshakeFailureReason::ProtocolError;
                                    if (packetType == DisconnectPacket)
                                    {
                                        try
                                        {
                                            // Explicitly avoid RLP::FailIfTooLarge exception if RLP
                                            // data is smaller than bytes buffer since msg data is
                                            // significantly smaller than buffer size
                                            RLP rlp{frame.cropped(1),
                                                RLP::ThrowOnFail | RLP::FailIfTooSmall};
                                            if (rlp)
                                            {
                                                auto const reason = static_cast<DisconnectReason>(
                                                    rlp[0].toInt<int>());
                                                LOG(m_logger)
                                                    << "Disconnect reason: " << reasonOf(reason);

                                                // We set a failure reason of DisconnectRequested
                                                // since it's not a violation of the protocol (a
                                                // disconnect packet is allowed at any moment) so we
                                                // will try to reconnect later.
                                                m_failureReason =
                                                    HandshakeFailureReason::DisconnectRequested;
                                            }
                                        }
                                        catch (std::exception const& _e)
                                        {
                                            LOG(m_errorLogger)
                                                << "Exception occurred while decoding RLP msg data "
                                                   "in "
                                                << p2pPacketTypeToString(DisconnectPacket) << ": "
                                                << _e.what();
                                        }
                                    }
                                    m_nextState = Error;
                                    transition();
                                    return;
                                }

                                LOG(m_logger) << p2pPacketTypeToString(HelloPacket)
                                              << " verified. Starting session with";
                                try
                                {
                                    RLP rlp(
                                        frame.cropped(1), RLP::ThrowOnFail | RLP::FailIfTooSmall);
                                    m_host->startPeerSession(m_remote, rlp, move(m_io), m_socket);
                                }
                                catch (std::exception const& _e)
                                {
                                    LOG(m_errorLogger)
                                        << "Handshake causing an exception: " << _e.what();
                                    m_failureReason = HandshakeFailureReason::UnknownFailure;
                                    m_nextState = Error;
                                    transition();
                                }
                            }
                        });
                }
            });
    }
}

流程如下图:

2

连接节点数量

以太坊Kad协议一直维护着可用的节点列表,而以太坊并不与列表中的所有节点连接,而是如下定义:

cpp
unsigned peerSlots(PeerSlotType _type) { return _type == Egress ? m_idealPeerCount : m_idealPeerCount * m_stretchPeers; }

unsigned m_idealPeerCount = 11;          ///< Ideal number of peers to be connected to.
unsigned m_stretchPeers = 7;             ///< Accepted connection multiplier (max peers = ideal*stretch).

意味着最多有77个节点可以尝试和我们连接,但是我们最终只与11个节点成功建立连接。

NodeTable的构造函数

构造函数的函数体如下:

cpp
{
  	/// 对桶进行初始化
    /// 第i个桶存放了i距离的节点
    for (unsigned i = 0; i < s_bins; i++)
        m_buckets[i].distance = i;

    if (!_enabled)
    {
        cwarn << "\"_enabled\" parameter is false, discovery is disabled";
        return;
    }

    try
    {
        m_socket->connect();
        doDiscovery();
        doHandleTimeouts();
        doEndpointTracking();
    }
    catch (exception const& _e)
    {
        cwarn << "Exception connecting NodeTable socket: " << _e.what();
        cwarn << "Discovery disabled.";
    }
}

重点看其中的try:

cpp
				m_socket->connect(); // 绑定UDP端口
        doDiscovery();			 // 启动发现服务
        doHandleTimeouts();	 // 
        doEndpointTracking();//

doDiscovery()

cpp
void NodeTable::doDiscovery()
{
  	// 设置一个7200ms的定时器
    m_discoveryTimer->expires_after(c_bucketRefreshMs);
    auto discoveryTimer{m_discoveryTimer};
  	// 每次定时器触发都执行如下函数发现节点
    m_discoveryTimer->async_wait([this, discoveryTimer](boost::system::error_code const& _ec) {
        // We can't use m_logger if an error occurred because captured this might be already
        // destroyed
        if (_ec.value() == boost::asio::error::operation_aborted ||
            discoveryTimer->expiry() == c_steadyClockMin)
        {
            clog(VerbosityDebug, "discov") << "Discovery timer was cancelled";
            return;
        }
        else if (_ec)
        {
            clog(VerbosityDebug, "discov")
                << "Discovery timer error detected: " << _ec.value() << " " << _ec.message();
            return;
        }

        // 随机选择一个nodeid,进行发现
        NodeID randNodeId;
        crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(0, h256::size));
        crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(h256::size, h256::size));
        LOG(m_logger) << "Starting discovery algorithm run for random node id: " << randNodeId;
        doDiscoveryRound(randNodeId, 0 /* round */, make_shared<set<shared_ptr<NodeEntry>>>());
    });
}

我方主动发起连接请求

上面提到的⭐️⭐️⭐️⭐️:run(boost::system::error_code())函数中启动了NodeTable的事件监听器,可以看到只有NodeEntryAdded和NodeEntryDropped两种事件:

cpp
void Host::onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e)
{
    // 节点入桶事件
    if (_e == NodeEntryAdded)
    {
        LOG(m_logger) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
        if (Node n = nodeFromNodeTable(_n))
        {
            shared_ptr<Peer> p;
            DEV_RECURSIVE_GUARDED(x_sessions)
            {
                // 如果m_peers中已经有这个节点了,那么就直接从m_peers拿这个节点
                if (m_peers.count(_n))
                {
                    p = m_peers[_n];
                    p->endpoint = n.endpoint;
                }
                else
                { // 如果m_peers中没有这个节点,就把这个节点存在m_peers中
                    p = make_shared<Peer>(n);
                    m_peers[_n] = p;
                    LOG(m_logger) << "p2p.host.peers.events.peerAdded " << _n << " " << p->endpoint;
                }
            }
            // 如果还可以连接新的节点,那就连接他
            if (peerSlotsAvailable(Egress))
                connect(p);
        }
    }
    // 节点从桶中删除事件
    else if (_e == NodeEntryDropped)
    {
        LOG(m_logger) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
        RecursiveGuard l(x_sessions);
        if (m_peers.count(_n) && m_peers[_n]->peerType == PeerType::Optional)
            m_peers.erase(_n);
    }
}

主动连接时是这个函数,可以看到仍然是RLPX握手:

cpp
void Host::connect(shared_ptr<Peer> const& _p)
{
    if (!m_run)
    {
        cwarn << "Network not running so cannot connect to peer " << _p->id << "@" << _p->address();
        return;
    }

    if (!haveCapabilities())
    {
        cwarn << "No capabilities registered so cannot connect to peer " << _p->id << "@" << _p->address();
        return;
    }

    if (isHandshaking(_p->id))
    {
        cwarn << "Aborted connection. RLPX handshake with peer already in progress: " << _p->id
              << "@" << _p->endpoint;
        return;
    }

    if (havePeerSession(_p->id))
    {
        cnetdetails << "Aborted connection. Peer already connected: " << _p->id << "@"
                    << _p->endpoint;
        return;
    }

    if (!nodeTableHasNode(_p->id) && _p->peerType == PeerType::Optional)
        return;

    // prevent concurrently connecting to a node
    Peer *nptr = _p.get();
    if (m_pendingPeerConns.count(nptr))
        return;
    m_pendingPeerConns.insert(nptr);

    _p->m_lastAttempted = chrono::system_clock::now();
    
    bi::tcp::endpoint ep(_p->endpoint);
    cnetdetails << "Attempting connection to " << _p->id << "@" << ep << " from " << id();
    auto socket = make_shared<RLPXSocket>(bi::tcp::socket{m_ioContext});
    socket->ref().async_connect(ep, [=](boost::system::error_code const& ec)
    {
        _p->m_lastAttempted = chrono::system_clock::now();
        _p->m_failedAttempts++;
        
        if (ec)
        {
            cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " ("
                        << ec.message() << ")";
            // Manually set error (session not present)
            _p->m_lastDisconnect = TCPError;
        }
        else
        {
            cnetdetails << "Starting RLPX handshake with " << _p->id << "@" << ep;
            auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id);
            {
                Guard l(x_connecting);
                m_connecting.push_back(handshake);
            }

            handshake->start();
        }
        
        m_pendingPeerConns.erase(nptr);
    });
}

最后,我们看下Host类的一些私有变量:

cpp
    std::set<Peer*> m_pendingPeerConns;									/// Used only by connect(Peer&) to limit concurrently connecting to same node. See connect(shared_ptr<Peer>const&).

    bi::tcp::endpoint m_tcpPublic;											///< Our public listening endpoint.
    /// Alias for network communication.
    KeyPair m_alias;
    /// Host's Ethereum Node Record restored from network.rlp
    ENR const m_restoredENR;
    std::shared_ptr<NodeTable> m_nodeTable;									///< Node table (uses kademlia-like discovery).
    mutable std::mutex x_nodeTable;
    std::shared_ptr<NodeTable> nodeTable() const { Guard l(x_nodeTable); return m_nodeTable; }

    /// Shared storage of Peer objects. Peers are created or destroyed on demand by the Host. Active sessions maintain a shared_ptr to a Peer;
    std::unordered_map<NodeID, std::shared_ptr<Peer>> m_peers;
    
    /// Peers we try to connect regardless of p2p network.
    std::set<NodeID> m_requiredPeers;
    mutable Mutex x_requiredPeers;

    /// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run())
    /// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
    mutable std::unordered_map<NodeID, std::weak_ptr<SessionFace>> m_sessions;
    mutable RecursiveMutex x_sessions;

    /// Pending connections. Completed handshakes are garbage-collected in run() (a handshake is
    /// complete when there are no more shared_ptrs in handlers)
    std::list<std::weak_ptr<RLPXHandshake>> m_connecting;
    mutable Mutex x_connecting;													///< Mutex for m_connecting.

    unsigned m_idealPeerCount = 11;										///< Ideal number of peers to be connected to.
    unsigned m_stretchPeers = 7;										///< Accepted connection multiplier (max peers = ideal*stretch).

    /// Each of the capabilities we support. CapabilityRuntime is used to run a capability's
    /// background work loop
    std::map<CapDesc, CapabilityRuntime> m_capabilities;

    std::chrono::steady_clock::time_point m_lastPing;						///< Time we sent the last ping to all peers.

其中有几个存储节点的变量有必要分析一下。

  • std::set<Peer*> m_pendingPeerConns:存储那些将要发起连接,但还未连接成功的节点。用于防止对同一节点重复发起连接。
  • std::shared_ptr<NodeTable> m_nodeTable:Node table (uses kademlia-like discovery).
  • std::unordered_map<NodeID, std::shared_ptr<Peer>> m_peers;:认为已经连接的节点列表,Peer中含有session。这里会有一些掉线的节点,尝试重连。
  • std::set<NodeID> m_requiredPeers;:无论 p2p 网络如何,我们都会尝试连接的节点。这里是用户手动添加的节点。
  • mutable std::unordered_map<NodeID, std::weak_ptr<SessionFace>> m_sessions;:连接成功的节点Session,这与m_peers中存储的是相同的,但是这里存储的session全都是连接正常的,一旦session断开,就会从这里清除。
  • std::list<std::weak_ptr<RLPXHandshake>> m_connecting;:正在连接的节点,系统会在握手之前将节点写入此列表。

BootNodes

博客以太坊的P2P节点入网 - 小木槌给出了Geth的BootNodes。

Aleth的BootNodes位于文件libp2p/Common.cpp中,

3