1. 程式人生 > >比特幣原始碼分析-網路(二)

比特幣原始碼分析-網路(二)

比特幣原始碼分析-網路(二)

https://www.jianshu.com/p/4b42d8698f35

 

眾所周知,比特幣網路是採用的P2P網路體系,所以,沒有明顯的客戶端與服務端的區別或者是概念,每一個節點既是自身的客戶端,又是其它節點的服務端。

sync.h中,定義了 CSemaphore,它包裝了系統底層的訊號量機制,對wait(), try_wait(),post()實現了封裝,程式碼如下:

class CSemaphore {
private:
    boost::condition_variable condition;
    boost::mutex mutex;
    int value;
public:
    CSemaphore(int init) : value(init) {}
    void wait() {}
    bool try_wait() {}
    void post() {}
};

用於控制網路連線時的最大數量,每一個網路節點的最大連線數受限於訊號量所允許的最大值。

下面我們按照一個網路連線從傳送到接收到請求返回的這麼個思路,來梳理程式碼邏輯。

004.png

CNode

CNode定義在bitcoin.cpp中,是比較重要的也是較為複雜的一個類,節點的所有資訊都包含在內:

class CNode {
    SOCKET sock; //用來連線的socket控制代碼
    CDataStream vSend; //傳送訊息
    CDataStream vRecv; //接收訊息
    uint32_t nHeaderStart; //頭資訊開始
    uint32_t nMessageStart; 
    int nVersion; //版本資訊
    std::string strSubVer; 
    int nStartingHeight; //起始高度
    std::vector<CAddress> *vAddr; //ip地址(網路上節點的連線資訊)
    int ban; 
    int64_t doneAfter; 
    CAddress you;
};

在上述定義中,最主要的是 std::vector<CAddress> *vAddr; 它包含了連線的所有節點,如果有節點連線進來,就加入到這個vector中;如果某個節點斷開連線,就從這個vector中刪除。

net.h中,對CNode進行了詳細的定義(所有關於節點的資訊,都進行了詳細羅列),由於篇幅較長,只羅列其中的一些關鍵結構:

/** Information about a peer */
class CNode {
    friend class CConnman;

public:
    SOCKET hSocket; //連線的socket控制代碼
    size_t nSendSize; //所有vSendMsg條目的總大小。
    size_t nSendOffset; //已經發送的第一個vSendMsg內的偏移量。
    std::deque<std::vector<uint8_t>> vSendMsg;//傳送訊息的陣列
     ...
    const CAddress addr;//節點地址資訊
    std::atomic<int> nVersion;//版本資訊
    CBloomFilter *pfilter;//海量過濾器
    const NodeId id;//節點ID

protected:
    mapMsgCmdSize mapSendBytesPerMsgCmd;
    mapMsgCmdSize mapRecvBytesPerMsgCmd;

public:
    uint256 hashContinue;
    std::atomic<int> nStartingHeight;
private:
    std::list<CNetMessage> vRecvMsg;//接收訊息的陣列
public:
    //用來解析接收到的訊息資料
    bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool &complete);

    //用來設定接收版本
    void SetRecvVersion(int nVersionIn) { nRecvVersion = nVersionIn; }
    int GetRecvVersion() { return nRecvVersion; }
    void SetSendVersion(int nVersionIn);
    int GetSendVersion() const;

    //用來發送地址
    void PushAddress(const CAddress &_addr, FastRandomContext &insecure_rand) {
        // Known checking here is only to save space from duplicates.
        // SendMessages will filter it again for knowns that were added
        // after addresses were pushed.
        if (_addr.IsValid() && !addrKnown.contains(_addr.GetKey())) {
            if (vAddrToSend.size() >= MAX_ADDR_TO_SEND) {
                vAddrToSend[insecure_rand.randrange(vAddrToSend.size())] =
                    _addr;
            } else {
                vAddrToSend.push_back(_addr);
            }
        }
    }

    //用來發送inventory訊息
    void PushInventory(const CInv &inv) {
        LOCK(cs_inventory);
        if (inv.type == MSG_TX) {
            if (!filterInventoryKnown.contains(inv.hash)) {
                setInventoryTxToSend.insert(inv.hash);
            }
        } else if (inv.type == MSG_BLOCK) {
            vInventoryBlockToSend.push_back(inv.hash);
        }
    }

    void PushBlockHash(const uint256 &hash) {
        LOCK(cs_inventory);
        vBlockHashesToAnnounce.push_back(hash);
    }
};

傳送訊息

CDataStream這個類主要是包裝了一個帶有雙向緩衝區的介面, 它過載了 >> 和 <<,使用上述序列化讀取和寫入未格式化的資料模板,以線性時間填充資料;

class CDataStream {
protected:
    typedef CSerializeData vector_type;
    vector_type vch;
    unsigned int nReadPos;

    int nType;
    int nVersion;

public:
    template <typename T> CDataStream &operator<<(const T &obj) {
        // Serialize to this stream
        ::Serialize(*this, obj);
        return (*this);
    }

    template <typename T> CDataStream &operator>>(T &obj) {
        // Unserialize from this stream
        ::Unserialize(*this, obj);
        return (*this);
    }
    
    void read(char *pch, size_t nSize) {
        if (nSize == 0) {
            return;
        }

        // Read from the beginning of the buffer
        unsigned int nReadPosNext = nReadPos + nSize;
        if (nReadPosNext >= vch.size()) {
            if (nReadPosNext > vch.size()) {
                throw std::ios_base::failure(
                    "CDataStream::read(): end of data");
            }
            memcpy(pch, &vch[nReadPos], nSize);
            nReadPos = 0;
            vch.clear();
            return;
        }
        memcpy(pch, &vch[nReadPos], nSize);
        nReadPos = nReadPosNext;
    }

    void write(const char *pch, size_t nSize) {
        // Write to the end of the buffer
        vch.insert(vch.end(), pch, pch + nSize);
    }

所以,當我們需要傳送訊息時,首先會把資料放到CDataStream的資料流中,構造好完整的訊息,但此時的訊息格式是網路無法識別的,下一步,將構造好的訊息放入到CSerializeData(類似一個訊息佇列)進行序列化,序列化之後,我們就可以把訊息放到SocketSendData中傳送出去。

CSerializeData 的格式如下:

// Byte-vector that clears its contents before deletion.
typedef std::vector<char, zero_after_free_allocator<char>> CSerializeData;

SocketSendData 的定義如下:

size_t CConnman::SocketSendData(CNode *pnode) const {
    AssertLockHeld(pnode->cs_vSend);
    size_t nSentSize = 0;
    size_t nMsgCount = 0;

    for (const auto &data : pnode->vSendMsg) {
        assert(data.size() > pnode->nSendOffset);
        int nBytes = 0;
         ...       
    }
    pnode->vSendMsg.erase(pnode->vSendMsg.begin(),
                          pnode->vSendMsg.begin() + nMsgCount);
    if (pnode->vSendMsg.empty()) {
        assert(pnode->nSendOffset == 0);
        assert(pnode->nSendSize == 0);
    }
    return nSentSize;
}

接收訊息

接收訊息的工作,主要是由 ThreadSocketHandler 來完成的,

if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) {
     pnode->CloseSocketDisconnect();
}

隨後,通過 ReceiveMsgBytes 把從其它節點接收到的資料解析為單個數據,然後放回到訊息佇列,最後由ThreadMessageHandler來進行最後的處理。

ReceiveMsgBytes解析資料的主要流程如下,呼叫的是CNetMessage下的readHeader和readData方法,隨後,使用complete()進行一次判定,看解析是否完成:

// Absorb network data.
        int handled;
        if (!msg.in_data) {
            handled = msg.readHeader(pch, nBytes);
        } else {
            handled = msg.readData(pch, nBytes);
        }

readHeader

readHeader 主要用來解析訊息頭,由上一篇文章我們能夠知道,一個訊息頭,至少24位元組,如果小於24位元組直接退出,如果滿足這個條件,先把接收到的資料的開始部分複製到訊息頭資料流中(hdrbuf),再反格式化成訊息頭(hdr)。訊息資料最大為MAX_SIZE(0x02000000),如果大於這個值,證明出錯,直接退出。

int CNetMessage::readHeader(const char *pch, unsigned int nBytes) {
    // copy data to temporary parsing buffer
    unsigned int nRemaining = 24 - nHdrPos;
    unsigned int nCopy = std::min(nRemaining, nBytes);

    memcpy(&hdrbuf[nHdrPos], pch, nCopy);
    nHdrPos += nCopy;

    // if header incomplete, exit
    if (nHdrPos < 24) {
        return nCopy;
    }

    // deserialize to CMessageHeader
    try {
        hdrbuf >> hdr;
    } catch (const std::exception &) {
        return -1;
    }

    // reject messages larger than MAX_SIZE
    if (hdr.nMessageSize > MAX_SIZE) {
        return -1;
    }

    // switch state to reading message data
    in_data = true;

    return nCopy;
}

readData

readData 主要用來解析訊息體,訊息的資料部分複製到訊息資料流中(vRecv)來處理,如果 vRecv 的空間不夠,會進行擴容,但最多分配256 KB,不能超過總訊息大小。

int CNetMessage::readData(const char *pch, unsigned int nBytes) {
    unsigned int nRemaining = hdr.nMessageSize - nDataPos;
    unsigned int nCopy = std::min(nRemaining, nBytes);

    if (vRecv.size() < nDataPos + nCopy) {
        // Allocate up to 256 KiB ahead, but never more than the total message
        // size.
        vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
    }

    hasher.Write((const uint8_t *)pch, nCopy);
    memcpy(&vRecv[nDataPos], pch, nCopy);
    nDataPos += nCopy;

    return nCopy;
}

緩衝區

在 net.h 檔案中,我們能夠看到如下定義:

//接收訊息緩衝區
static const size_t DEFAULT_MAXRECEIVEBUFFER = 5 * 1000;
//傳送訊息緩衝區
static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000;

我們將接收或者傳送的資料放入到緩衝區,我們可以通過如下函式,分別對他們呼叫,加速我們的處理過程:

unsigned int CConnman::GetReceiveFloodSize() const {
    return nReceiveFloodSize;
}
unsigned int CConnman::GetSendBufferSize() const {
    return nSendBufferMaxSize;
}

祝您生活愉快!