以太坊原始碼分析之 P2P網路(三、UDP底層通訊)

struct UDPSocketEvents
    virtual ~UDPSocketEvents() = default;
    virtual void onDisconnected(UDPSocketFace*) {}
    virtual void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packetData) = 0;


NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled):
    m_node(Node(_alias.pub(), _endpoint)),
    m_socket(make_shared<NodeSocket>(_io, *reinterpret_cast<UDPSocketEvents*>(this), (bi::udp::endpoint)m_node.endpoint)),
    for (unsigned i = 0; i < s_bins; i++)
        m_state[i].distance = i;
    if (!_enabled)
        m_socketPointer->connect(); //開啟連線,這時候就可以接受外界發來的訊息了,m_socketPointer指定了回撥控制代碼就是NodeTable
        doDiscovery();  //節點發現
    catch (std::exception const& _e)
        cwarn << "Exception connecting NodeTable socket: " << _e.what();
        cwarn << "Discovery disabled.";


template <typename Handler, unsigned MaxDatagramSize>
class UDPSocket: UDPSocketFace, public std::enable_shared_from_this<UDPSocket<Handler, MaxDatagramSize>>
    enum { maxDatagramSize = MaxDatagramSize };
    static_assert((unsigned)maxDatagramSize < 65507u, "UDP datagrams cannot be larger than 65507 bytes");
    /// Create socket for specific endpoint.
    UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, bi::udp::endpoint _endpoint): m_host(_host), m_endpoint(_endpoint), m_socket(_io) { m_started.store(false); m_closed.store(true); };
    /// Create socket which listens to all ports.
    UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_endpoint(bi::udp::v4(), _port), m_socket(_io) { m_started.store(false); m_closed.store(true); };
    virtual ~UDPSocket() { disconnect(); }
    /// Socket will begin listening for and delivering packets
    // 開始監聽並傳送資料包
    void connect();
    /// Send datagram. 傳送資料報
    bool send(UDPDatagram const& _datagram);
    /// Returns if socket is open.
    bool isOpen() { return !m_closed; }
    /// Disconnect socket.
    void disconnect() { disconnectWithError(boost::asio::error::connection_reset); }
    void doRead(); //進行讀操作
    void doWrite(); //進行寫操作
    void disconnectWithError(boost::system::error_code _ec); //斷開
    std::atomic<bool> m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError.
    std::atomic<bool> m_closed;	 ///< Connection availability.
    UDPSocketEvents& m_host;	///< Interface which owns this socket.也就是NodeTable
    bi::udp::endpoint m_endpoint; ///< Endpoint which we listen to. 沒有監聽一說,其實就是一直從這個endpoint上讀資料
    Mutex x_sendQ;
    std::deque<UDPDatagram> m_sendQ;		        ///< Queue for egress data. 傳送資料的佇列
    std::array<byte, maxDatagramSize> m_recvData;	///< Buffer for ingress data. 接受資料的佇列
    bi::udp::endpoint m_recvEndpoint;		        ///< Endpoint data was received from. 接受資料的來源
    bi::udp::socket m_socket;				///< Boost asio udp socket.
    Mutex x_socketError;				///< Mutex for error which can be set from host or IO thread.
    boost::system::error_code m_socketError;		///< Set when shut down due to error.

這裡面直接看函式其實沒啥意思,我們還是從流程上來觀摩這些程式碼,在上面的建構函式中,當socket建立完畢之後,緊接著就呼叫了connect函式,這個函式裡就是定義了可以從這個udp socket進行非同步讀事件,詳細可以看下這個函式的程式碼。。。

template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::connect()
    bool expect = false;
    if (!m_started.compare_exchange_strong(expect, true))
        m_socket.bind(m_endpoint); //繫結本地地址和埠
    catch (...)
        m_socket.bind(bi::udp::endpoint(bi::udp::v4(), m_endpoint.port()));
    Guard l(x_sendQ);
    m_closed = false;
    doRead();  //上面只是綁定了端點資訊,這裡面真正定義了非同步讀事件


template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::doRead()
    if (m_closed)
    auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
    m_socket.async_receive_from(boost::asio::buffer(m_recvData), m_recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
        if (m_closed)
            return disconnectWithError(_ec);      
        if (_ec != boost::system::errc::success)
            cnetlog << "Receiving UDP message failed. " << _ec.value() << " : " << _ec.message();
        if (_len)  //每次上傳給上層,由上層處理響應資料
            m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len));


template <typename Handler, unsigned MaxDatagramSize>
bool UDPSocket<Handler, MaxDatagramSize>::send(UDPDatagram const& _datagram)
    if (m_closed)
        return false;
    Guard l(x_sendQ);
    m_sendQ.push_back(_datagram);  //先放到佇列中去
    if (m_sendQ.size() == 1) //一旦有了,立馬進行doWrite
    return true;


template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::doWrite()
    if (m_closed)
    const UDPDatagram& datagram = m_sendQ[0]; //取出佇列中第一個資料,而且第一個資料肯定存在,由前面保證的
    auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
    bi::udp::endpoint endpoint(datagram.endpoint()); //資料報中包含了目標地址
    m_socket.async_send_to(boost::asio::buffer(datagram.data), endpoint, [this, self, endpoint](boost::system::error_code _ec, std::size_t)
        if (m_closed) //關閉了
            return disconnectWithError(_ec);        
        if (_ec != boost::system::errc::success)
            cnetlog << "Failed delivering UDP message. " << _ec.value() << " : " << _ec.message();
        Guard l(x_sendQ);
        m_sendQ.pop_front();  //傳送成功,將佇列頭資料剔除
        if (m_sendQ.empty())  //勤奮的小夥子完成了工作,退出
        doWrite(); //沒有完成,繼續寫


class UDPDatagram
    UDPDatagram(bi::udp::endpoint const& _ep): locus(_ep) {}
    UDPDatagram(bi::udp::endpoint const& _ep, bytes _data): data(_data), locus(_ep) {}
    bi::udp::endpoint const& endpoint() const { return locus; }
    bytes data;  //攜帶的訊息體
    bi::udp::endpoint locus;  //待發送的目的地址


struct RLPXDatagramFace: public UDPDatagram
    static uint32_t futureFromEpoch(std::chrono::seconds _sec) { return static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count()); }
    static uint32_t secondsSinceEpoch() { return static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now()).time_since_epoch()).count()); }
    static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);

    RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
    virtual ~RLPXDatagramFace() = default;

    virtual h256 sign(Secret const& _from);
    virtual uint8_t packetType() const = 0;

    virtual void streamRLP(RLPStream&) const = 0;
    virtual void interpretRLP(bytesConstRef _bytes) = 0;


struct DiscoveryDatagram: public RLPXDatagramFace
    /// Constructor used for sending. 這是用來發送資料報時呼叫的建構函式
    DiscoveryDatagram(bi::udp::endpoint const& _to): RLPXDatagramFace(_to), ts(futureFromEpoch(std::chrono::seconds(60))) {}
    /// Constructor used for parsing inbound packets. 這是接受到資料報時定義的建構函式
    DiscoveryDatagram(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): RLPXDatagramFace(_from), sourceid(_fromid), echo(_echo) {}
    // These two are set for inbound packets only. 下面定義的變數只有在接受的資料報中才會用到
    NodeID sourceid; // sender public key (from signature) 傳送方的公鑰
    h256 echo;       // hash of encoded packet, for reply tracking 編碼過的資料報的雜湊值,用來回復響應的跟蹤
    // All discovery packets carry a timestamp, which must be greater 所有用來節點發現的資料報都包含了一個時間戳,
    // than the current local time. This prevents replay attacks. 這個時間戳必須大於當前時間,防止重複攻擊
    uint32_t ts = 0;
    bool isExpired() const { return secondsSinceEpoch() > ts; } //是否過期,ts裡面定義的是傳送時刻+60s
    /// Decodes UDP packets. 解碼udp資料報
    static std::unique_ptr<DiscoveryDatagram> interpretUDP(bi::udp::endpoint const& _from, bytesConstRef _packet);


 * Ping packet: Sent to check if node is alive.
 * PingNode is cached and regenerated after ts + t, where t is timeout.
 * ping資料報,用於檢測node是否是活的,被快取,過期後重新生成
 * Ping is used to implement evict. When a new node is seen for / ping用來完成淘汰,一個給定bucket的新node到來時如果滿了的話
 * a given bucket which is full, the least-responsive node is pinged.  // 最少回覆的node會被ping
 * If the pinged node doesn't respond, then it is removed and the new
 * node is inserted. 如果被ping的node沒有被回覆,那麼將會被移除,新節點會被插入
struct PingNode: DiscoveryDatagram
    using DiscoveryDatagram::DiscoveryDatagram;
    PingNode(NodeIPEndpoint const& _src, NodeIPEndpoint const& _dest): DiscoveryDatagram(_dest), source(_src), destination(_dest) {}
    PingNode(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}
    static const uint8_t type = 1;
    uint8_t packetType() const { return type; }
    unsigned version = 0;  //版本號
    NodeIPEndpoint source;  //源節點
    NodeIPEndpoint destination;  //目的節點

    void streamRLP(RLPStream& _s) const //生成rlp編碼資料
        _s << dev::p2p::c_protocolVersion;  //協議版本號
        source.streamRLP(_s);  //寫入源節點
        destination.streamRLP(_s); //寫入目的節點
        _s << ts; //超時時間節點
    void interpretRLP(bytesConstRef _bytes)  //rlp解碼
        RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
        version = r[0].toInt<unsigned>();  //版本好
        source.interpretRLP(r[1]);  //源節點
        destination.interpretRLP(r[2]); //目的節點
        ts = r[3].toInt<uint32_t>(); //時間


 * Pong packet: Sent in response to ping,ping的回覆
struct Pong: DiscoveryDatagram
    Pong(NodeIPEndpoint const& _dest): DiscoveryDatagram((bi::udp::endpoint)_dest), destination(_dest) {}
    Pong(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}
    static const uint8_t type = 2;
    uint8_t packetType() const { return type; }
    NodeIPEndpoint destination;  //目的地址
    void streamRLP(RLPStream& _s) const  // rlp編碼
        destination.streamRLP(_s);  //目的地址
        _s << echo; //ping包的雜湊值,貌似後面也沒有用到
        _s << ts;  //時間
    void interpretRLP(bytesConstRef _bytes)
        RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
        echo = (h256)r[1];
        ts = r[2].toInt<uint32_t>();


struct FindNode: DiscoveryDatagram
    FindNode(bi::udp::endpoint _to, h512 _target): DiscoveryDatagram(_to), target(_target) {}
    FindNode(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}
    static const uint8_t type = 3;
    uint8_t packetType() const { return type; }
    h512 target;  //待尋找的目標節點
    void streamRLP(RLPStream& _s) const
        _s.appendList(2); _s << target << ts; //更簡單了,只有目標節點和超時時間
    void interpretRLP(bytesConstRef _bytes)
        RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
        target = r[0].toHash<h512>();
        ts = r[1].toInt<uint32_t>();


struct Neighbours: DiscoveryDatagram
{   //Neighbours資料報,可能包含0個或多個Neighbour節點
    Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): DiscoveryDatagram(_to)
        auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();  
        for (auto i = _offset; i < limit; i++)
    Neighbours(bi::udp::endpoint const& _to): DiscoveryDatagram(_to) {}  //傳送方
    Neighbours(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}

    struct Neighbour  //定義一個Neighbour結構
        Neighbour(Node const& _node): endpoint(_node.endpoint), node(_node.id) {}  //包含端點資訊,節點id
        Neighbour(RLP const& _r): endpoint(_r) { node = h512(_r[3].toBytes()); }
        NodeIPEndpoint endpoint;
        NodeID node;
        void streamRLP(RLPStream& _s) const { _s.appendList(4); endpoint.streamRLP(_s, NodeIPEndpoint::StreamInline); _s << node; }

    static const uint8_t type = 4;
    uint8_t packetType() const { return type; }

    std::vector<Neighbour> neighbours;

    void streamRLP(RLPStream& _s) const
        for (auto const& n: neighbours)
        _s << ts;
    void interpretRLP(bytesConstRef _bytes)
        RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
        for (auto const& n: r[0])
        ts = r[1].toInt<uint32_t>();

Neighbours資料報主要包括三部分資料,節點數 | 節點資訊 | 超期時間。上述就是節點通訊過程的四種資料型別,結構比較簡單,就是傳送方與接收方對節點的初始化是不一樣的,這點看的時候可能比較繞,但是理解清楚了之後就明白了。最後還要介紹一點事情,就是這四個資料報的結構是轉化成rlp資料再通過udp傳送的,在傳送前,會對這些資料包進行了簽名的操作,這部分程式碼是在RLPXDatagramFace中sign函式完成的,這個函式的詳細定義是在udp.cpp中,最後看下這個函式來結束這篇部落格吧。。。

h256 RLPXDatagramFace::sign(Secret const& _k) //簽名演算法
    RLPStream rlpxstream;
//	rlpxstream.appendRaw(toPublic(_k).asBytes()); // for mdc-based signature
    rlpxstream.appendRaw(bytes(1, packetType())); // prefix by 1 byte for type,第一個位元組是型別
    streamRLP(rlpxstream); //繼續往裡面新增資料,得到rlp資料
    bytes rlpxBytes(rlpxstream.out()); //放到rlpxBytes中
    bytesConstRef rlpx(&rlpxBytes); 
    h256 sighash(dev::sha3(rlpx)); // H(type||data),計算hash
    Signature sig = dev::sign(_k, sighash); // S(H(type||data)) 對雜湊值簽名,用_k私鑰
    data.resize(h256::size + Signature::size + rlpx.size()); //資料報的資料,
    bytesRef rlpxHash(&data[0], h256::size);
    bytesRef rlpxSig(&data[h256::size], Signature::size);
    bytesRef rlpxPayload(&data[h256::size + Signature::size], rlpx.size());
    bytesConstRef signedRLPx(&data[h256::size], data.size() - h256::size);
    // data
    //   h256:: size      Signature::size     rlpx.size()
    // 0 -> h256::size                | h256::size + Signature::size         | hash::size + Signature + rlp.size
    // 對後面的資料又進行了一次雜湊的雜湊值 | 對型別+資料的rlp的bytes進行雜湊後的簽名   | 型別+資料的rlp的bytes
    return sighash;