1. 程式人生 > >以太坊原始碼分析之 P2P網路(二、節點發現流程)

以太坊原始碼分析之 P2P網路(二、節點發現流程)

區塊鏈特輯 :https://blog.csdn.net/fusan2004/article/details/80879343,歡迎查閱,原創作品,轉載請標明!

上一篇文章簡單介紹了下一些基礎的型別定義,從這一篇開始我們將描述p2p網路的更多細節。從關於節點的定義來看,其實不同定義是有不同含義的,Node代表的是一個孤立的節點,這個節點不代表我們和他會建立連線,而Peer是肯定會去連線的,但是不代表一定會建立出連線,只有建立連線以後才會生成session,在session上才進行了以太坊的資料的交換。

對於瞭解p2p系統的人來說,肯定對區塊鏈p2p底層有一種疑惑,為什麼呢?因為在中心化的p2p網路中,會有一個server用來蒐集peer資訊,這樣在資料互動過程中,每個peer一般情況下是先通過這個server拿到一定數量的peer列表,然後挨個去建立連線,最後進行資料互動。但眾所周知的是,區塊鏈是一個去中心化的系統,這種server的存在將會徹底破壞區塊鏈可信任的基礎,那麼以太坊是如何解決節點獲取問題的呢?答案就是Kademlia演算法,這是一種分散式儲存及路由的演算法,能夠保證經過最多n步後找到需要的資料,具體的演算法可以參考 https://www.jianshu.com/p/f2c31e632f1d 這篇文章,比較通俗易懂。

在這裡我們更加關注以太坊中關於節點發現的實現,這部分的邏輯都是在NodeTable中,我們先來看下NodeTable類的成員變數和函式,然後再根據程式碼邏輯詳細說明整個流程,後續如果有時間我會再補個流程圖。

NodeTable類

// NodeTable類負責以太坊p2p網路底層節點發現的所有管理
// 節點發現是通過udp來完成,因此這裡繼承了UDPSocketEvents,來響應一些事件
class NodeTable: UDPSocketEvents, public std::enable_shared_from_this<NodeTable>
{
    friend std::ostream& operator<<(std::ostream& _out, NodeTable const& _nodeTable);
    using NodeSocket = UDPSocket<NodeTable, 1280>;              // UDPSocket,這是在UDP.h中定義,1280表示的是最大資料報大小
    using TimePoint = std::chrono::steady_clock::time_point;    // < Steady time point.
    using NodeIdTimePoint = std::pair<NodeID, TimePoint>;
    struct EvictionTimeout                                      // 用於記錄淘汰的節點的timepoint,以及用於替代他的新節點id
    { 
        NodeID newNodeID;
        TimePoint evictedTimePoint;
    };

public:
    enum NodeRelation { Unknown = 0, Known };   // 判斷節點的關係,在部分函式引數中需要
    enum DiscoverType { Random = 0 }; 
    NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled = true);   //建構函式需要一個用於io的host,證書以及要監聽的ip地址和埠
    ~NodeTable();
    //返回兩個nodeid基於異或計算的距離,這就是NodeEntry中的distance,也是判斷兩個節點邏輯上“距離”的計算方法,可不用關注細節
    static int distance(NodeID const& _a, NodeID const& _b) { u256 d = sha3(_a) ^ sha3(_b); unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
    void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); }   //為NodeEntryAdded和NodeEntryDropped事件設定事件控制代碼,實際上這兩個事件都會在上層被處理,這裡暫不關注
    void processEvents();                      // 這個函式也是在上層被呼叫的,這樣上層就可以來處理setEventHandler設定的事件了
    std::shared_ptr<NodeEntry> addNode(Node const& _node, NodeRelation _relation = NodeRelation::Unknown);  //新增節點,這部分內容較多,會在後面流程介紹細說
    std::list<NodeID> nodes() const;           // 返回node table中活躍的node id的列表
    unsigned count() const { return m_nodes.size(); }  // 返回節點數量
    std::list<NodeEntry> snapshot() const;             //返回節點快照,這裡可以發現關注的都是NodeEntry,這是因為node table需要關心distance
    bool haveNode(NodeID const& _id) { Guard l(x_nodes); return m_nodes.count(_id) > 0; }  // 判斷節點是否已經存在
    Node node(NodeID const& _id);              // 返回該node id對應的node,如果不存在返回空節點
    // 下面就是Kademlia演算法需要配置的一些常量
    static unsigned const s_addressByteSize = h256::size;                   // < Size of address type in bytes. 32位
    static unsigned const s_bits = 8 * s_addressByteSize;                   // < Denoted by n in [Kademlia].256個bit
    static unsigned const s_bins = s_bits - 1;                              // < Size of m_state (excludes root, which is us). 255個槽位
    static unsigned const s_maxSteps = boost::static_log2<s_bits>::value;   // < Max iterations of discovery. (discover), discovery的最大迭代次數,n取log
    // 可選的引數
    static unsigned const s_bucketSize = 16;            // < Denoted by k in [Kademlia]. Number of nodes stored in each bucket. 每一個bucket儲存的node數
    static unsigned const s_alpha = 3;                  // < Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. findNode請求的併發數
    // 一些定時器間隔
    std::chrono::milliseconds const c_evictionCheckInterval = std::chrono::milliseconds(75);      // 淘汰超時檢測的間隔
    std::chrono::milliseconds const c_reqTimeout = std::chrono::milliseconds(300);                // 每個請求的等待時間
    std::chrono::milliseconds const c_bucketRefresh = std::chrono::milliseconds(7200);            // 更新bucket的時間,避免node資料變得老舊
    struct NodeBucket   //槽位,每個不同的distance都會包含若干個節點,最多不超過上面的s_bucketSize,也就是16個
    {
        unsigned distance;
        std::list<std::weak_ptr<NodeEntry>> nodes;
    };
    void ping(NodeIPEndpoint _to) const;     // ping, 連線某個端點
    void ping(NodeEntry* _n) const;          // 用來ping已知節點,這是node table在更新buckets或者淘汰過程中呼叫
    NodeEntry center() const { return NodeEntry(m_node.id, m_node.publicKey(), m_node.endpoint); }
    std::shared_ptr<NodeEntry> nodeEntry(NodeID _id);
    void doDiscover(NodeID _target, unsigned _round = 0, std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>> _tried =  std::shared_ptr<std::set<std::shared_ptr<NodeEntry>>>());    // 用於發現給定目標距離近的節點
    std::vector<std::shared_ptr<NodeEntry>> nearestNodeEntries(NodeID _target);          //返回距離target最近的節點列表
    void evict(std::shared_ptr<NodeEntry> _leastSeen, std::shared_ptr<NodeEntry> _new);  // 非同步丟棄不響應的_leastSeen節點,並新增_new節點,否則丟棄_new
    void noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint);        //為了維持節點table,無論何時從一個節點獲取到activity,都會呼叫這個noteActiveNode
    void dropNode(std::shared_ptr<NodeEntry> _n);     //當超時出現後,呼叫
    NodeBucket& bucket_UNSAFE(NodeEntry const* _n);   //這是返回bucket的引用,後面可以看到,這是唯一新增node到bucket的入口
    void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet); //當m_socket收到資料包,呼叫該函式,這是繼承的UDPSocketEvents裡函式
    void onDisconnected(UDPSocketFace*) {}            //當socket埠後呼叫,也是繼承的UDPSocketEvents裡函式
    void doCheckEvictions();                          // 被evict呼叫確認淘汰檢查被排程,並且在沒有淘汰剩餘時停止,非同步操作
    void doDiscovery();                               // 在c_bucketRefresh間隔內查詢隨機node
    std::unique_ptr<NodeTableEventHandler> m_nodeEventHandler;      // < Event handler for node events. node事件的事件控制代碼
    Node m_node;                                                    // < This node. LOCK x_state if endpoint access or mutation is required. Do not modify id. 當前自己這個節點
    Secret m_secret;                                                // < This nodes secret key. 當前節點的私鑰
    mutable Mutex x_nodes;                                          // < LOCK x_state first if both locks are required. Mutable for thread-safe copy in nodes() const.
    std::unordered_map<NodeID, std::shared_ptr<NodeEntry>> m_nodes; // 已知的節點endpoints,m_nodes記錄的是建立過連線的node資訊
    mutable Mutex x_state;                                          // < LOCK x_state first if both x_nodes and x_state locks are required.
    std::array<NodeBucket, s_bins> m_state;                         // p2p節點網路的狀態, m_state是記錄了不同bucket的節點,不代表就能連上,在noteActiveNode這個函式中新增
    Mutex x_evictions;                                              // < LOCK x_evictions first if both x_nodes and x_evictions locks are required.
    std::unordered_map<NodeID, EvictionTimeout> m_evictions;        // < Eviction timeouts. 
    Mutex x_pubkDiscoverPings;                                      // < LOCK x_nodes first if both x_nodes and x_pubkDiscoverPings locks are required.
    std::unordered_map<bi::address, TimePoint> m_pubkDiscoverPings; // 由於可能不知道pubk,有些節點會有一個獲取pubk的流程,當處於這個流程時,節點資訊儲存在這裡
    Mutex x_findNodeTimeout;
    std::list<NodeIdTimePoint> m_findNodeTimeout;                   // FindNode請求超時
    std::shared_ptr<NodeSocket> m_socket;                           // < Shared pointer for our UDPSocket; ASIO requires shared_ptr.
    NodeSocket* m_socketPointer;                                    // < Set to m_socket.get(). Socket is created in constructor and disconnected in destructor to ensure access to pointer is safe.
    Logger m_logger{createLogger(VerbosityDebug, "discov")};
    DeadlineOps m_timers; ///< this should be the last member - it must be destroyed first
};

從上面的成員變數和成員函式的數量可以看出,整個NodeTable還是比較複雜的,這裡只需要關心不同成員變數對應的含義是什麼,不同成員函式的功能是什麼,大概瞭解清楚即可,下面我們將根據流程介紹詳細的程式碼。

首先看下NodeTable的建構函式,成員變數的賦值可暫時不考慮,看下函式內部細節,重點可以關注兩處,一個是m_socketPointer指標呼叫了connect函式,m_socketPointer是對UDPSocket一個包裝後的指標,connect函式裡面實際上是一個等待連線的操作,具體函式內容可以在UDP.h/cpp檔案檢視,這樣NodeTable就可以接受別的節點的連線請求了;其次是doDiscovery函式,這個函式裡面就是進行節點發現的操作。

NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled):
    m_node(Node(_alias.pub(), _endpoint)),
    m_secret(_alias.secret()),
    m_socket(make_shared<NodeSocket>(_io, *reinterpret_cast<UDPSocketEvents*>(this), (bi::udp::endpoint)m_node.endpoint)),
    m_socketPointer(m_socket.get()),
    m_timers(_io)
{
    for (unsigned i = 0; i < s_bins; i++)
        m_state[i].distance = i;  //這裡說明的是每個節點槽的個數,每個nodeid有32位,因此256個bit,對應256個槽位
    if (!_enabled)
        return;
    try
    {
        m_socketPointer->connect(); //開啟連線,這時候就可以接受外界發來的訊息了,m_socketPointer指定了回撥控制代碼就是NodeTable
        doDiscovery();  //節點發現
    }
    catch (std::exception const& _e)
    {
        cwarn << "Exception connecting NodeTable socket: " << _e.what();
        cwarn << "Discovery disabled.";
    }
}

下面就讓我們緊跟doDiscovery函式看看裡面幹了些啥。。

void NodeTable::doDiscovery()
{
    //定時器,為了避免bucket過於老舊,需要定時重新整理bucket,間隔時間為7200ms,這個定時器只會跑一次
    m_timers.schedule(c_bucketRefresh.count(), [this](boost::system::error_code const& _ec)
    {
        if (_ec)
            // we can't use m_logger here, because captured this might be already destroyed
            clog(VerbosityDebug, "discov")
                << "Discovery timer was probably cancelled: " << _ec.value() << " "
                << _ec.message();
        if (_ec.value() == boost::asio::error::operation_aborted || m_timers.isStopped())
            return;
        LOG(m_logger) << "performing random discovery";
        NodeID randNodeId;
        crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(0, h256::size));
        crypto::Nonce::get().ref().copyTo(randNodeId.ref().cropped(h256::size, h256::size));
        doDiscover(randNodeId);
    });
}

從函式體中可以看到,這個函式啟動了一個定時任務,也就是定時完成節點bucket的更新操作,實際上,第一次啟動時,每個槽位裡面時沒有任何節點的,不過不影響更新過程,從定時器回撥函式可以看出重新整理的流程,首先時隨機從NodeId的值域空間選擇一個隨機的NodeID(randNodeId),然後根據這個id呼叫了doDiscover函式,繼續看doDiscover(randNodeId)幹了啥。。

void NodeTable::doDiscover(NodeID _node, unsigned _round, shared_ptr<set<shared_ptr<NodeEntry>>> _tried)
{
    // NOTE: ONLY called by doDiscovery!
    
    if (!m_socketPointer->isOpen())  //如果監聽的socket已經掛了,沒有繼續的意義了
        return;    
    if (_round == s_maxSteps)  //已經跑到了最大的輪數,停止
    {
        LOG(m_logger) << "Terminating discover after " << _round << " rounds.";
        doDiscovery();  //這裡面其實是又註冊了重新整理節點的定時器任務
        return;
    }
    else if (!_round && !_tried)  //這是表示第一次這個函式被呼叫
        // initialized _tried on first round
        _tried = make_shared<set<shared_ptr<NodeEntry>>>();  
    
    auto nearest = nearestNodeEntries(_node);  //獲取距離_node距離近的節點資訊
    list<shared_ptr<NodeEntry>> tried; //每次記錄嘗試獲取的節點list
    for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++)  //每次請求數目為s_alpha個
        if (!_tried->count(nearest[i]))  //如果這個節點已經探測過,跳過
        {
            auto r = nearest[i];
            tried.push_back(r); //新增到tried,注意和_tried不一樣
            FindNode p(r->endpoint, _node);
            p.sign(m_secret);
            DEV_GUARDED(x_findNodeTimeout)
                m_findNodeTimeout.push_back(make_pair(r->id, chrono::steady_clock::now()));  //記錄findNode可能超時的list
            m_socketPointer->send(p); //傳送findnode資料包
        }
    
    if (tried.empty()) //如果沒有可以連線的最近節點,重新生成隨機節點探測,退出當前discover流程
    {
        LOG(m_logger) << "Terminating discover after " << _round << " rounds.";
        doDiscovery();
        return;
    }
        
    while (!tried.empty()) //這個裡面很奇怪,為啥不在新增到tried的時候直接新增給_tried ?
    {
        _tried->insert(tried.front());  //新增到_tried裡面
        tried.pop_front();
    }

    //定時檢查請求是否超時,間隔時間600ms
    m_timers.schedule(c_reqTimeout.count() * 2, [this, _node, _round, _tried](boost::system::error_code const& _ec)
    {
        if (_ec)
            // we can't use m_logger here, because captured this might be already destroyed
            clog(VerbosityDebug, "discov")
                << "Discovery timer was probably cancelled: " << _ec.value() << " "
                << _ec.message();

        if (_ec.value() == boost::asio::error::operation_aborted || m_timers.isStopped())
            return;

        // error::operation_aborted means that the timer was probably aborted. 
        // It usually happens when "this" object is deallocated, in which case 
        // subsequent call to doDiscover() would cause a crash. We can not rely on 
        // m_timers.isStopped(), because "this" pointer was captured by the lambda,
        // and therefore, in case of deallocation m_timers object no longer exists.

        doDiscover(_node, _round + 1, _tried);  //進行下一次迴圈discover
    });
}

這個函式開始有點複雜了,我們看到在doDiscover實際上包含了3個引數,而在doDiscovery函式中呼叫時只使用了一個隨機nodeid, 看標頭檔案我們看到了其他兩個引數有預設引數,通過函式過程說明我們知道,這個函式是一個遞迴呼叫,其實就是幹了這麼一件事,把這個隨機節點的所有鄰近節點全部獲取到,其中_round是表示遞迴的深度,最多呼叫這麼多次,_tried是表示我們嘗試的連線的節點集合,主要是為了去重用的,從中大概瞭解節點發現的大致流程:

  • 定時器開啟,定時執行重新整理過程,選擇一個隨機節點,然後呼叫doDiscover函式
  • 獲取這個節點的鄰近節點集合,然後根據配置,每次最多同時連線s_alpha個節點
  • 再次設定定時器,進行下一輪的discover過程
  • 停止discover的條件,一是這個隨機節點已經沒有其他鄰近的節點了,二是超過了設定的最大discover次數,無論是哪種情況都會停止本輪discover過程,再次呼叫doDiscovery生成一個隨機id進行下一次的節點發現過程

事情進行到這裡,似乎節點發現就結束了?實際上不是,從上述函式中還有兩個地方值得注意,一個是獲取到鄰近節點時,傳送的不是ping包,而是findNode包,這樣這個節點返回的時候返回的是該節點所擁有的節點資訊,這部分資訊是會和本地儲存的節點資訊進行一些整合的,這也是為什麼每次呼叫nearestNodeEntries返回結果不同的原因,另一個值得注意的是可以看到每兩輪doDiscover中間的時間間隔是reqTimeout * 2,這也是為了有足夠的時間讓findNode資料返回,也是為了呼叫nearestNodeEntries得到儘可能多的資訊。這時候,大家可能好奇nearestNodeEntries這個函式裡面到底是如何去獲取到最鄰近的節點資訊的,繼續看。。。

vector<shared_ptr<NodeEntry>> NodeTable::nearestNodeEntries(NodeID _target)
{
    // send s_alpha FindNode packets to nodes we know, closest to target
    // 傳送s_alpha個FindNode包給我們知道最接近target的節點
    static unsigned lastBin = s_bins - 1;   //最後一個bucket編號
    unsigned head = distance(m_node.id, _target);  //當前節點與target的邏輯異或距離
    unsigned tail = head == 0 ? lastBin : (head - 1) % s_bins; //環形,前一個
    
    map<unsigned, list<shared_ptr<NodeEntry>>> found;  //目前已發現的
    
    // if d is 0, then we roll look forward, if last, we reverse, else, spread from d
    if (head > 1 && tail != lastBin)
        while (head != tail && head < s_bins)
        {
            Guard l(x_state);
            for (auto const& n: m_state[head].nodes)
                if (auto p = n.lock())
                    found[distance(_target, p->id)].push_back(p);

            if (tail)
                for (auto const& n: m_state[tail].nodes)
                    if (auto p = n.lock())
                        found[distance(_target, p->id)].push_back(p);

            head++;
            if (tail)  //tail到0就不繼續了,後面由head處理,直到head到達隊尾,如果head先到,結束
                tail--;
        }
    else if (head < 2) //head == 0 or head == 1,head==0表明_target表明就是m_node自己,head == 1表明tail就是m_node.id自己,不需要新增
        while (head < s_bins)
        {
            Guard l(x_state);
            for (auto const& n: m_state[head].nodes)
                if (auto p = n.lock())
                    found[distance(_target, p->id)].push_back(p);
            head++;
        }
    else // head >=2 && tail == lastBin
        while (tail > 0)
        {
            Guard l(x_state);
            for (auto const& n: m_state[tail].nodes)
                if (auto p = n.lock())
                    found[distance(_target, p->id)].push_back(p);
            tail--;
        }
    
    vector<shared_ptr<NodeEntry>> ret;
    //每次最多返回s_bucketSize個node
    for (auto& nodes: found)
        for (auto const& n: nodes.second)
            if (ret.size() < s_bucketSize && !!n->endpoint && n->endpoint.isAllowed())
                ret.push_back(n);
    return ret;
}

這個函式的最主要語義是,先計算出隨機節點與本節點的距離,這樣才知道隨機節點在本節點儲存的節點的槽位,然後從該槽位兩側同時遍歷,這樣才能找到距離_target最近的節點,在本地的節點table全部遍歷完以後,選出其中s_bucketSize返回。

前面介紹了節點發現的主要流程,後面我們再回頭看下,當我們給最近的節點發送findNode資料包後,如果那些節點回復了,會是怎麼個流程,在NodeTable的建構函式中給UDPSocket註冊了event事件,也就是NodeTable本身,因此udp socket接受訊息後都會呼叫NodeTable的OnReceived函式,如下。。

//socket的訊息接受
void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet)
{
    try {
        //解析udp包
        unique_ptr<DiscoveryDatagram> packet = DiscoveryDatagram::interpretUDP(_from, _packet);
        if (!packet)
            return;
        if (packet->isExpired())  //判斷該包是否過期,所有資料包會攜帶一個ts,必須大於當前時間
        {
            LOG(m_logger) << "Invalid packet (timestamp in the past) from "
                          << _from.address().to_string() << ":" << _from.port();
            return;
        }
        
        //根據不同包型別,進行相應處理
        switch (packet->packetType())
        {
            case Pong::type:  //Ping訊息的迴應
            {
                auto in = dynamic_cast<Pong const&>(*packet);
                // whenever a pong is received, check if it's in m_evictions
                // 檢查該節點是否在m_evictions裡面
                bool found = false;
                NodeID leastSeenID;
                EvictionTimeout evictionEntry;   //淘汰超時的節點,這部分後面需要回來看???
                DEV_GUARDED(x_evictions)
                { 
                    auto e = m_evictions.find(in.sourceid);
                    if (e != m_evictions.end())
                    { 
                        if (e->second.evictedTimePoint > std::chrono::steady_clock::now())
                        {
                            found = true;
                            leastSeenID = e->first;
                            evictionEntry = e->second;
                            m_evictions.erase(e);  //從m_evictions中移除
                        }
                    }
                }

                if (found)
                {
                    if (auto n = nodeEntry(evictionEntry.newNodeID))
                        dropNode(n); //如果新節點也儲存在m_nodes中,刪除
                    if (auto n = nodeEntry(leastSeenID))
                        n->pending = false;
                }
                else
                {
                    // if not, check if it's known/pending or a pubk discovery ping
                    // 如果不再m_evictions裡面,觀察是否是know/pending,或者是一個pubk 發現ping
                    if (auto n = nodeEntry(in.sourceid))
                        n->pending = false;
                    else
                    {
                        DEV_GUARDED(x_pubkDiscoverPings)
                        {
                            if (!m_pubkDiscoverPings.count(_from.address()))  //如果不明pong,直接返回
                                return; // unsolicited pong; don't note node as active
                            m_pubkDiscoverPings.erase(_from.address());
                        }
                        if (!haveNode(in.sourceid))   //如果不在m_nodes中,新增node
                            addNode(Node(in.sourceid, NodeIPEndpoint(_from.address(), _from.port(), _from.port())));
                    }
                }
                
                // update our endpoint address and UDP port
                // 更新我們自己的地址和udp埠,難道每一個pong都要更新下?很浪費
                DEV_GUARDED(x_nodes)
                {
                    if ((!m_node.endpoint || !m_node.endpoint.isAllowed()) && isPublicAddress(in.destination.address))
                        m_node.endpoint.address = in.destination.address;
                    m_node.endpoint.udpPort = in.destination.udpPort;
                }

                LOG(m_logger) << "PONG from " << in.sourceid << " " << _from;
                break;
            }
                
            case Neighbours::type:  //獲取鄰居節點
            {
                auto in = dynamic_cast<Neighbours const&>(*packet);
                bool expected = false;  //標記是否向某個節點發送過findnode包
                auto now = chrono::steady_clock::now();
                DEV_GUARDED(x_findNodeTimeout)
                    //呼叫了std::list<>的remove_if,刪除list中所有滿足條件的元素
                    m_findNodeTimeout.remove_if([&](NodeIdTimePoint const& t)
                    {
                        if (t.first == in.sourceid && now - t.second < c_reqTimeout)
                            expected = true;
                        else if (t.first == in.sourceid)
                            return true; //只要是當前包傳送的節點,就移除
                        return false;
                    });
                if (!expected)    //如果未發現向某個節點發送findnode,要麼是誤收要麼該node已經被刪除
                {
                    cnetdetails << "Dropping unsolicited neighbours packet from "
                                << _from.address();
                    break;
                }

                for (auto n: in.neighbours)
                    addNode(Node(n.node, n.endpoint));  //挨個新增到自己的m_nodes中
                break;
            }

            case FindNode::type: //接收到findNode資料包,返回請求的target的相近節點
            {
                auto in = dynamic_cast<FindNode const&>(*packet);
                vector<shared_ptr<NodeEntry>> nearest = nearestNodeEntries(in.target);  //找到當前節點儲存的與target最相近的節點列表
                static unsigned const nlimit = (m_socketPointer->maxDatagramSize - 109) / 90;  //傳送數量限制,傳送的節點數量
                for (unsigned offset = 0; offset < nearest.size(); offset += nlimit)
                {
                    Neighbours out(_from, nearest, offset, nlimit); 
                    out.sign(m_secret);
                    if (out.data.size() > 1280)
                        cnetlog << "Sending truncated datagram, size: " << out.data.size();
                    m_socketPointer->send(out);
                }
                break;
            }

            case PingNode::type: //接收到ping資料,這是有節點過來建立連線
            {
                auto in = dynamic_cast<PingNode const&>(*packet);
                in.source.address = _from.address();
                in.source.udpPort = _from.port();
                addNode(Node(in.sourceid, in.source));  //新增節點
                
                Pong p(in.source);
                p.echo = in.echo;
                p.sign(m_secret);
                m_socketPointer->send(p); //返回pong資料包
                break;
            }
        }

        noteActiveNode(packet->sourceid, _from);  //標記活躍的節點
    }
    catch (std::exception const& _e)
    {
        LOG(m_logger) << "Exception processing message from " << _from.address().to_string() << ":"
                      << _from.port() << ": " << _e.what();
    }
    catch (...)
    {
        LOG(m_logger) << "Exception processing message from " << _from.address().to_string() << ":"
                      << _from.port();
    }
}

從這個函式可以看到,節點發現裡面一共由四種資料包型別,分別是Ping, Pong, FindNode, Neighbours四種,這裡我們先只看下discover中發起的FindNode請求,從函式中,當其他節點收到FindNode資料包後,首先去找出待查詢的_target節點id,然後根據這個_target在自己的節點table中找出這個自己儲存的與_target相近的節點資料,並封裝成 Neighbours 資料包返回給當前節點,具體的解析Neighbours的解析我們後面再來分析;這個函式還幹了一件事情,因為當節點收到了其他節點發來的訊息時,無論是其他節點主動發來訊息還是被動回覆訊息,都說明這個節點當前是活躍狀態,因此需要呼叫noteActiveNode函式來標識一下,繼續來看下這個函式。。。

void NodeTable::noteActiveNode(Public const& _pubk, bi::udp::endpoint const& _endpoint)
{
    if (_pubk == m_node.address() || !NodeIPEndpoint(_endpoint.address(), _endpoint.port(), _endpoint.port()).isAllowed())
        return; // 這裡判斷了下id是否正確以及地址是否可用

    shared_ptr<NodeEntry> newNode = nodeEntry(_pubk);  //查詢該id對應的nodeentry,這是在所有已經有連線關係的nodes中查詢的
    if (newNode && !newNode->pending) //如果該node正常執行,這裡面有個疑問?為何只處理在nodes中找到的,如果沒有找到,不更說明應該新增到nodes裡面去麼?
    {
        LOG(m_logger) << "Noting active node: " << _pubk << " " << _endpoint.address().to_string()
                      << ":" << _endpoint.port();
        newNode->endpoint.address = _endpoint.address();
        newNode->endpoint.udpPort = _endpoint.port();   //更新下,可能node的地址資訊會變化

        shared_ptr<NodeEntry> nodeToEvict; //有活躍的,必然將會有被淘汰的
        {
            Guard l(x_state);
            // Find a bucket to put a node to
            NodeBucket& s = bucket_UNSAFE(newNode.get());  //獲取該節點所在的bucket
            auto& nodes = s.nodes; //該bucket的所有node list

            // check if the node is already in the bucket 
            // 檢視該節點是否早已存在於該bucket
            auto it = std::find(nodes.begin(), nodes.end(), newNode);
            if (it != nodes.end())
            {
                // if it was in the bucket, move it to the last position
                // 如果已經存在,移動到最後的位置,表示最新
                nodes.splice(nodes.end(), nodes, it);
            }
            else
            {
                if (nodes.size() < s_bucketSize) //如果這個bucket還沒有超過最大限制
                {
                    // if it was not there, just add it as a most recently seen node
                    // (i.e. to the end of the list)
                    // 新增到該bucket中
                    nodes.push_back(newNode);
                    if (m_nodeEventHandler)
                        m_nodeEventHandler->appendEvent(newNode->id, NodeEntryAdded); //並向上層註冊節點新增事件
                }
                else
                {
                    // if bucket is full, start eviction process for the least recently seen node
                    // 如果bucket已滿,那麼為最少看見的node啟動淘汰程序,最前面的表示最少被看見
                    nodeToEvict = nodes.front().lock(); //獲取到需要被淘汰的node的shared_ptr
                    // It could have been replaced in addNode(), then weak_ptr is expired.
                    // If so, just add a new one instead of expired
                    if (!nodeToEvict) //如果在addNode的時候已經被刪除了,那麼直接新增
                    {
                        nodes.pop_front();
                        nodes.push_back(newNode);
                        if (m_nodeEventHandler)
                            m_nodeEventHandler->appendEvent(newNode->id, NodeEntryAdded);  //註冊節點新增事件
                    }
                }
            }
        }

        if (nodeToEvict) //如果有需要被刪除的節點
            evict(nodeToEvict, newNode); //如果有應該被刪除的節點,呼叫evict
    }
}

從上面函式中,我們可以看到這個函式就是看看需不需要以及能不能把這個活躍節點放到m_state中,也就是對應的bucket裡面去,另外如果能夠且是把一些老節點給踢出來了話,被拋棄的老節點就會走淘汰流程,這個流程將會在evict函式中進行。針對我在程式碼中的疑問,我自己理清楚了下,應該是這樣的,一個節點啟動後應該只是找幾個知名節點開始節點發現,這些知名節點是已經建立連線且儲存在bucket中了,當他們返回臨近節點後,這些臨近節點將會走addNode流程,在那裡面節點會發ping包過去,收到了回覆的pong包後會再走一邊noteActiveNode流程,因此noteActiveNode裡面只需要關心已經建立過連線的即可。下面我們繼續看下evict流程,pong包後面將會詳細介紹,繼續。。。

void NodeTable::evict(shared_ptr<NodeEntry> _leastSeen, shared_ptr<NodeEntry> _new)
{
    if (!m_socketPointer->isOpen())  //當前連線關閉
        return;
    
    unsigned evicts = 0;
    DEV_GUARDED(x_evictions)
    {
        EvictionTimeout evictTimeout{_new->id, chrono::steady_clock::now()};  
        m_evictions.emplace(_leastSeen->id, evictTimeout); //插入一個淘汰節點
        evicts = m_evictions.size();
    }

    if (evicts == 1)  //之所以等於1才觸發check流程,是因為在doCheckEvictions會一直處理至m_evictions為空
        doCheckEvictions(); //執行真正淘汰檢查
    ping(_leastSeen.get()); //最後的掙扎
}

其實淘汰一個節點還是挺難的,畢竟當時是辛辛苦苦連上的,evict函式中會對長時間沒有互動的節點做最後一次嘗試,如果你在設定的時間內沒有搭理我,那就對不起了,我真的要刪除你了,doCheckEvictions函式就設定了一個定時器,定時來看看這些待淘汰的節點什麼樣了,一起來看下。。。

void NodeTable::doCheckEvictions()
{
    m_timers.schedule(c_evictionCheckInterval.count(), [this](boost::system::error_code const& _ec)
    {  //c_evictionCheckInterval, 75ms
        if (_ec) // we can't use m_logger here, because captured this might be already destroyed 
            clog(VerbosityDebug, "discov") << "Check Evictions timer was probably cancelled: " << _ec.value() << " " << _ec.message(); 
        if (_ec.value() == boost::asio::error::operation_aborted || m_timers.isStopped()) 
            return; 
        bool evictionsRemain = false; //標記是否m_evictions是否還有未處理的超時標記 
        list<shared_ptr<NodeEntry>> drop; 
        { 
            Guard le(x_evictions); 
            Guard ln(x_nodes); 
            for (auto& e: m_evictions) //如果最後一次ping,依然超時了,那麼就會移除該node 
                if (chrono::steady_clock::now() - e.second.evictedTimePoint > c_reqTimeout) //請求超時300ms 
                    if (m_nodes.count(e.second.newNodeID)) drop.push_back(m_nodes[e.second.newNodeID]); //注意這是從m_nodes裡面移除,m_state會在後面來移除 
            evictionsRemain = (m_evictions.size() - drop.size() > 0); //是否還有超時時間未超過超時時間的
        } 
        drop.unique(); 
        for (auto n: drop) dropNode(n); //被移除的節點需要通知上層哦,畢竟節點不可用了,也就不用用來傳輸p2p資料了 
        if (evictionsRemain) doCheckEvictions(); //如果仍然還有未處理的等待超時檢測的節點,那就繼續設定定時任務來檢查,直到m_evictions為空 
}

簡單看下dropNode。。。

void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
{
    // remove from nodetable 首先從node table中移除,注意看noteActiveNode裡面,對NodetoEvict的節點只有在節點不存在以後才pop了,否則還是保留著呢,避免其實節點好好的,先就被刪掉了
    {
        Guard l(x_state);
        NodeBucket& s = bucket_UNSAFE(_n.get());
        s.nodes.remove_if(
            [_n](weak_ptr<NodeEntry> const& _bucketEntry) { return _bucketEntry == _n; });
    }
    
    // notify host,通知host,告知上層節點被刪掉了
    LOG(m_logger) << "p2p.nodes.drop " << _n->id;
    if (m_nodeEventHandler)
        m_nodeEventHandler->appendEvent(_n->id, NodeEntryDropped);
}

回到evict函式,最後一行進行了最後一次上次,給待刪除節點發了一個ping包,如果該節點沒有響應,那麼這個節點就會被doCheckEvictions裡面給幹掉,但是如果回覆了一個pong包,會怎麼樣呢?這部分邏輯也是在上面onReceived中處理的,前面只介紹了這個函式裡面處理的FindNode請求,下面就看下收到其他三個請求後都幹了點啥,先看下Ping請求。。。

            case PingNode::type: //接收到ping資料,這是有節點過來建立連線
            {
                auto in = dynamic_cast<PingNode const&>(*packet);
                in.source.address = _from.address();
                in.source.udpPort = _from.port();
                addNode(Node(in.sourceid, in.source));  //新增節點
                
                Pong p(in.source);
                p.echo = in.echo;
                p.sign(m_secret);
                m_socketPointer->send(p); //返回pong資料包
                break;
            }

這裡面看到有節點發Ping請求過來,我們先把這個節點新增到自己的節點中,然後再發一個pong包回去,這裡暫時不繼續介紹addNode函式,因為這個後面別的資料包型別也會呼叫,這裡有這個記憶即可。Ping請求比較簡單,再來看看Pong請求,這個比較複雜。。。

           case Pong::type:  //Ping訊息的迴應
            {
                auto in = dynamic_cast<Pong const&>(*packet);
                // whenever a pong is received, check if it's in m_evictions
                // 檢查該節點是否在m_evictions裡面
                bool found = false;
                NodeID leastSeenID;
                EvictionTimeout evictionEntry; 
                DEV_GUARDED(x_evictions)
                { 
                    auto e = m_evictions.find(in.sourceid);
                    if (e != m_evictions.end())
                    { 
                        if (e->second.evictedTimePoint > std::chrono::steady_clock::now())
                        {
                            found = true;
                            leastSeenID = e->first;  //本來將被刪除的節點id
                            evictionEntry = e->second;
                            m_evictions.erase(e);  //這裡面就是收到了evict函式最後的ping了,逃出生天,從m_evictions中移除
                        }
                    }
                }

                if (found)
                {
                    if (auto n = nodeEntry(evictionEntry.newNodeID))
                        dropNode(n); //如果新節點也儲存在m_nodes中,刪除,這個我有點想不通,就不能共存?
                    if (auto n = nodeEntry(leastSeenID))
                        n->pending = false;  //這有點老子依然牛逼的意思,哈哈哈
                }
                else
                {
                    // if not, check if it's known/pending or a pubk discovery ping
                    // 如果不在m_evictions裡面,觀察是否是know/pending,或者是一個pubk 發現ping
                    if (auto n = nodeEntry(in.sourceid))
                        n->pending = false;  //原來這個節點是否是pending狀態
                    else
                    {
                        DEV_GUARDED(x_pubkDiscoverPings)
                        {
                            if (!m_pubkDiscoverPings.count(_from.address()))  //如果不明pong,直接返回
                                return; // unsolicited pong; don't note node as active
                            m_pubkDiscoverPings.erase(_from.address());
                        }
                        if (!haveNode(in.sourceid))   //如果不在m_nodes中,新增node
                            addNode(Node(in.sourceid, NodeIPEndpoint(_from.address(), _from.port(), _from.port())));
                    }
                }
                
                // update our endpoint address and UDP port
                // 更新我們自己的地址和udp埠,難道每一個pong都要更新下?很浪費
                DEV_GUARDED(x_nodes)
                {
                    if ((!m_node.endpoint || !m_node.endpoint.isAllowed()) && isPublicAddress(in.destination.address))
                        m_node.endpoint.address = in.destination.address;
                    m_node.endpoint.udpPort = in.destination.udpPort;
                }

                LOG(m_logger) << "PONG from " << in.sourceid << " " << _from;
                break;
            }

這裡面乾的事情就很多了,首先看下這個節點是不是在等待被淘汰的列表裡面,如果是的話把它給撈出來,如果不是的話要看下他是不是已經在m_nodes裡面了只不過是等待連線上的狀態,這一般是由addNode觸發的,addNode可以直接新增一個節點,然後把這個節點放到列表裡,再給他發一個ping訊息,等待回覆修改penging狀態,或者是有時候可能只知道一個ip地址不知道nodeid,這時候pong訊息會把這個nodeid傳過來,當前節點再把這個節點連帶地址新增到m_nodes裡面去。這裡也可以總結下發送ping的三種情況:

  • 最開始加入時,先新增節點,再發送ping訊息,然後等待回覆修改狀態
  • 淘汰節點的檢測,向待淘汰節點節點發送ping訊息,有回覆再從待淘汰的列表中移走
  • 不知道對方的nodeid,先發送ping,根據pong中訊息的nodeid進行組合

再來看下Neighbours訊息

            case Neighbours::type:  //獲取鄰居節點
            {
                auto in = dynamic_cast<Neighbours const&>(*packet);
                bool expected = false;  //標記是否向某個節點發送過findnode包
                auto now = chrono::steady_clock::now();
                DEV_GUARDED(x_findNodeTimeout)
                    //呼叫了std::list<>的remove_if,刪除list中所有滿足條件的元素
                    m_findNodeTimeout.remove_if([&](NodeIdTimePoint const& t)
                    {
                        if (t.first == in.sourceid && now - t.second < c_reqTimeout)
                            expected = true;
                        else if (t.first == in.sourceid)
                            return true; //只要是當前包傳送的節點,就移除
                        return false;
                    });
                if (!expected)    //如果未發現向某個節點發送findnode,要麼是誤收要麼該node已經被刪除
                {
                    cnetdetails << "Dropping unsolicited neighbours packet from "
                                << _from.address();
                    break;
                }

                for (auto n: in.neighbours)
                    addNode(Node(n.node, n.endpoint));  //挨個新增到自己的m_nodes中
                break;
            }

這部分先check了一遍,就是看這個Neighbours包的傳送方,到底是不是我們之前傳送過FindNode包的節點,我們傳送過FindNode包的節點都會儲存在m_findNodeTimeout裡面了,這個新增過程可以在doDiscover函式中看到;如果不是或者請求超時了,就認為這個節點發過來的資料無效,如果是,那就讀取返回的neighbours資料,根據返回的節點挨個呼叫addNode,那麼接下來,就讓我們來重點看看addNode裡面具體完成了哪些事件吧。。。

shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node, NodeRelation _relation)
{
    if (_relation == Known) //如果這個節點已經連線上了,直接註冊即可
    {
        auto ret = make_shared<NodeEntry>(m_node.id, _node.id, _node.endpoint); //生成對應NodeEntry
        ret->pending = false;
        DEV_GUARDED(x_nodes)
            m_nodes[_node.id] = ret; //新增到m_nodes
        noteActiveNode(_node.id, _node.endpoint);  //新增到m_state
        return ret;
    }
    
    if (!_node.endpoint)  //地址不存在,沒有辦法建立連線的,直接返回空說明新增node失敗
        return shared_ptr<NodeEntry>();
    
    // ping address to recover nodeid if nodeid is empty
    // 如果nodeid為空,發ping訊息過去,獲取對應nodeid
    if (!_node.id)
    {
        DEV_GUARDED(x_nodes)
        LOG(m_logger) << "Sending public key discovery Ping to "
                      << (bi::udp::endpoint)_node.endpoint
                      << " (Advertising: " << (bi::udp::endpoint)m_node.endpoint << ")";
        DEV_GUARDED(x_pubkDiscoverPings)
            m_pubkDiscoverPings[_node.endpoint.address] = std::chrono::steady_clock::now(); 
        ping(_node.endpoint);
        return shared_ptr<NodeEntry>();
    }
    
    DEV_GUARDED(x_nodes)
        if (m_nodes.count(_node.id))  //已經存在,直接返回
            return m_nodes[_node.id];
    
    auto ret = make_shared<NodeEntry>(m_node.id, _node.id, _node.endpoint);
    DEV_GUARDED(x_nodes)
        m_nodes[_node.id] = ret; //不存在則新增,併發送ping訊息過去
        LOG(m_logger) << "addNode pending for " << _node.endpoint;
        ping(_node.endpoint);
        return ret;
}

好了,到這裡NodeTable節點發現相關的細節已經全部介紹完了,至於底層的通訊細節可以留到後面來介紹。可能這是c++的版本的緣故,雖然整個實現比較完整,但是也有不少邏輯貌似還沒有完全的覆蓋,比如:

  1. addNode的時候預設關係都是unknown的,因此新增到m_nodes中去後pending狀態都是false,然後傳送ping訊息等待回覆,但是這些node都是從別的m_nodes中獲取到的,並不保證這些節點一直處於活躍狀態,對於一直沒有回覆pong訊息的,貌似只有在更新的節點過來後走淘汰流程才給剔除了,是否不太合理?
  2. 前面addNode中處於pending狀態的節點還可能被剔除,但是m_pubkDiscoverPings和m_findNodeTimeout中沒有回覆的節點可能再也沒有機會被刪除掉了,在公鏈這種節點動態變化非常頻繁的網路中,是否可能存在這兩者資料會越來越大的風險?

另外,在m_nodes和m_state這兩個中很有可能不明白這兩個都在幹嘛,m_nodes是儲存了當前節點所有想產生關聯的node,不一定代表就一定能連上,但是肯定都會嘗試連線,而m_state是本節點儲存了所有發生過連線的節點列表,記住,一定是至少曾經連線過,這一點可以從m_state的新增只發生在noteActiveNode這個函式裡看出來,但是這裡面的節點如果節點不能連線了,其實他是不知道的,另外還有一點,就是節點的新增事件也是隻在這個函式中體現,也就說,只有活躍可連線的節點才會傳遞給上層來進行p2p通訊。

謝謝大家查閱,如有任何建議或者疑問,歡迎留言評論!!