C++從零開始區塊鏈:P2P模組之UDP資料包分組排序
阿新 • • 發佈:2018-11-03
udp的特點是不可靠,不連線,資料發過去就完事,至於對方收沒收到就不管了。
在使用udp進行通訊的時候,要在應用層做分組、排序、組包、校驗等工作。
傳送方現將要傳送的資料切片,所有的切片組成一組,標上組號,每個切片根據原始資料的順序有個組內編號。
傳送的時候,每次傳送一個切片,並等待接收資料接收方反饋收到資料的資訊,如果沒有接收到資料接收方的反饋資訊就繼續傳送,接收到資料接收方傳送的反饋資訊位置。
資料接收方接收到資料後先給傳送方傳送一個數據接收成功的反饋,然後對資料進行分組儲存。儲存之前要先檢驗一下該塊資料是否已經接受過了,因為雖然接收方成功的接收到了資料,併發送了反饋資訊給傳送方,但傳送方可能沒有接收到反饋資訊,傳送方會判定是傳送失敗,從而再次傳送該塊資料,從而導致了接收方對同一塊資料重複接收了兩次。
當接收方接收到的資料塊數與傳送方傳送的總塊數相同時,說明該資料已經全部接收完成,對資料進行排序組合,最後再進行校驗,至此一個數據傳送完成。
為了組號處理方便,直接用完整資訊的hash作為組號,資料重新組裝完成後可以直接用組號進行校驗。每一組存到一個map裡,有map根據序號自動排序。
//資料切片
void P2PNode::sendBlockChain()
{
P2PMessage mess;
int i = 0;
BlockChain *bc = BlockChain::Instance();
std::string strBcJson = bc->GetJsonFromBlockList();
std::string strBcHash = Cryptography::GetHash(strBcJson.c_str(), strBcJson.length());
int total = strBcJson.length() / MAX_P2P_SIZE + 1 ;
for (i = 0; i < total - 1; ++i)
{
memset(&mess, 0, sizeof(mess));
mess.cmd = p2p_blockchain;
mess.index = i;
mess.total = total;
mess.length = MAX_P2P_SIZE;
strcpy(mess.messHash, strBcHash.c_str());
strncpy(mess.mess, strBcJson.c_str() + i * MAX_P2P_SIZE, MAX_P2P_SIZE);
sendMessage(mess);
}
memset (&mess, 0, sizeof(mess));
mess.cmd = p2p_blockchain;
mess.index = i;
mess.total = total;
mess.length = strBcJson.length() % MAX_P2P_SIZE;
strcpy(mess.messHash, strBcHash.c_str());
strncpy(mess.mess, strBcJson.c_str() + i * MAX_P2P_SIZE, mess.length);
sendMessage(mess);
}
//資料傳送
void P2PNode::sendMessage(P2PMessage &mess)
{
sockaddr_in otherAddr;
memset(&otherAddr, 0, sizeof(otherAddr));
otherAddr.sin_family = AF_INET;
otherAddr.sin_port = htons(m_otherPort);
otherAddr.sin_addr.s_addr = inet_addr(m_otherIP);
P2PResult result;
result.index = mess.index;
memcpy(result.messHash, mess.messHash, sizeof(result.messHash));
while (1)
{
if (sendto(m_sock, &mess, sizeof(P2PMessage), 0, (struct sockaddr*)&otherAddr, sizeof(otherAddr)) < 0)
{
perror("sendto");
close(m_sock);
exit(4);
}
sleep(1);
pthread_mutex_lock(&m_mutexResult);
if (m_lstResult.end() == std::find(m_lstResult.begin(), m_lstResult.end(), result))
{
pthread_mutex_unlock(&m_mutexResult);
continue;
}
m_lstResult.remove(result);
pthread_mutex_unlock(&m_mutexResult);
break;
}
}
//資料接收
void P2PNode::combinationPackage(P2PMessage &mess)
{
Package package;
int index = mess.index;
std::list<Package>::iterator it;
pthread_mutex_lock(&m_mutexPack);
for (it = m_lstPackage.begin(); it != m_lstPackage.end(); ++it)
{
if (!strcmp(it->messHash, mess.messHash))
break;
}
if (it == m_lstPackage.end())
{
package.total = mess.total;
package.cmd = mess.cmd;
memcpy(package.messHash, mess.messHash, sizeof(package.messHash));
package.mapMess.insert(std::pair<int, std::string>(index, std::string(mess.mess, mess.length)));
m_lstPackage.push_back(package);
}
else
{
it->mapMess.insert(std::pair<int, std::string>(index, std::string(mess.mess, mess.length)));
package = *it;
}
pthread_mutex_unlock(&m_mutexPack);
if (package.total == (int)package.mapMess.size())
{
std::string str;
std::map<int, std::string>::iterator mapIt;
for (mapIt = package.mapMess.begin(); mapIt != package.mapMess.end(); ++mapIt)
{
str += mapIt->second;
}
BlockChain *blockChain = BlockChain::Instance();
switch (package.cmd)
{
case p2p_transaction:
{
BroadcastMessage bm;
memset(&bm, 0, sizeof(bm));
memcpy(&bm, str.c_str(), str.length());
std::string strHash = ShaCoin::Cryptography::GetHash(bm.json, strlen(bm.json));
Transactions ts = blockChain->GetTransactionsFromJson(bm.json);
int balan = blockChain->CheckBalances(ts.sender);
if (balan < ts.amount)
break;
if (Cryptography::Verify(bm.pubkey, strHash.c_str(), strHash.length(), bm.sign, sizeof(bm.sign), bm.signlen) < 1)
break;
blockChain->InsertTransactions(ts);
}
break;
case p2p_bookkeeping:
{
BroadcastMessage bm;
memset(&bm, 0, sizeof(bm));
memcpy(&bm, str.c_str(), str.length());
Block block = blockChain->GetBlockFromJson(std::string(bm.json, strlen(bm.json)));
if (block.proof > blockChain->GetLastBlock().proof && blockChain->WorkloadVerification(block.proof))
{
blockChain->DeleteDuplicateTransactions(block);
blockChain->InsertBlock(block);
}
}
break;
case p2p_result:
break;
case p2p_merge:
sendBlockChain();
break;
case p2p_blockchain:
blockChain->MergeBlockChain(str);
break;
default:
break;
}
m_lstPackage.remove(package);
}
}