1. 程式人生 > >比特幣bitcoin原始碼解析之整體架構和流程

比特幣bitcoin原始碼解析之整體架構和流程

1. 比特幣簡介

比特幣(BitCoin)的概念最初由中本聰在2009年提出,根據中本聰的思路設計釋出的開源軟體以及建構其上的P2P網路。比特幣是一種P2P形式的數字貨幣。點對點的傳輸意味著一個去中心化的支付系統。

與大多數貨幣不同,比特幣不依靠特定貨幣機構發行,它依據特定演算法,通過大量的計算產生,比特幣經濟使用整個P2P網路中眾多節點構成的分散式資料庫來確認並記錄所有的交易行為,並使用密碼學的設計來確保貨幣流通各個環節安全性。P2P的去中心化特性與演算法本身可以確保無法通過大量製造比特幣來人為操控幣值。基於密碼學的設計可以使比特幣只能被真實的擁有者轉移或支付。這同樣確保了貨幣所有權與流通交易的匿名性。比特幣與其他虛擬貨幣最大的不同,是其總數量非常有限,具有極強的稀缺性。該貨幣系統曾在4年內只有不超過1050萬個,之後的總數量將被永久限制在2100萬個。

2. 比特幣整體架構

點對點網路,每一個網路中的節點即是Client又是Server,如下圖所示:整體P2P架構
* 節點和節點之間通過傳送訊息命令來相互通訊的,下圖是對應的訊息命令格式:訊息命令格式
CMessageHeader對應的類圖如下所示:CMessageHeader類圖
* 節點通訊過程中使用的對應的訊息命令如下表所示:

命令 訊息內容 說明
version nVersion,nService,nTime,address 版本,服務標識,時間;地址
addr vector< CAddress> 地址列表
inv vector< CInv> 庫存資訊列表:將對應的庫存傳送訊息增加到庫存傳送已知中
getdata vector< CInv> 根據inv對應的type執行不同的處理,就是將對應的請求訊息轉化為待請求的命令放入到節點對應的傳送訊息快取中
getblocks CBlockLocator locator;uint256 hashStop; 根據locator定位區塊在鏈上位置,從這個位置開始往後找(找對應的next),一直到對應塊的block的hash等於hashStop為止,並將所有找到的區塊傳送庫存setInventoryKnown2中等待被髮送
Tx CTransaction 將交易訊息放入到對應的已知庫存中,如果此交易能夠被接受,則對此訊息進行轉播,並遞迴處理所有依賴這個交易對應的孤兒交易;如果交易不被接受,則將次交易放入到孤兒交易列表中mapOrphanTransactions和mapOrphanTransactionsByPrev中
block CBlock 塊訊息:將接收的block放入對應的已知庫存中,並對應這個塊進行處理,並將此塊從mapAlreadyAskedFor諮詢中移除

3. 比特幣整體處理流程

對應的整體流程如下圖所示:整體流程

3.1 LoadAdreess方法

LoadAdreess方法從地址­檔案addr.dat中讀取對應的地址資訊,放入對應的全域性記憶體物件mapAddresses中,其中addr.dat對應的結構如下:

Key Value 說明 記憶體中存放物件
addr CAddress 資料庫中儲存對應的地址新 map< vector,CAddress>mapAddresses節點地址對映:key對應的是ip地址+埠,value是CAddress物件

3.2 LoadBlockIndex方法

LoadBlockIndex方法從塊索引檔案blkindex.dat中讀取對應的塊索引資訊,放入對應的全域性記憶體物件mapBlockIndex中,其中blkindex.dat對應的結構如下:

Key Value 說明 記憶體中存放物件
blockindex CDiskBlockIndex 塊索引資料庫 map< uint256, CBlockIndex*> mapBlockIndex塊索引資訊:其中key對應的block的hash值

對應的原始碼如下:

bool LoadBlockIndex(bool fAllowNew)
{
    //
    // Load block index
    //
    CTxDB txdb("cr");
    if (!txdb.LoadBlockIndex())
        return false;
    txdb.Close();

    //
    // Init with genesis block
    //
    if (mapBlockIndex.empty())
    {
        if (!fAllowNew)
            return false;


        // Genesis Block:
        // GetHash()      = 0x000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f
        // hashMerkleRoot = 0x4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b
        // txNew.vin[0].scriptSig     = 486604799 4 0x736B6E616220726F662074756F6C69616220646E6F63657320666F206B6E697262206E6F20726F6C6C65636E61684320393030322F6E614A2F33302073656D695420656854
        // txNew.vout[0].nValue       = 5000000000
        // txNew.vout[0].scriptPubKey = 0x5F1DF16B2B704C8A578D0BBAF74D385CDE12C11EE50455F3C438EF4C3FBCF649B6DE611FEAE06279A60939E028A8D65C10B73071A6F16719274855FEB0FD8A6704 OP_CHECKSIG
        // block.nVersion = 1
        // block.nTime    = 1231006505
        // block.nBits    = 0x1d00ffff
        // block.nNonce   = 2083236893
        // CBlock(hash=000000000019d6, ver=1, hashPrevBlock=00000000000000, hashMerkleRoot=4a5e1e, nTime=1231006505, nBits=1d00ffff, nNonce=2083236893, vtx=1)
        //   CTransaction(hash=4a5e1e, ver=1, vin.size=1, vout.size=1, nLockTime=0)
        //     CTxIn(COutPoint(000000, -1), coinbase 04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73)
        //     CTxOut(nValue=50.00000000, scriptPubKey=0x5F1DF16B2B704C8A578D0B)
        //   vMerkleTree: 4a5e1e

        // Genesis block
        char* pszTimestamp = "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks";
        CTransaction txNew;
        txNew.vin.resize(1);
        txNew.vout.resize(1);
        txNew.vin[0].scriptSig     = CScript() << 486604799 << CBigNum(4) << vector<unsigned char>((unsigned char*)pszTimestamp, (unsigned char*)pszTimestamp + strlen(pszTimestamp));
        txNew.vout[0].nValue       = 50 * COIN;
        txNew.vout[0].scriptPubKey = CScript() << CBigNum("0x5F1DF16B2B704C8A578D0BBAF74D385CDE12C11EE50455F3C438EF4C3FBCF649B6DE611FEAE06279A60939E028A8D65C10B73071A6F16719274855FEB0FD8A6704") << OP_CHECKSIG;
        CBlock block;
        block.vtx.push_back(txNew);
        block.hashPrevBlock = 0;
        block.hashMerkleRoot = block.BuildMerkleTree();
        block.nVersion = 1;
        block.nTime    = 1231006505;
        block.nBits    = 0x1d00ffff;
        block.nNonce   = 2083236893;

            //// debug print, delete this later
            printf("%s\n", block.GetHash().ToString().c_str());
            printf("%s\n", block.hashMerkleRoot.ToString().c_str());
            printf("%s\n", hashGenesisBlock.ToString().c_str());
            txNew.vout[0].scriptPubKey.print();
            block.print();
            assert(block.hashMerkleRoot == uint256("0x4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b"));

        assert(block.GetHash() == hashGenesisBlock);

        // Start new block file
        unsigned int nFile;
        unsigned int nBlockPos;
        if (!block.WriteToDisk(!fClient, nFile, nBlockPos))
            return error("LoadBlockIndex() : writing genesis block to disk failed");
        if (!block.AddToBlockIndex(nFile, nBlockPos))
            return error("LoadBlockIndex() : genesis block not accepted");
    }

    return true;
}

3.3 LoadWallet方法

LoadWallet方法從檔案wallet.dat中讀取對應的錢包交易資訊和其他的配置資訊,放入到不同的全域性記憶體物件中,其中wallet.dat對應的格式(key-value結構)如下所示:

Key Value 說明 記憶體中存放物件
Type Key1 Key分為型別型別和key
name 比特幣地址 比特幣地址對應名稱 地址和名稱之間對映 map< string,string> mapAddressBook地址和名稱的對映,其中key為地址,value為名稱
Tx 交易hash值 錢包交易物件CWalletTx 交易hash與交易之間對映 map< uint256, CWalletTx>mapWalle錢包交易對應的map,其中key對應的錢包交易的hash值
Key 公鑰 私鑰 公鑰和私鑰對應關係 map< vector< unsigned char>, CPrivKey> mapKeys公鑰和私鑰對應的對映關係,其中key為公鑰,value為私鑰;map< uint160, vector > mapPubKeys公鑰的hash值和公鑰的關係,其中key為公鑰的hash值,value為公鑰
defaultkey vector< unsigned char> vchDefaultKeyRet 預設key對應對應的值
setting fGenerateBitcoins 是否產生比特幣標記 是否挖礦標記 int fGenerateBitcoins
setting nTransactionFee 交易手續費的值 交易手續費 int64 nTransactionFee
setting addrIncoming CAddress物件 獲得當前對應的外部地址,用於接收外部的連線 CAddress addrIncoming

原始碼內容如下:

//
// CWalletDB
//

bool CWalletDB::LoadWallet(vector<unsigned char>& vchDefaultKeyRet)
{
    vchDefaultKeyRet.clear();

    //// todo: shouldn't we catch exceptions and try to recover and continue?
    CRITICAL_BLOCK(cs_mapKeys)
    CRITICAL_BLOCK(cs_mapWallet)
    {
        // Get cursor
        Dbc* pcursor = GetCursor();
        if (!pcursor)
            return false;

        loop
        {
            // Read next record
            CDataStream ssKey;
            CDataStream ssValue;
            int ret = ReadAtCursor(pcursor, ssKey, ssValue);
            if (ret == DB_NOTFOUND)
                break;
            else if (ret != 0)
                return false;

            // Unserialize
            // Taking advantage of the fact that pair serialization
            // is just the two items serialized one after the other
            string strType;
            ssKey >> strType;
            if (strType == "name")
            {
                string strAddress;
                ssKey >> strAddress;
                ssValue >> mapAddressBook[strAddress];
            }
            else if (strType == "tx")
            {
                uint256 hash;
                ssKey >> hash;
                CWalletTx& wtx = mapWallet[hash];
                ssValue >> wtx;

                if (wtx.GetHash() != hash)
                    printf("Error in wallet.dat, hash mismatch\n");

                //// debug print
                //printf("LoadWallet  %s\n", wtx.GetHash().ToString().c_str());
                //printf(" %12I64d  %s  %s  %s\n",
                //    wtx.vout[0].nValue,
                //    DateTimeStr(wtx.nTime).c_str(),
                //    wtx.hashBlock.ToString().substr(0,14).c_str(),
                //    wtx.mapValue["message"].c_str());
            }
            else if (strType == "key")
            {
                vector<unsigned char> vchPubKey;
                ssKey >> vchPubKey;
                CPrivKey vchPrivKey;
                ssValue >> vchPrivKey;

                mapKeys[vchPubKey] = vchPrivKey;
                mapPubKeys[Hash160(vchPubKey)] = vchPubKey;
            }
            else if (strType == "defaultkey")
            {
                ssValue >> vchDefaultKeyRet;
            }
            else if (strType == "setting")  /// or settings or option or options or config?
            {
                string strKey;
                ssKey >> strKey;
                if (strKey == "fGenerateBitcoins")  ssValue >> fGenerateBitcoins;
                if (strKey == "nTransactionFee")    ssValue >> nTransactionFee;
                if (strKey == "addrIncoming")       ssValue >> addrIncoming;
            }
        }
    }

    printf("fGenerateBitcoins = %d\n", fGenerateBitcoins);
    printf("nTransactionFee = %I64d\n", nTransactionFee);
    printf("addrIncoming = %s\n", addrIncoming.ToString().c_str());

    return true;
}

3.4節點通訊執行緒

啟動了三個主要的執行緒,ThreadOpenConnections,ThreadSocketHandler和ThreadMessageHandler,這個三個執行緒對應的主要處理流程分別如下圖所示:

3.4.1執行緒ThreadOpenConnections

執行緒ThreadOpenConnections對應的處理流程如下圖所示:ThreadOpenConnections處理流程
原始碼如下:

// 對於每一個開啟節點的連結,進行節點之間資訊通訊,獲得節點對應的最新資訊,比如節點對應的知道地址進行交換等
void ThreadOpenConnections2(void* parg)
{
    printf("ThreadOpenConnections started\n");

    // 初始化網路連線
    // Initiate network connections
    const int nMaxConnections = 15; // 最大連線數
    loop
    {
        // Wait
        vfThreadRunning[1] = false;
        Sleep(500);
        while (vNodes.size() >= nMaxConnections || vNodes.size() >= mapAddresses.size())
        {
            CheckForShutdown(1);
            Sleep(2000);
        }
        vfThreadRunning[1] = true;
        CheckForShutdown(1);


        // Ip對應的C類地址,相同的C類地址放在一起
        // Make a list of unique class C's
        unsigned char pchIPCMask[4] = { 0xff, 0xff, 0xff, 0x00 };
        unsigned int nIPCMask = *(unsigned int*)pchIPCMask;
        vector<unsigned int> vIPC;
        CRITICAL_BLOCK(cs_mapAddresses)
        {
            vIPC.reserve(mapAddresses.size());
            unsigned int nPrev = 0;
            // mapAddress已經進行排序了,預設是生效排序
            foreach(const PAIRTYPE(vector<unsigned char>, CAddress)& item, mapAddresses)
            {
                const CAddress& addr = item.second;
                if (!addr.IsIPv4())
                    continue;

                // Taking advantage of mapAddresses being in sorted order,
                // with IPs of the same class C grouped together.
                unsigned int ipC = addr.ip & nIPCMask;
                if (ipC != nPrev)
                    vIPC.push_back(nPrev = ipC);
            }
        }

        // IP選擇的過程
        // The IP selection process is designed to limit vulnerability致命性 to address flooding.
        // Any class C (a.b.c.?) has an equal chance of being chosen, then an IP is
        // chosen within the class C.  An attacker may be able to allocate many IPs, but
        // they would normally be concentrated in blocks of class C's.  They can hog獨佔 the
        // attention within their class C, but not the whole IP address space overall.
        // A lone node in a class C will get as much attention as someone holding all 255
        // IPs in another class C.
        //
        bool fSuccess = false;
        int nLimit = vIPC.size();
        while (!fSuccess && nLimit-- > 0)
        {
            // Choose a random class C 隨機獲取一個C級別的地址
            unsigned int ipC = vIPC[GetRand(vIPC.size())];

            // Organize all addresses in the class C by IP
            map<unsigned int, vector<CAddress> > mapIP;
            CRITICAL_BLOCK(cs_mapAddresses)
            {
                unsigned int nDelay = ((30 * 60) << vNodes.size());
                if (nDelay > 8 * 60 * 60)
                    nDelay = 8 * 60 * 60;
                /*
                map::lower_bound(key):返回map中第一個大於或等於key的迭代器指標
                map::upper_bound(key):返回map中第一個大於key的迭代器指標
                */
                for (map<vector<unsigned char>, CAddress>::iterator mi = mapAddresses.lower_bound(CAddress(ipC, 0).GetKey());
                     mi != mapAddresses.upper_bound(CAddress(ipC | ~nIPCMask, 0xffff).GetKey());
                     ++mi)
                {
                    const CAddress& addr = (*mi).second;
                    unsigned int nRandomizer = (addr.nLastFailed * addr.ip * 7777U) % 20000;
                    // 當前時間 - 地址連線最新失敗的時間 要大於對應節點重連的間隔時間
                    if (GetTime() - addr.nLastFailed > nDelay * nRandomizer / 10000)
                        mapIP[addr.ip].push_back(addr); //同一個地址區段不同地址: 同一個地址的不同埠,所有對應同一個ip會有多個地址
                }
            }
            if (mapIP.empty())
                break;

            // Choose a random IP in the class C
            map<unsigned int, vector<CAddress> >::iterator mi = mapIP.begin();
            boost::iterators::advance_adl_barrier::advance(mi, GetRand(mapIP.size())); // 將指標定位到隨機位置

            // 遍歷同一個ip對應的所有不同埠
            // Once we've chosen an IP, we'll try every given port before moving on
            foreach(const CAddress& addrConnect, (*mi).second)
            {
                // ip不能是本地ip,且不能是非ipV4地址,對應的ip地址不在本地的節點列表中
                if (addrConnect.ip == addrLocalHost.ip || !addrConnect.IsIPv4() || FindNode(addrConnect.ip))
                    continue;
                // 連結對應地址資訊的節點
                CNode* pnode = ConnectNode(addrConnect);
                if (!pnode)
                    continue;
                pnode->fNetworkNode = true; //設定對應的節點為網路節點,是因為從對應的本地節點列表中沒有查詢到

                // 如果本地主機地址能夠進行路由,則需要廣播我們的地址
                if (addrLocalHost.IsRoutable())
                {
                    // Advertise our address
                    vector<CAddress> vAddrToSend;
                    vAddrToSend.push_back(addrLocalHost);
                    pnode->PushMessage("addr", vAddrToSend); // 將訊息推送出去放入vsend中,在訊息處理執行緒中進行處理
                }

                // 從建立的節點獲得儘可能多的地址資訊,傳送訊息,在訊息處理執行緒中進行處理
                // Get as many addresses as we can
                pnode->PushMessage("getaddr");

                ////// should the one on the receiving end do this too?
                // Subscribe our local subscription list
                // 新建的節點要訂閱我們本地主機訂閱的對應通斷
                const unsigned int nHops = 0;
                for (unsigned int nChannel = 0; nChannel < pnodeLocalHost->vfSubscribe.size(); nChannel++)
                    if (pnodeLocalHost->vfSubscribe[nChannel])
                        pnode->PushMessage("subscribe", nChannel, nHops);

                fSuccess = true;
                break;
            }
        }
    }
}

3.4.2執行緒ThreadSocketHandler

執行緒ThreadSocketHandler對應的處理流程如下:ThreadSocketHandler處理流程
原始碼如下:

// socket 處理,parag對應的是本地節點開啟的監聽socket
void ThreadSocketHandler2(void* parg)
{
    printf("ThreadSocketHandler started\n");
    SOCKET hListenSocket = *(SOCKET*)parg; // 獲得監聽socket
    list<CNode*> vNodesDisconnected;
    int nPrevNodeCount = 0;

    loop
    {
        //
        // Disconnect nodes
        //
        CRITICAL_BLOCK(cs_vNodes)
        {
            // Disconnect duplicate connections 釋放同一個ip重複連結對應的節點,可能是不同埠
            map<unsigned int, CNode*> mapFirst;
            foreach(CNode* pnode, vNodes)
            {
                if (pnode->fDisconnect)
                    continue;
                unsigned int ip = pnode->addr.ip;
                // 本地主機ip地址對應的是0,所以所有的ip地址都應該大於這個ip
                if (mapFirst.count(ip) && addrLocalHost.ip < ip)
                {
                    // In case two nodes connect to each other at once,
                    // the lower ip disconnects its outbound connection
                    CNode* pnodeExtra = mapFirst[ip];

                    if (pnodeExtra->GetRefCount() > (pnodeExtra->fNetworkNode ? 1 : 0))
                        swap(pnodeExtra, pnode);

                    if (pnodeExtra->GetRefCount() <= (pnodeExtra->fNetworkNode ? 1 : 0))
                    {
                        printf("(%d nodes) disconnecting duplicate: %s\n", vNodes.size(), pnodeExtra->addr.ToString().c_str());
                        if (pnodeExtra->fNetworkNode && !pnode->fNetworkNode)
                        {
                            pnode->AddRef();
                            swap(pnodeExtra->fNetworkNode, pnode->fNetworkNode);
                            pnodeExtra->Release();
                        }
                        pnodeExtra->fDisconnect = true;
                    }
                }
                mapFirst[ip] = pnode;
            }
            // 斷開不使用的節點
            // Disconnect unused nodes
            vector<CNode*> vNodesCopy = vNodes;
            foreach(CNode* pnode, vNodesCopy)
            {
                // 節點準備釋放連結,並且對應的接收和傳送快取區都是空
                if (pnode->ReadyToDisconnect() && pnode->vRecv.empty() && pnode->vSend.empty())
                {
                    // 從節點列表中移除
                    // remove from vNodes
                    vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
                    pnode->Disconnect();

                    // 將對應準備釋放的節點放在對應的節點釋放連結池中,等待對應節點的所有引用釋放
                    // hold in disconnected pool until all refs are released
                    pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 5 * 60); // 向後推遲5分鐘
                    if (pnode->fNetworkNode)
                        pnode->Release();
                    vNodesDisconnected.push_back(pnode);
                }
            }

            // 刪除埠的連結的節點:刪除的條件是對應節點的引用小於等於0
            // Delete disconnected nodes
            list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
            foreach(CNode* pnode, vNodesDisconnectedCopy)
            {
                // wait until threads are done using it
                if (pnode->GetRefCount() <= 0)
                {
                    bool fDelete = false;
                    TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                     TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
                      TRY_CRITICAL_BLOCK(pnode->cs_mapRequests)
                       TRY_CRITICAL_BLOCK(pnode->cs_inventory)
                        fDelete = true;
                    if (fDelete)
                    {
                        vNodesDisconnected.remove(pnode);
                        delete pnode;
                    }
                }
            }
        }
        if (vNodes.size() != nPrevNodeCount)
        {
            nPrevNodeCount = vNodes.size(); // 記錄前一次節點對應的數量
            MainFrameRepaint();
        }


        // 找出哪一個socket有資料要傳送
        // Find which sockets have data to receive
        //
        struct timeval timeout;
        timeout.tv_sec  = 0;
        timeout.tv_usec = 50000; // frequency to poll pnode->vSend 諮詢節點是否有資料要傳送的頻率

        struct fd_set fdsetRecv; // 記錄所有節點對應的socket控制代碼和監聽socket控制代碼
        struct fd_set fdsetSend; // 記錄所有有待發送訊息的節點對應的socket控制代碼
        FD_ZERO(&fdsetRecv);
        FD_ZERO(&fdsetSend);
        SOCKET hSocketMax = 0;
        FD_SET(hListenSocket, &fdsetRecv); // FD_SET將hListenSocket 放入fdsetRecv對應的陣列的最後
        hSocketMax = max(hSocketMax, hListenSocket);
        CRITICAL_BLOCK(cs_vNodes)
        {
            foreach(CNode* pnode, vNodes)
            {
                FD_SET(pnode->hSocket, &fdsetRecv);
                hSocketMax = max(hSocketMax, pnode->hSocket); // 找出所有節點對應的socket的最大值,包括監聽socket
                TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                    if (!pnode->vSend.empty())
                        FD_SET(pnode->hSocket, &fdsetSend); // FD_SET 欄位設定
            }
        }

        vfThreadRunning[0] = false;
        // 函式參考:https://blog.csdn.net/rootusers/article/details/43604729
        /*確定一個或多個套介面的狀態,本函式用於確定一個或多個套介面的狀態,對每一個套介面,呼叫者可查詢它的可讀性、可寫性及錯誤狀態資訊,用fd_set結構來表示一組等待檢查的套介面,在呼叫返回時,這個結構存有滿足一定條件的套介面組的子集,並且select()返回滿足條件的套介面的數目。
            簡單來說select用來填充一組可用的socket控制代碼,當滿足下列之一條件時:
            1.可以讀取的sockets。當這些socket被返回時,在這些socket上執行recv/accept等操作不會產生阻塞;
            2.可以寫入的sockets。當這些socket被返回時,在這些socket上執行send等不會產生阻塞;
            3.返回有錯誤的sockets。
            select()的機制中提供一fd_set的資料結構,實際上市一long型別的陣列,每一個數組元素都能與一開啟的檔案控制代碼(或者是其他的socket控制代碼,檔案命名管道等)建立聯絡,建立聯絡的工作實際上由程式設計師完成,當呼叫select()的時候,由核心根據IO狀態修改fd_set的內容,由此來通知執行了select()的程序那一socket或檔案可讀。
        */
        int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, NULL, &timeout);
        vfThreadRunning[0] = true;
        CheckForShutdown(0);
        if (nSelect == SOCKET_ERROR)
        {
            int nErr = WSAGetLastError();
            printf("select failed: %d\n", nErr);
            for (int i = 0; i <= hSocketMax; i++)
            {
                FD_SET(i, &fdsetRecv); // 所有的值設定一遍
                FD_SET(i, &fdsetSend);
            }
            Sleep(timeout.tv_usec/1000);
        }
        // 隨機增加種子:效能計數
        RandAddSeed();

        //// debug print
        //foreach(CNode* pnode, vNodes)
        //{
        //    printf("vRecv = %-5d ", pnode->vRecv.size());
        //    printf("vSend = %-5d    ", pnode->vSend.size());
        //}
        //printf("\n");


        // 如果fdsetRecv中有監聽socket,則接收改監聽socket對應的連結請求,並將連結請求設定為新的節點
        // Accept new connections
        // 判斷髮送緩衝區中是否有對應的socket,如果有則接收新的交易
        if (FD_ISSET(hListenSocket, &fdsetRecv))
        {
            struct sockaddr_in sockaddr;
            int len = sizeof(sockaddr);
            SOCKET hSocket = accept(hListenSocket, (struct sockaddr*)&sockaddr, &len); // 接收socket連結
            CAddress addr(sockaddr);
            if (hSocket == INVALID_SOCKET)
            {
                if (WSAGetLastError() != WSAEWOULDBLOCK)
                    printf("ERROR ThreadSocketHandler accept failed: %d\n", WSAGetLastError());
            }
            else
            {
                printf("accepted connection from %s\n", addr.ToString().c_str());
                CNode* pnode = new CNode(hSocket, addr, true); // 有新的socket連結,則新建對應的節點,並將節點在加入本地節點列表中
                pnode->AddRef();
                CRITICAL_BLOCK(cs_vNodes)
                    vNodes.push_back(pnode);
            }
        }


        // 對每一個socket進行服務處理
        // Service each socket
        //
        vector<CNode*> vNodesCopy;
        CRITICAL_BLOCK(cs_vNodes)
            vNodesCopy = vNodes;
        foreach(CNode* pnode, vNodesCopy)
        {
            CheckForShutdown(0);
            SOCKET hSocket = pnode->hSocket; // 獲取每一個節點對應的socket

            // 從節點對應的socket中讀取對應的資料,將資料放入節點的接收緩衝區中
            // Receive
            //
            if (FD_ISSET(hSocket, &fdsetRecv))
            {
                TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
                {
                    CDataStream& vRecv = pnode->vRecv;
                    unsigned int nPos = vRecv.size();

                    // typical socket buffer is 8K-64K
                    const unsigned int nBufSize = 0x10000;
                    vRecv.resize(nPos + nBufSize);// 調整接收緩衝區的大小
                    int nBytes = recv(hSocket, &vRecv[nPos], nBufSize, 0);// 從socket中接收對應的資料
                    vRecv.resize(nPos + max(nBytes, 0));
                    if (nBytes == 0)
                    {
                        // socket closed gracefully (socket優雅的關閉)
                        if (!pnode->fDisconnect)
                            printf("recv: socket closed\n");
                        pnode->fDisconnect = true;
                    }
                    else if (nBytes < 0)
                    {
                        // socket error
                        int nErr = WSAGetLastError();
                        if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
                        {
                            if (!pnode->fDisconnect)
                                printf("recv failed: %d\n", nErr);
                            pnode->fDisconnect = true;
                        }
                    }
                }
            }

            // 將節點對應的傳送緩衝中的內容傳送出去
            // Send
            //
            if (FD_ISSET(hSocket, &fdsetSend))
            {
                TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                {
                    CDataStream& vSend = pnode->vSend;
                    if (!vSend.empty())
                    {
                        int nBytes = send(hSocket, &vSend[0], vSend.size(), 0); // 從節點對應的傳送緩衝區中傳送資料出去
                        if (nBytes > 0)
                        {
                            vSend.erase(vSend.begin(), vSend.begin() + nBytes);// 從傳送緩衝區中移除傳送過的內容
                        }
                        else if (nBytes == 0)
                        {
                            if (pnode->ReadyToDisconnect())
                                pnode->vSend.clear();
                        }
                        else
                        {
                            printf("send error %d\n", nBytes);
                            if (pnode->ReadyToDisconnect())
                                pnode->vSend.clear();
                        }
                    }
                }
            }
        }
        Sleep(10);
    }
}

3.4.3執行緒ThreadMessageHandler

執行緒ThreadMessageHandler對應的處理流程:ThreadMessageHandler處理流程
原始碼如下:

// 訊息處理執行緒
void ThreadMessageHandler2(void* parg)
{
    printf("ThreadMessageHandler started\n");
    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL);
    loop
    {
        // 輪詢連結的節點用於訊息處理
        // Poll the connected nodes for messages
        vector<CNode*> vNodesCopy;
        CRITICAL_BLOCK(cs_vNodes)
            vNodesCopy = vNodes;
        // 對每一個節點進行訊息處理:傳送訊息和接收訊息
        foreach(CNode* pnode, vNodesCopy)
        {
            pnode->AddRef();

            // Receive messages
            TRY_CRITICAL_BLOCK(pnode->cs_vRecv)
                ProcessMessages(pnode);

            // Send messages
            TRY_CRITICAL_BLOCK(pnode->cs_vSend)
                SendMessages(pnode);

            pnode->Release();
        }

        // Wait and allow messages to bunch up
        vfThreadRunning[2] = false;
        Sleep(100);
        vfThreadRunning[2] = true;
        CheckForShutdown(2);
    }
}
// 處理單個節點對應的訊息:單個節點接收到的訊息進行處理
bool ProcessMessages(CNode* pfrom)
{
    CDataStream& vRecv = pfrom->vRecv;
    if (vRecv.empty())
        return true;
    printf("ProcessMessages(%d bytes)\n", vRecv.size());

    // 同一個的訊息格式
    // Message format
    //  (4) message start
    //  (12) command
    //  (4) size
    //  (x) data
    //
    // 訊息頭包含:message start;command;size;

    loop
    {
        // Scan for message start
        CDataStream::iterator pstart = search(vRecv.begin(), vRecv.end(), BEGIN(pchMessageStart), END(pchMessageStart));
        // 刪除無效的訊息: 就是在對應的訊息開始前面還有一些資訊
        if (vRecv.end() - pstart < sizeof(CMessageHeader))
        {
            if (vRecv.size() > sizeof(CMessageHeader))
            {
                printf("\n\nPROCESSMESSAGE MESSAGESTART NOT FOUND\n\n");
                vRecv.erase(vRecv.begin(), vRecv.end() - sizeof(CMessageHeader));
            }
            break;
        }
        if (pstart - vRecv.begin() > 0)
            printf("\n\nPROCESSMESSAGE SKIPPED %d BYTES\n\n", pstart - vRecv.begin());
        vRecv.erase(vRecv.begin(), pstart); // 移除訊息開始資訊和接收緩衝區開頭之間

        // 讀取訊息頭
        // Read header
        CMessageHeader hdr;
        vRecv >> hdr; // 指標已經偏移了
        if (!hdr.IsValid())
        {
            printf("\n\nPROCESSMESSAGE: ERRORS IN HEADER %s\n\n\n", hdr.GetCommand().c_str());
            continue;
        }
        string strCommand = hdr.GetCommand();

        // Message size
        unsigned int nMessageSize = hdr.nMessageSize;
        if (nMessageSize > vRecv.size())
        {
            // Rewind and wait for rest of message
            ///// need a mechanism to give up waiting for overlong message size error
            printf("MESSAGE-BREAK 2\n");
            vRecv.insert(vRecv.begin(), BEGIN(hdr), END(hdr));
            Sleep(100);
            break;
        }

        // Copy message to its own buffer
        CDataStream vMsg(vRecv.begin(), vRecv.begin() + nMessageSize, vRecv.nType, vRecv.nVersion);
        vRecv.ignore(nMessageSize);

        // Process message
        bool fRet = false;
        try
        {
            CheckForShutdown(2);
            CRITICAL_BLOCK(cs_main)
                // 根據命令和訊息內容進行訊息處理
                fRet = ProcessMessage(pfrom, strCommand, vMsg);
            CheckForShutdown(2);
        }
        CATCH_PRINT_EXCEPTION("ProcessMessage()")
        if (!fRet)
            printf("ProcessMessage(%s, %d bytes) from %s to %s FAILED\n", strCommand.c_str(), nMessageSize, pfrom->addr.ToString().c_str(), addrLocalHost.ToString().c_str());
    }

    vRecv.Compact();
    return true;
}

// 對節點pFrom處理命令strCommand對應的訊息內容為vRecv
bool ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
{
    static map<unsigned int, vector<unsigned char> > mapReuseKey;
    printf("received: %-12s (%d bytes)  ", strCommand.c_str(), vRecv.size());
    // 僅僅輸出前25個字元
    for (int i = 0; i < min(vRecv.size(), (unsigned int)25); i++)
        printf("%02x ", vRecv[i] & 0xff);
    printf("\n");
    // 訊息採集頻率進行處理
    if (nDropMessagesTest > 0 && GetRand(nDropMessagesTest) == 0)
    {
        printf("dropmessages DROPPING RECV MESSAGE\n");
        return true;
    }

    // 如果命令是版本:節點對應的版本
    if (strCommand == "version")
    {
        // 節點對應的版本只能更新一次,初始為0,後面進行更新
        // Can only do this once
        if (pfrom->nVersion != 0)
            return false;

        int64 nTime;
        CAddress addrMe; // 讀取訊息對應的內容
        vRecv >> pfrom->nVersion >> pfrom->nServices >> nTime >> addrMe;
        if (pfrom->nVersion == 0)
            return false;
        // 更新發送和接收緩衝區中的對應的版本
        pfrom->vSend.SetVersion(min(pfrom->nVersion, VERSION));
        pfrom->vRecv.SetVersion(min(pfrom->nVersion, VERSION));

        // 如果節點對應的服務型別是節點網路,則對應節點的客戶端標記就是false
        pfrom->fClient = !(pfrom->nServices & NODE_NETWORK);
        if (pfrom->fClient)
        {
            // 如果不是節點網路,可能僅僅是一些節點不要儲存對應的完整區塊資訊,僅僅需要區塊的頭部進行校驗就可以了
            pfrom->vSend.nType |= SER_BLOCKHEADERONLY;
            pfrom->vRecv.nType |= SER_BLOCKHEADERONLY;
        }
        // 增加時間樣本資料:沒有什麼用處,僅僅用於輸出
        AddTimeData(pfrom->addr.ip, nTime);

        // 對第一個進來的節點請求block資訊
        // Ask the first connected node for block updates
        static bool fAskedForBlocks;
        if (!fAskedForBlocks && !pfrom->fClient)
        {
            fAskedForBlocks = true;
            pfrom->PushMessage("getblocks", CBlockLocator(pindexBest), uint256(0));
        }

        printf("version addrMe = %s\n", addrMe.ToString().c_str());
    }


    else if (pfrom->nVersion == 0)
    {
        // 節點在處理任何訊息之前一定有一個版本訊息
        // Must have a version message before anything else
        return false;
    }

    // 地址訊息
    else if (strCommand == "addr")
    {
        vector<CAddress> vAddr;
        vRecv >> vAddr;

        // Store the new addresses
        CAddrDB addrdb;
        foreach(const CAddress& addr, vAddr)
        {
            if (fShutdown)
                return true;
            // 將地址增加到資料庫中
            if (AddAddress(addrdb, addr))
            {
                // Put on lists to send to other nodes
                pfrom->setAddrKnown.insert(addr); // 將對應的地址插入到已知地址集合中
                CRITICAL_BLOCK(cs_vNodes)
                    foreach(CNode* pnode, vNodes)
                        if (!pnode->setAddrKnown.count(addr))
                            pnode->vAddrToSend.push_back(addr);// 地址的廣播
            }
        }
    }

    
            
           

相關推薦

bitcoin原始碼解析整體架構流程

1. 比特幣簡介 比特幣(BitCoin)的概念最初由中本聰在2009年提出,根據中本聰的思路設計釋出的開源軟體以及建構其上的P2P網路。比特幣是一種P2P形式的數字貨幣。點對點的傳輸意味著一個去中心化的支付系統。 與大多數貨幣不同,比特幣不依靠特定貨幣機構

bitcoin原始碼解析資料結構

1. CTxOutAn output of a transaction. It contains the public key that the next input must be able to sign with to claim it.CTxOut類圖欄位屬性說明nValue交易輸出對應的金額scr

區塊鏈BTC98Bitcoin原始碼安裝編譯

在這兒主要介紹Linux下的比特幣Bitcoin安裝,我們選擇ubuntu 12.04的環境,安裝相對容易得多。Windows下並不推薦,因為基於mingW配置相以繁瑣。 同時也可以參考build官方文件。    先拉下原始碼: 安裝Berkeley DB 4.8以上版本: sudo apt-get

Bitcoin原始碼安裝編譯

轉載自:http://www.cnblogs.com/wintersun/p/3813424.html        比特幣 (貨幣符號: ฿;英文名:Bitcoin;英文縮寫: BTC),是一種用於開源的P2P軟體而產生的電子貨幣。比特幣全域性圖是這樣的: 在這兒主要介紹Linux下的比特幣Bitco

代碼分析1 整體架構

分享 rpc image bitcoin ima tex tco nag blog Bitcoin 比特幣官方客戶端有兩個版本:一個是圖形界面的版本,通常被稱為 Bitcoin(首字母大寫),以及一個簡潔命令行的版本(稱為 bitcoind)。命令

從零開始學習開發(七)-P2P網路建立流程生成地址對並連線到指定地址

本節繼續講解比特幣P2P網路建立流程,這節講解的執行緒為’ThreadOpenAddedConnections’,它的作用是生成地址對並連線到指定地址。 本文可以結合比特幣系統啟動的的第12步的講解來看,可以更加系統的瞭解比特幣系統啟動的過程。 P2P 網路的建立是在比特幣系統啟動的第

從零開始學習(六)--P2P網路建立的流程查詢DNS節點

上節開始我們已經開始講解比特幣系統中P2P網路是如何建立的。還記得在比特幣系統啟動的的第12步的講解中,我們提到有幾個執行緒相關的處理非常重要嗎?以下內容正是基於此做了詳細的講解。由於篇幅過長,我們分幾篇文章依次道來。 P2P 網路的建立是在比特幣系統啟動的第 12 步,最後時刻呼叫 C

從零開始學習(五)--P2P網路建立的流程套接字的讀取傳送

寫在前面: 本篇文章接續 從零開始學習比特幣開發(四)–網路初始化,載入區塊鏈和錢包,匯入區塊啟動節點 從零開始學習區塊鏈技術(三)-接入比特幣網路的關鍵步驟解析、建立比特幣錢包,以及重要rpc指令 從零開始學習區塊鏈技術(二)–如何接入比特幣網路以及其原理分析 從零開始學習

BTC原始碼分析(1):地址生成過程

一、生成一個比特幣錢地址 二、根據原始碼整理比特幣地址生成過程 1、取得公鑰PubKey 2、使用 RIPEMD160(SHA256(PubKey)) 雜湊演算法,取公鑰並對其雜湊兩次 3、給雜湊加上地址生成演算法版本的字首 4、對於第二步生成的結果,使用SHA256(SHA256

BTC原始碼分析(0):環境搭建

一、Bitcoin編譯執行啟動過程 1、從Github上clone bitcoin原始碼 至本地 ~/go/src/github.com/bitcoin$git clone https://github.com/bitcoin/bitcoin.git Cloning into 'bi

bitcoin-cli轉賬與交易的api使用總結

需要使用到3個api,分別是 createrawtransaction(建立交易),signrawtransaction (簽名交易),sendrawtransaction(廣播交易), 2.1命令格式: createrawtransaction [{“txid”:”id”,”vout”:n},…] {“

Ubuntu下編譯安裝Bitcoin

一、安裝第三方庫:       1.安裝libssl, libevent, libboost庫等:         sudo apt-get install build-essential libtool autotools-dev automake pkg-conf

原始交易解析

在比特幣當中,一筆交易的構成並非簡單地由一個賬戶餘額減去所轉賬的數額,再由向另一個賬戶餘額新增相同的數額。比特幣的交易是由一系列被稱為opcode的指令所組成的指令碼。也就是說,這些交易資料構成了比特幣本身。在執行指令碼的過程中,比特幣會將這些指令一一壓入棧中,並計算它們是否合法。 那麼,比特幣是如何解析

(Bitcoin) POW難度調節機制

比特幣白皮書在工作量證明章節中解釋了工作量證明(PoW)的方式: 我們在區塊中補增一個隨機數(Nonce),這個隨機數要使得該給定區塊的隨機雜湊值出現了所需的那麼多個0。我們通過反覆嘗試來找到這個隨機數,直到找到為止,這樣我們就構建了一個工作量證明機制。只要該CP

區塊結構解析

前言 本文主要具體分析一個區塊的值,通過逐位元組分析,找出與比特幣區塊欄位對應的部分,我們就可以加深對比特幣區塊的瞭解。 準備工作 json格式資訊: { "hash": "00000000d1145790a8694403d4063f32

區塊鏈-bitcoin節點安裝及RPC呼叫

節點安裝: 從比特幣官網下載執行環境(bitcoin-0.16.2-win64.zip) 壓縮包內可執行檔案有:bitcoin-cli bitcoind  bitcoin-qt  bitcoin-tx test_bitcoin                   

程式碼分析8 區塊校驗確認

比特幣節點接收到一個區塊以後,都會進行校驗和確認,如下參考網路圖: 關鍵看看對區塊中的交易進行進一步的校驗程式碼:1.// First transaction must be coinbase, the rest must not be2.if (vtx.empty() || !vtx[0].IsCoinB

精通(第七章)【高階交易指令碼】

7.1介紹 在上一章中,我們介紹了比特幣交易的基本元素,並且查看了最常見的交易指令碼型別,即P2PKH指令碼。在本章中,我們將介紹更高階的指令碼,以及如何使用它來構建具有複雜條件的交易。 首先,我們將看看多重簽名指令碼。接下來,我們將檢查第二個最常見的交易指令碼

區塊鏈(1)—— 中的區塊、賬戶驗證記賬

上一篇說到比特幣是一種去中心化的電子現金系統。去中心化說起來似乎挺簡單,但是不用細想就會發現很多問題:賬本儲存在每個節點中,如何保證每個節點中的資料一致,或者說如何防止某些節點的賬本被惡意篡改而影響到整個網路的交易? 如果說交易的驗證由各個節點完成,那麼如何在不把密碼洩露給其

所有權及隱私問題 | 轉賬的加密流程

如果你對這個問題還不是很明白,那就一起來看看吧。銀行系統我們先來回顧下現實的銀行系統:首先我們需要把我們的個人資訊(如身份證)給銀行,銀行給我們開立相對應的賬戶,銀行在開戶的時候確立了對賬戶的所有權。進行支付的時候,銀行對交易雙方完成轉賬(銀行在開戶的時候已經知道我們對應的賬戶)。同時銀行會對賬戶資訊進行保密