1. 程式人生 > >C++從零開始區塊鏈:P2P模組之UDP資料包分組排序

C++從零開始區塊鏈:P2P模組之UDP資料包分組排序

udp的特點是不可靠,不連線,資料發過去就完事,至於對方收沒收到就不管了。
在使用udp進行通訊的時候,要在應用層做分組、排序、組包、校驗等工作。
傳送方現將要傳送的資料切片,所有的切片組成一組,標上組號,每個切片根據原始資料的順序有個組內編號。
傳送的時候,每次傳送一個切片,並等待接收資料接收方反饋收到資料的資訊,如果沒有接收到資料接收方的反饋資訊就繼續傳送,接收到資料接收方傳送的反饋資訊位置。
資料接收方接收到資料後先給傳送方傳送一個數據接收成功的反饋,然後對資料進行分組儲存。儲存之前要先檢驗一下該塊資料是否已經接受過了,因為雖然接收方成功的接收到了資料,併發送了反饋資訊給傳送方,但傳送方可能沒有接收到反饋資訊,傳送方會判定是傳送失敗,從而再次傳送該塊資料,從而導致了接收方對同一塊資料重複接收了兩次。
當接收方接收到的資料塊數與傳送方傳送的總塊數相同時,說明該資料已經全部接收完成,對資料進行排序組合,最後再進行校驗,至此一個數據傳送完成。
為了組號處理方便,直接用完整資訊的hash作為組號,資料重新組裝完成後可以直接用組號進行校驗。每一組存到一個map裡,有map根據序號自動排序。

//資料切片
void P2PNode::sendBlockChain()
{
    P2PMessage mess;
    int i = 0;
    BlockChain *bc = BlockChain::Instance();
    std::string strBcJson = bc->GetJsonFromBlockList();
    std::string strBcHash = Cryptography::GetHash(strBcJson.c_str(), strBcJson.length());
    int total = strBcJson.length() / MAX_P2P_SIZE + 1
; for (i = 0; i < total - 1; ++i) { memset(&mess, 0, sizeof(mess)); mess.cmd = p2p_blockchain; mess.index = i; mess.total = total; mess.length = MAX_P2P_SIZE; strcpy(mess.messHash, strBcHash.c_str()); strncpy(mess.mess, strBcJson.c_str() + i * MAX_P2P_SIZE, MAX_P2P_SIZE); sendMessage(mess); } memset
(&mess, 0, sizeof(mess)); mess.cmd = p2p_blockchain; mess.index = i; mess.total = total; mess.length = strBcJson.length() % MAX_P2P_SIZE; strcpy(mess.messHash, strBcHash.c_str()); strncpy(mess.mess, strBcJson.c_str() + i * MAX_P2P_SIZE, mess.length); sendMessage(mess); } //資料傳送 void P2PNode::sendMessage(P2PMessage &mess) { sockaddr_in otherAddr; memset(&otherAddr, 0, sizeof(otherAddr)); otherAddr.sin_family = AF_INET; otherAddr.sin_port = htons(m_otherPort); otherAddr.sin_addr.s_addr = inet_addr(m_otherIP); P2PResult result; result.index = mess.index; memcpy(result.messHash, mess.messHash, sizeof(result.messHash)); while (1) { if (sendto(m_sock, &mess, sizeof(P2PMessage), 0, (struct sockaddr*)&otherAddr, sizeof(otherAddr)) < 0) { perror("sendto"); close(m_sock); exit(4); } sleep(1); pthread_mutex_lock(&m_mutexResult); if (m_lstResult.end() == std::find(m_lstResult.begin(), m_lstResult.end(), result)) { pthread_mutex_unlock(&m_mutexResult); continue; } m_lstResult.remove(result); pthread_mutex_unlock(&m_mutexResult); break; } } //資料接收 void P2PNode::combinationPackage(P2PMessage &mess) { Package package; int index = mess.index; std::list<Package>::iterator it; pthread_mutex_lock(&m_mutexPack); for (it = m_lstPackage.begin(); it != m_lstPackage.end(); ++it) { if (!strcmp(it->messHash, mess.messHash)) break; } if (it == m_lstPackage.end()) { package.total = mess.total; package.cmd = mess.cmd; memcpy(package.messHash, mess.messHash, sizeof(package.messHash)); package.mapMess.insert(std::pair<int, std::string>(index, std::string(mess.mess, mess.length))); m_lstPackage.push_back(package); } else { it->mapMess.insert(std::pair<int, std::string>(index, std::string(mess.mess, mess.length))); package = *it; } pthread_mutex_unlock(&m_mutexPack); if (package.total == (int)package.mapMess.size()) { std::string str; std::map<int, std::string>::iterator mapIt; for (mapIt = package.mapMess.begin(); mapIt != package.mapMess.end(); ++mapIt) { str += mapIt->second; } BlockChain *blockChain = BlockChain::Instance(); switch (package.cmd) { case p2p_transaction: { BroadcastMessage bm; memset(&bm, 0, sizeof(bm)); memcpy(&bm, str.c_str(), str.length()); std::string strHash = ShaCoin::Cryptography::GetHash(bm.json, strlen(bm.json)); Transactions ts = blockChain->GetTransactionsFromJson(bm.json); int balan = blockChain->CheckBalances(ts.sender); if (balan < ts.amount) break; if (Cryptography::Verify(bm.pubkey, strHash.c_str(), strHash.length(), bm.sign, sizeof(bm.sign), bm.signlen) < 1) break; blockChain->InsertTransactions(ts); } break; case p2p_bookkeeping: { BroadcastMessage bm; memset(&bm, 0, sizeof(bm)); memcpy(&bm, str.c_str(), str.length()); Block block = blockChain->GetBlockFromJson(std::string(bm.json, strlen(bm.json))); if (block.proof > blockChain->GetLastBlock().proof && blockChain->WorkloadVerification(block.proof)) { blockChain->DeleteDuplicateTransactions(block); blockChain->InsertBlock(block); } } break; case p2p_result: break; case p2p_merge: sendBlockChain(); break; case p2p_blockchain: blockChain->MergeBlockChain(str); break; default: break; } m_lstPackage.remove(package); } }