以太坊原始碼分析之 P2P網路(三、UDP底層通訊)
區塊鏈特輯 :https://blog.csdn.net/fusan2004/article/details/80879343,歡迎查閱,原創作品,轉載請標明!
這周工作有點小忙,部門區塊鏈基礎平臺的開發開始進入節奏了,和上一篇間隔間隔有點久了,以後還是要堅持,不能剛開始就犯毛病了。上篇講的是以太坊p2p網路的一個重點部分——節點發現,在介紹的時候提過,節點發現是通過udp的方式來進行的,這一篇就介紹下udp通訊的詳細細節,這部分不是很多,算是個過渡吧。
回頭再看下NodeTable類,可以看到這個類繼承了UDPSocketEvents類,也就是這裡和udp通訊建立了關聯,下面我們先看下這個類都幹了些啥。。。
struct UDPSocketEvents { virtual ~UDPSocketEvents() = default; virtual void onDisconnected(UDPSocketFace*) {} virtual void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packetData) = 0; };
可以看到,這個類只提供了兩個api介面,分別是onDisconnected和onReceived,其中onDisconnected有一個定義,空的函式體,在看程式碼的時候也能發現這個函式目前的程式碼並沒有起到什麼作用,可暫時忽略;另外的onReceived是一個純虛擬函式,也就意味著NodeTable必須要實現這個函式,再回頭去看NodeTable類中關於這個函式的詳細編碼,可以看到這是整個NodeTable更新發現的重要起點,也就是在這裡解析了前面所描述的ping\pong\findnode\neighbours這四類訊息,具體這四類訊息,我們後面再細說,現在繼續說說UDPSocketEvents是如何跟udp關聯上的,看看NodeTable的建構函式實現。。。
NodeTable::NodeTable(ba::io_service& _io, KeyPair const& _alias, NodeIPEndpoint const& _endpoint, bool _enabled): m_node(Node(_alias.pub(), _endpoint)), m_secret(_alias.secret()), m_socket(make_shared<NodeSocket>(_io, *reinterpret_cast<UDPSocketEvents*>(this), (bi::udp::endpoint)m_node.endpoint)), m_socketPointer(m_socket.get()), m_timers(_io) { for (unsigned i = 0; i < s_bins; i++) m_state[i].distance = i; if (!_enabled) return; try { m_socketPointer->connect(); //開啟連線,這時候就可以接受外界發來的訊息了,m_socketPointer指定了回撥控制代碼就是NodeTable doDiscovery(); //節點發現 } catch (std::exception const& _e) { cwarn << "Exception connecting NodeTable socket: " << _e.what(); cwarn << "Discovery disabled."; } }
可以看到m_socket就是一個NodeSocket的例項,這個NodeSocket的建構函式中需要this指標傳遞,後面都將通過這個指標完成onReceived的回撥,邏輯很清晰啦,繼續看下NodeSocket吧。。。
template <typename Handler, unsigned MaxDatagramSize>
class UDPSocket: UDPSocketFace, public std::enable_shared_from_this<UDPSocket<Handler, MaxDatagramSize>>
{
public:
enum { maxDatagramSize = MaxDatagramSize };
static_assert((unsigned)maxDatagramSize < 65507u, "UDP datagrams cannot be larger than 65507 bytes");
/// Create socket for specific endpoint.
//為指定的endpoint建立socket
UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, bi::udp::endpoint _endpoint): m_host(_host), m_endpoint(_endpoint), m_socket(_io) { m_started.store(false); m_closed.store(true); };
/// Create socket which listens to all ports.
UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_endpoint(bi::udp::v4(), _port), m_socket(_io) { m_started.store(false); m_closed.store(true); };
virtual ~UDPSocket() { disconnect(); }
/// Socket will begin listening for and delivering packets
// 開始監聽並傳送資料包
void connect();
/// Send datagram. 傳送資料報
bool send(UDPDatagram const& _datagram);
/// Returns if socket is open.
bool isOpen() { return !m_closed; }
/// Disconnect socket.
void disconnect() { disconnectWithError(boost::asio::error::connection_reset); }
protected:
void doRead(); //進行讀操作
void doWrite(); //進行寫操作
void disconnectWithError(boost::system::error_code _ec); //斷開
std::atomic<bool> m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError.
std::atomic<bool> m_closed; ///< Connection availability.
UDPSocketEvents& m_host; ///< Interface which owns this socket.也就是NodeTable
bi::udp::endpoint m_endpoint; ///< Endpoint which we listen to. 沒有監聽一說,其實就是一直從這個endpoint上讀資料
Mutex x_sendQ;
std::deque<UDPDatagram> m_sendQ; ///< Queue for egress data. 傳送資料的佇列
std::array<byte, maxDatagramSize> m_recvData; ///< Buffer for ingress data. 接受資料的佇列
bi::udp::endpoint m_recvEndpoint; ///< Endpoint data was received from. 接受資料的來源
bi::udp::socket m_socket; ///< Boost asio udp socket.
Mutex x_socketError; ///< Mutex for error which can be set from host or IO thread.
boost::system::error_code m_socketError; ///< Set when shut down due to error.
};
這裡面直接看函式其實沒啥意思,我們還是從流程上來觀摩這些程式碼,在上面的建構函式中,當socket建立完畢之後,緊接著就呼叫了connect函式,這個函式裡就是定義了可以從這個udp socket進行非同步讀事件,詳細可以看下這個函式的程式碼。。。
template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::connect()
{
bool expect = false;
if (!m_started.compare_exchange_strong(expect, true))
return;
m_socket.open(bi::udp::v4());
try
{
m_socket.bind(m_endpoint); //繫結本地地址和埠
}
catch (...)
{
m_socket.bind(bi::udp::endpoint(bi::udp::v4(), m_endpoint.port()));
}
//因為connect只會呼叫一次,為了不傳送舊訊息,需要清理寫資料的佇列
Guard l(x_sendQ);
m_sendQ.clear();
m_closed = false;
doRead(); //上面只是綁定了端點資訊,這裡面真正定義了非同步讀事件
}
可以看到,這個函式目前還沒有實質性的,繼續看下doRead函式。。。
template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::doRead()
{
if (m_closed)
return;
auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
//非同步讀讀事件,這個函式是說,如果有資料來了,請把資料放到m_recvData中,m_recvEndpoint會記錄資料來源的端點資訊,讀取完成後回撥函式
m_socket.async_receive_from(boost::asio::buffer(m_recvData), m_recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
{
if (m_closed)
return disconnectWithError(_ec);
if (_ec != boost::system::errc::success)
cnetlog << "Receiving UDP message failed. " << _ec.value() << " : " << _ec.message();
if (_len) //每次上傳給上層,由上層處理響應資料
m_host.onReceived(this, m_recvEndpoint, bytesConstRef(m_recvData.data(), _len));
doRead();
});
}
終於看到了onReceived了,可以看到如果有資料過來了,只要讀取正常了,udp是不負責資料完整性的,直接會把資料和讀取到的長度資訊拋給了上層,也就是NodeTable來處理,這也是為啥NodeTable的onReceived的程式碼裡有很大一部分是來處理資料校驗的,上層處理完畢之後,繼續doRead來進行下一次的資料請求的監聽,這個迴圈就介紹完了;有讀就有寫呀,寫操作都是上層呼叫了send函式來觸發的,看下send函式。。。
template <typename Handler, unsigned MaxDatagramSize>
bool UDPSocket<Handler, MaxDatagramSize>::send(UDPDatagram const& _datagram)
{
if (m_closed)
return false;
Guard l(x_sendQ);
m_sendQ.push_back(_datagram); //先放到佇列中去
if (m_sendQ.size() == 1) //一旦有了,立馬進行doWrite
doWrite();
return true;
}
大家可能會奇怪,為啥是佇列大小為1的時候才進行doWrite,這裡解釋下,可以看到在佇列進行push的時候是有鎖的,也就是說佇列push肯定是序列的,但是doWrite立馬待會可以看到,是一個努力的小夥子,不把佇列立馬的全清空不罷休,所以也就是說只有佇列裡有資料,doWrite是一直被呼叫的,只有這個socket閒下來了,突然來了一個數據報,才需要重新呼叫doWrite,好吧,來看看勤奮的小夥子怎麼做的。。。
template <typename Handler, unsigned MaxDatagramSize>
void UDPSocket<Handler, MaxDatagramSize>::doWrite()
{
if (m_closed)
return;
const UDPDatagram& datagram = m_sendQ[0]; //取出佇列中第一個資料,而且第一個資料肯定存在,由前面保證的
auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
bi::udp::endpoint endpoint(datagram.endpoint()); //資料報中包含了目標地址
m_socket.async_send_to(boost::asio::buffer(datagram.data), endpoint, [this, self, endpoint](boost::system::error_code _ec, std::size_t)
{
if (m_closed) //關閉了
return disconnectWithError(_ec);
if (_ec != boost::system::errc::success)
cnetlog << "Failed delivering UDP message. " << _ec.value() << " : " << _ec.message();
Guard l(x_sendQ);
m_sendQ.pop_front(); //傳送成功,將佇列頭資料剔除
if (m_sendQ.empty()) //勤奮的小夥子完成了工作,退出
return;
doWrite(); //沒有完成,繼續寫
});
}
好了,這就是udp連線、接受、傳送的全部流程啦,其實也就是udp的幾個方法的使用,很簡單。前面提到了,節點發現包含了4中訊息型別,下面介紹下這些訊息的格式,首先這4個訊息都有一些共同的父類,慢慢來看,先看下最最上層的UDPDatagram類。。。
class UDPDatagram
{
public:
UDPDatagram(bi::udp::endpoint const& _ep): locus(_ep) {}
UDPDatagram(bi::udp::endpoint const& _ep, bytes _data): data(_data), locus(_ep) {}
bi::udp::endpoint const& endpoint() const { return locus; }
bytes data; //攜帶的訊息體
protected:
bi::udp::endpoint locus; //待發送的目的地址
};
很簡單的,不需要多說啥,繼續。。。
struct RLPXDatagramFace: public UDPDatagram
{
static uint32_t futureFromEpoch(std::chrono::seconds _sec) { return static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now() + _sec).time_since_epoch()).count()); }
static uint32_t secondsSinceEpoch() { return static_cast<uint32_t>(std::chrono::duration_cast<std::chrono::seconds>((std::chrono::system_clock::now()).time_since_epoch()).count()); }
static Public authenticate(bytesConstRef _sig, bytesConstRef _rlp);
RLPXDatagramFace(bi::udp::endpoint const& _ep): UDPDatagram(_ep) {}
virtual ~RLPXDatagramFace() = default;
virtual h256 sign(Secret const& _from);
virtual uint8_t packetType() const = 0;
virtual void streamRLP(RLPStream&) const = 0;
virtual void interpretRLP(bytesConstRef _bytes) = 0;
};
這個類是個純虛類,也就是說沒幹啥具體的事情,只不過從名字我們可以看出來,udp通訊的資料採用了rlp來進行了編解碼,streamRLP就是rlp編碼,interpretRLP就是rlp解碼,繼續。。。
struct DiscoveryDatagram: public RLPXDatagramFace
{
/// Constructor used for sending. 這是用來發送資料報時呼叫的建構函式
DiscoveryDatagram(bi::udp::endpoint const& _to): RLPXDatagramFace(_to), ts(futureFromEpoch(std::chrono::seconds(60))) {}
/// Constructor used for parsing inbound packets. 這是接受到資料報時定義的建構函式
DiscoveryDatagram(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): RLPXDatagramFace(_from), sourceid(_fromid), echo(_echo) {}
// These two are set for inbound packets only. 下面定義的變數只有在接受的資料報中才會用到
NodeID sourceid; // sender public key (from signature) 傳送方的公鑰
h256 echo; // hash of encoded packet, for reply tracking 編碼過的資料報的雜湊值,用來回復響應的跟蹤
// All discovery packets carry a timestamp, which must be greater 所有用來節點發現的資料報都包含了一個時間戳,
// than the current local time. This prevents replay attacks. 這個時間戳必須大於當前時間,防止重複攻擊
uint32_t ts = 0;
bool isExpired() const { return secondsSinceEpoch() > ts; } //是否過期,ts裡面定義的是傳送時刻+60s
/// Decodes UDP packets. 解碼udp資料報
static std::unique_ptr<DiscoveryDatagram> interpretUDP(bi::udp::endpoint const& _from, bytesConstRef _packet);
};
這裡面定義了節點發現訊息的直接父類,後面的四個訊息型別都是基於這個類定義的,從這個類中我們總結幾個事情,一個是傳送訊息和接受訊息的定義是不一樣,接受訊息需要看下節點id和echo,來知道是什麼人發的資料,已經發的是不是我要的,二是定義了過期策略,這個也就是在udp實現中需要考慮,因為udp是無連線的,必須要自己控制,接下來正經看看那四個訊息吧。。。
/**
* Ping packet: Sent to check if node is alive.
* PingNode is cached and regenerated after ts + t, where t is timeout.
* ping資料報,用於檢測node是否是活的,被快取,過期後重新生成
* Ping is used to implement evict. When a new node is seen for / ping用來完成淘汰,一個給定bucket的新node到來時如果滿了的話
* a given bucket which is full, the least-responsive node is pinged. // 最少回覆的node會被ping
* If the pinged node doesn't respond, then it is removed and the new
* node is inserted. 如果被ping的node沒有被回覆,那麼將會被移除,新節點會被插入
*/
struct PingNode: DiscoveryDatagram
{
using DiscoveryDatagram::DiscoveryDatagram;
PingNode(NodeIPEndpoint const& _src, NodeIPEndpoint const& _dest): DiscoveryDatagram(_dest), source(_src), destination(_dest) {}
PingNode(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}
static const uint8_t type = 1;
uint8_t packetType() const { return type; }
unsigned version = 0; //版本號
NodeIPEndpoint source; //源節點
NodeIPEndpoint destination; //目的節點
void streamRLP(RLPStream& _s) const //生成rlp編碼資料
{
_s.appendList(4);
_s << dev::p2p::c_protocolVersion; //協議版本號
source.streamRLP(_s); //寫入源節點
destination.streamRLP(_s); //寫入目的節點
_s << ts; //超時時間節點
}
void interpretRLP(bytesConstRef _bytes) //rlp解碼
{
RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
version = r[0].toInt<unsigned>(); //版本好
source.interpretRLP(r[1]); //源節點
destination.interpretRLP(r[2]); //目的節點
ts = r[3].toInt<uint32_t>(); //時間
}
};
從上面這段程式碼可以看到ping資料組成形式為:版本|源地址|目的地址|超時時間,繼續看下響應的pong資料報結構。。。
/**
* Pong packet: Sent in response to ping,ping的回覆
*/
struct Pong: DiscoveryDatagram
{
Pong(NodeIPEndpoint const& _dest): DiscoveryDatagram((bi::udp::endpoint)_dest), destination(_dest) {}
Pong(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}
static const uint8_t type = 2;
uint8_t packetType() const { return type; }
NodeIPEndpoint destination; //目的地址
void streamRLP(RLPStream& _s) const // rlp編碼
{
_s.appendList(3);
destination.streamRLP(_s); //目的地址
_s << echo; //ping包的雜湊值,貌似後面也沒有用到
_s << ts; //時間
}
void interpretRLP(bytesConstRef _bytes)
{
RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
destination.interpretRLP(r[0]);
echo = (h256)r[1];
ts = r[2].toInt<uint32_t>();
}
};
可以看到pong資料報更簡單,只包括了目的地址|echo|過期時間,其實程式碼裡只關係了,這個節點所攜帶的node_id,繼續看findnode的資料報結構。。。
struct FindNode: DiscoveryDatagram
{
FindNode(bi::udp::endpoint _to, h512 _target): DiscoveryDatagram(_to), target(_target) {}
FindNode(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}
static const uint8_t type = 3;
uint8_t packetType() const { return type; }
h512 target; //待尋找的目標節點
void streamRLP(RLPStream& _s) const
{
_s.appendList(2); _s << target << ts; //更簡單了,只有目標節點和超時時間
}
void interpretRLP(bytesConstRef _bytes)
{
RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
target = r[0].toHash<h512>();
ts = r[1].toInt<uint32_t>();
}
};
FindNode的資料結構也很簡單,只包括了目標節點id和超時時間,最後看下Neighbours的資料報。。。
struct Neighbours: DiscoveryDatagram
{ //Neighbours資料報,可能包含0個或多個Neighbour節點
Neighbours(bi::udp::endpoint _to, std::vector<std::shared_ptr<NodeEntry>> const& _nearest, unsigned _offset = 0, unsigned _limit = 0): DiscoveryDatagram(_to)
{
auto limit = _limit ? std::min(_nearest.size(), (size_t)(_offset + _limit)) : _nearest.size();
for (auto i = _offset; i < limit; i++)
neighbours.push_back(Neighbour(*_nearest[i]));
}
Neighbours(bi::udp::endpoint const& _to): DiscoveryDatagram(_to) {} //傳送方
Neighbours(bi::udp::endpoint const& _from, NodeID const& _fromid, h256 const& _echo): DiscoveryDatagram(_from, _fromid, _echo) {}
struct Neighbour //定義一個Neighbour結構
{
Neighbour(Node const& _node): endpoint(_node.endpoint), node(_node.id) {} //包含端點資訊,節點id
Neighbour(RLP const& _r): endpoint(_r) { node = h512(_r[3].toBytes()); }
NodeIPEndpoint endpoint;
NodeID node;
void streamRLP(RLPStream& _s) const { _s.appendList(4); endpoint.streamRLP(_s, NodeIPEndpoint::StreamInline); _s << node; }
};
static const uint8_t type = 4;
uint8_t packetType() const { return type; }
std::vector<Neighbour> neighbours;
void streamRLP(RLPStream& _s) const
{
_s.appendList(2);
_s.appendList(neighbours.size());
for (auto const& n: neighbours)
n.streamRLP(_s);
_s << ts;
}
void interpretRLP(bytesConstRef _bytes)
{
RLP r(_bytes, RLP::AllowNonCanon|RLP::ThrowOnFail);
for (auto const& n: r[0])
neighbours.emplace_back(n);
ts = r[1].toInt<uint32_t>();
}
};
Neighbours資料報主要包括三部分資料,節點數 | 節點資訊 | 超期時間。上述就是節點通訊過程的四種資料型別,結構比較簡單,就是傳送方與接收方對節點的初始化是不一樣的,這點看的時候可能比較繞,但是理解清楚了之後就明白了。最後還要介紹一點事情,就是這四個資料報的結構是轉化成rlp資料再通過udp傳送的,在傳送前,會對這些資料包進行了簽名的操作,這部分程式碼是在RLPXDatagramFace中sign函式完成的,這個函式的詳細定義是在udp.cpp中,最後看下這個函式來結束這篇部落格吧。。。
h256 RLPXDatagramFace::sign(Secret const& _k) //簽名演算法
{
assert(packetType());
RLPStream rlpxstream;
// rlpxstream.appendRaw(toPublic(_k).asBytes()); // for mdc-based signature
rlpxstream.appendRaw(bytes(1, packetType())); // prefix by 1 byte for type,第一個位元組是型別
streamRLP(rlpxstream); //繼續往裡面新增資料,得到rlp資料
bytes rlpxBytes(rlpxstream.out()); //放到rlpxBytes中
bytesConstRef rlpx(&rlpxBytes);
h256 sighash(dev::sha3(rlpx)); // H(type||data),計算hash
Signature sig = dev::sign(_k, sighash); // S(H(type||data)) 對雜湊值簽名,用_k私鑰
data.resize(h256::size + Signature::size + rlpx.size()); //資料報的資料,
bytesRef rlpxHash(&data[0], h256::size);
bytesRef rlpxSig(&data[h256::size], Signature::size);
bytesRef rlpxPayload(&data[h256::size + Signature::size], rlpx.size());
sig.ref().copyTo(rlpxSig);
rlpx.copyTo(rlpxPayload);
bytesConstRef signedRLPx(&data[h256::size], data.size() - h256::size);
dev::sha3(signedRLPx).ref().copyTo(rlpxHash);
// data
// h256:: size Signature::size rlpx.size()
// 0 -> h256::size | h256::size + Signature::size | hash::size + Signature + rlp.size
// 對後面的資料又進行了一次雜湊的雜湊值 | 對型別+資料的rlp的bytes進行雜湊後的簽名 | 型別+資料的rlp的bytes
return sighash;
}