1. 程式人生 > >一種基於Qt的可伸縮的全非同步C/S架構伺服器實現(五) 單層無中心叢集

一種基於Qt的可伸縮的全非同步C/S架構伺服器實現(五) 單層無中心叢集

五、單層無中心叢集

對40萬用戶規模以內的伺服器,使用星形的無中心連線是較為簡便的實現方式。分佈在各個物理伺服器上的服務程序共同工作,每個程序承擔若干連線。為了實現這個功能,需要解決幾個關鍵問題。

5.1、跨伺服器傳輸通道

        設計在高速區域網中的連線可直接採用TCP,並用第二章介紹的網路傳輸工具、第三章介紹的流水線執行緒池共同搭建。引用上述兩個工具的程式碼在cluster子資料夾的   zp_clusterterm.h中定義:

		ZPNetwork::zp_net_Engine * m_pClusterNet;
		ZPTaskEngine::zp_pipeline * m_pClusterEng;

        伺服器在專用的叢集網路中監聽,需要引數如下:

       1、監聽的地址、埠

       2、本節點唯一名稱

       3、對伺服器叢集內其他節點發布的連線埠、地址

       4、對公網客戶端釋出的連線埠、地址。

      比如,伺服器高速區域網網段可能是 10.129.XX.XX,而有些伺服器可能以虛擬機器(192.168.11.XX)+NAT(10.129.XX.XX)的方式在內網的子網中對映,因此,需要告訴別的伺服器節點,如何連線到自己。同時,對公網客戶端來說,每個伺服器的連線地址又不同了。很有可能也是通過NAT的方式,把數十個內網IP對映到一個外網出口的連續埠上去。這個策略的配置頁面如下:


      叢集的連線策略是,新的伺服器程序選取任意一個現有節點,連線後,通過叢集內廣播系統自動接收其它各個節點的地址,並繼續發起連線,直到與現有節點兩兩相通為止。  

     為了支援這個策略,叢集傳輸需要定義一些指令。

5.1.1 叢集指令

     叢集指令在 cluster 資料夾的cross_svr_message.h 定義:

#ifndef CROSS_SVR_MESSAGES_H
#define CROSS_SVR_MESSAGES_H
#include <qglobal.h>
namespace ZP_Cluster{
#pragma  pack (push,1)
	typedef struct tag_cross_svr_message{
		struct tag_header{
			quint16 Mark;    //Always be 0x1234
			quint8 messagetype;
			quint32 data_length;
		}  hearder;
		union uni_payload{
			quint8 data[1];
			struct tag_CSM_heartBeating{
				quint32 nClients;
			} heartBeating;
			struct tag_CSM_BasicInfo{
				quint8 name [64];
				quint8 Address_LAN[64];
				quint16 port_LAN;
				quint8 Address_Pub[64];
				quint16 port_Pub;
			} basicInfo;
			struct tag_CSM_Broadcast{
				quint8 name [64];
				quint8 Address_LAN[64];
				quint16 port_LAN;
				quint8 Address_Pub[64];
				quint16 port_Pub;
			} broadcastMsg[1];
		} payload;
	} CROSS_SVR_MSG;

#pragma pack(pop)
}

#endif // CROSS_SVR_MESSAGES_H
指令由頭部、載荷兩部分組成。

頭部header說明:

Mark是一個固定的起始,用於驗證流解譯的正確性。如果流解譯不正確,第二塊指令的起始將不是這個值。

messagetype 是一個用來標定指令型別的位元組,決定了載荷聯合體該採用哪個策略解譯

data_length是長度,這裡代表載荷的長度

載荷 payload說明:

有三種指令結構體, 心跳結構體用來維持各個伺服器之間的心跳,基本資訊(basicInfo)用於在連線建立後,向對方告知本節點的資訊。廣播結構體是用於在本機的伺服器列表發生變更時,向所有現有節點廣播新的列表。

對傳輸的使用者資料,直接儲存在data中。

5.1.2 連線流程

第一步,準備加入叢集的伺服器選取叢集中任一個節點作為物件,發起P2P連線。

第二步,雙方互換資訊(basicInfo)

第三步,雙方將對方的資訊新增到本地的伺服器節點表中。伺服器節點表是一群zp_ClusterNode類的例項,該類由ZPTaskEngine::zp_plTaskBase派生。這個基類在第三章有介紹。伺服器節點物件的例項負責具體的指令解譯。該列表如下(在cluster子資料夾的   zp_clusterterm.h中定義):

		//important hashes. server name to socket, socket to server name
		QMutex m_hash_mutex;
		QMap<QString , zp_ClusterNode *> m_hash_Name2node;
		QMap<QObject *,zp_ClusterNode *> m_hash_sock2node;

節點的指標存放在對映中,一個是名稱到物件的對映,一個是套接字到物件的對映

第四步,由於節點表發生變化,因此,會觸發對現有節點的廣播(broadCasting)

第五步,各個節點收到廣播後,會比較廣播中的節點資訊和自己目前的節點資訊,併發起向新增節點的連線。

最終,當一對一連線完成,系統重新處於穩定狀態。解譯這段資訊的程式碼片段在中cluster資料夾zp_clusternode.cpp的deal_current_message_block方法中實現:

		switch(m_currentHeader.messagetype)
		{
                \\...
		case 0x01://basicInfo, when connection established, this message should be used

			if (bytesLeft==0)
			{
				QString strName ((const char *)pMsg->payload.basicInfo.name);
				if (strName != m_pTerm->name())
				{
					this->m_strTermName = strName;
					m_nPortLAN = pMsg->payload.basicInfo.port_LAN;
					m_addrLAN = QHostAddress((const char *)pMsg->payload.basicInfo.Address_LAN);
					m_nPortPub = pMsg->payload.basicInfo.port_Pub;
					m_addrPub = QHostAddress((const char *)pMsg->payload.basicInfo.Address_Pub);
					if (false==m_pTerm->regisitNewServer(this))
					{
						this->m_strTermName.clear();
						emit evt_Message(this,tr("Info: New Svr already regisited. Ignored.")+strName);
						emit evt_close_client(this->sock());
					}
					else
					{
						emit evt_NewSvrConnected(this->termName());
						m_pTerm->BroadcastServers();
					}
				}
				else
				{
					emit evt_Message(this,tr("Can not connect to it-self, Loopback connections is forbidden."));
					emit evt_close_client(this->sock());
				}
			}
			break;
		case 0x02: //Server - broadcast messages

			if (bytesLeft==0)
			{
				int nSvrs = pMsg->hearder.data_length / sizeof(CROSS_SVR_MSG::uni_payload::tag_CSM_Broadcast);
				for (int i=0;i<nSvrs;i++)
				{
					QString strName ((const char *)pMsg->payload.broadcastMsg[i].name);
					if (strName != m_pTerm->name() && m_pTerm->SvrNodeFromName(strName)==NULL)
					{
						QHostAddress addrToConnectTo((const char *)pMsg->payload.broadcastMsg[i].Address_LAN);
						quint16 PortToConnectTo = pMsg->payload.broadcastMsg[i].port_LAN;
						
						if (strName > m_pTerm->name())
							emit evt_connect_to(addrToConnectTo,PortToConnectTo,false);
						else
							emit evt_Message(this,tr("Name %1 <= %2, omitted.").arg(strName).arg(m_pTerm->name()));
					}
				}
			}
			break;
                     ...


5.2 流式解析

TCP 是面向連線的流式傳輸。對使用者傳送的一個大資料包,雖然保證收發的完整性,旦接收方每次接收的資料片段長度是有限的,也是不定的。一種簡單的思路是按照指令結構體的長度,直接快取完整的資料包,而後集中處理。這樣有一個問題,在資料包很大時,記憶體開銷過高。因此,本應用設計的思路是邊接收、邊處理。具體步驟:

1、檢查收到的頭部是否合法

2、儲存當前指令的頭部

3、一旦得到一段載荷資料,就回調一次處理過程,處理過程根據需求等待更多資料,或者處理完後清空快取。這對一次傳輸100MB資料的應用是很關鍵的。流式處理需要完成的步驟關鍵程式碼如下:

5.2.1  資料接收

在 zp_ClusterTerm的接收槽裡,直接把資料片段壓入zp_ClusterNode物件的佇列中,並壓入流水線。

	//some data arrival
	void  zp_ClusterTerm::on_evt_Data_recieved(QObject *  clientHandle,QByteArray  datablock )
	{
		//Push Clients to nodes if it is not exist
		zp_ClusterNode * pClientNode = ...;
		int nblocks =  pClientNode->push_new_data(datablock);
		if (nblocks<=1)
			m_pClusterEng->pushTask(pClientNode);
                //...
	}

oushTask方法把Block壓入zp_ClusterNode的處理佇列m_list_Rawdata裡,這部分的狀態變數如下:
	class zp_ClusterNode : public ZPTaskEngine::zp_plTaskBase
	{
		//.....
		//Data Process
		//The raw data queue and its mutex
		QList<QByteArray> m_list_RawData;
		QMutex m_mutex_rawData;

		//The current Read Offset, from m_list_RawData's beginning
		int m_currentReadOffset;
		//Current Message Offset, according to m_currentHeader
		int m_currentMessageSize;

		//Current un-procssed message block.for large blocks,
		//this array will be re-setted as soon as some part of data has been
		//dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces
		QByteArray m_currentBlock;

		CROSS_SVR_MSG::tag_header m_currentHeader;

		//...		
	};

變數 m_currentReadOffset 指的是佇列的首部元素已經處理的偏移。比如首部的Block有2341位元組,處理了1099位元組,本指令已經結束,則此值為1099

變數 m_currentMessageSize 指的是當前接收的資訊的大小。比如100MB 的資訊,接受了23MB,這個值就是23MB

變數 m_currentBlock 是當前的快取。這個快取會不斷的遞交處理,負責處理的程式可以根據情況適時清空它。對短指令,不清也是可以的。

變數 m_currentHeader 是當前的資訊頭部,這個值記錄了當前結構體的首部資訊。

5.2.2 資料處理

線上程池中,會呼叫 zp_ClusterNode::run 虛擬方法。這個方法的關鍵程式碼如下(實際程式碼因為有執行緒同步,要複雜一些):

	int zp_ClusterNode::run()
	{
		//nMessageBlockSize 是靜態變數,表示最多處理幾個塊就釋放CPU給其他節點
		int nMessage = m_nMessageBlockSize;
		int nCurrSz = -1;
		while (--nMessage>=0 && nCurrSz!=0  )
		{
			QByteArray block;			
			block =  *m_list_RawData.begin();			
			m_currentReadOffset = filter_message(block,m_currentReadOffset);
			if (m_currentReadOffset >= block.size())
			{
				m_list_RawData.pop_front();
				m_currentReadOffset = 0;
			}
			nCurrSz = m_list_RawData.size();
		}
		if (nCurrSz==0)
			return 0;
		return -1;
	}

其中,filter_message 是對資訊進行初步處理,輸入當前佇列的首部、處理偏移,返回新的處理偏移

這個方法的關鍵程式碼如下:

	//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
	//!return bytes Used.
	int zp_ClusterNode::filter_message(QByteArray  block, int offset)
	{
		const int blocklen = block.length();
		while (blocklen>offset)
		{
			const char * dataptr = block.constData();

			//先確保資訊的頭標誌被接收
			while (m_currentMessageSize<2 && blocklen>offset )
			{
				m_currentBlock.push_back(dataptr[offset++]);
				m_currentMessageSize++;
			}
			if (m_currentMessageSize < 2) //First 2 byte not complete
				continue;

			if (m_currentMessageSize==2)
			{
				const char * headerptr = m_currentBlock.constData();
				memcpy((void *)&m_currentHeader,headerptr,2);
			}

			const char * ptrCurrData = m_currentBlock.constData();
			//判斷頭2個位元組是不是1234
			if (m_currentHeader.Mark == 0x1234)
				//Valid Message
			{
				//試圖接收完整的頭部資訊
				if (m_currentMessageSize< sizeof(CROSS_SVR_MSG::tag_header) && blocklen>offset)
				{
					int nCpy = sizeof(CROSS_SVR_MSG::tag_header) - m_currentMessageSize;
					if (nCpy > blocklen - offset)
						nCpy = blocklen - offset;
					QByteArray arrCpy(dataptr+offset,nCpy);
					m_currentBlock.push_back(arrCpy);
					offset += nCpy;
					m_currentMessageSize += nCpy;
				}
				//如果頭部還沒收完則返回
				if (m_currentMessageSize < sizeof(CROSS_SVR_MSG::tag_header)) //Header not completed.
					continue;
				//除了頭部以外,還有資料可用,並且頭部剛剛接收完
				else if (m_currentMessageSize == sizeof(CROSS_SVR_MSG::tag_header))//Header just  completed.
				{
					//儲存頭部
					const char * headerptr = m_currentBlock.constData();
					memcpy((void *)&m_currentHeader,headerptr,sizeof(CROSS_SVR_MSG::tag_header));

					//繼續處理後續的載荷
					if (block.length()>offset)
					{
						//確定還有多少位元組沒有接收
						qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
								-m_currentMessageSize ;
						//繼續接收載荷
						if (bitLeft>0 && blocklen>offset)
						{
							int nCpy = bitLeft;
							if (nCpy > blocklen - offset)
								nCpy = blocklen - offset;
							QByteArray arrCpy(dataptr+offset,nCpy);
							m_currentBlock.push_back(arrCpy);
							offset += nCpy;
							m_currentMessageSize += nCpy;
							bitLeft -= nCpy;
						}
						//處理一次資料
						deal_current_message_block();
						if (bitLeft>0)
							continue;
						//This Message is Over. Start a new one.
						m_currentMessageSize = 0;
						m_currentBlock = QByteArray();
						continue;
					}
				}
				//除了頭部以外,還有資料可用
				else
				{
						
					if (block.length()>offset)
					{
						//確定還有多少位元組沒有接收
						qint32 bitLeft = m_currentHeader.data_length + sizeof(CROSS_SVR_MSG::tag_header)
								-m_currentMessageSize ;
						//繼續接收載荷
						if (bitLeft>0 && blocklen>offset)
						{
							int nCpy = bitLeft;
							if (nCpy > blocklen - offset)
								nCpy = blocklen - offset;
							QByteArray arrCpy(dataptr+offset,nCpy);
							m_currentBlock.push_back(arrCpy);
							offset += nCpy;
							m_currentMessageSize += nCpy;
							bitLeft -= nCpy;
						}
						//deal block, may be processed as soon as possible;
						deal_current_message_block();
						if (bitLeft>0)
							continue;
						//This Message is Over. Start a new one.
						m_currentMessageSize = 0;
						m_currentBlock = QByteArray();
						continue;
					}
				} // end if there is more bytes to append
			} //end deal trans message
			else
			  //...
		} // end while block len > offset

		return offset;
	}

在處理當前塊資料的方法 deal_current_message_block裡,即可逐一判斷訊息型別,加以處理了。

5.3 叢集模組外部介面

         叢集模組只負責在伺服器之間建立連線,並提供一套傳輸使用者資料的通路。在叢集建立連線後,使用者直接通過

	void zp_ClusterTerm::SendDataToRemoteServer(QString  svrName,QByteArray  SourceArray)
	{
		int nMsgLen = sizeof(CROSS_SVR_MSG::tag_header) +  SourceArray.size();
		QByteArray array(nMsgLen,0);
		CROSS_SVR_MSG * pMsg =(CROSS_SVR_MSG *) array.data();
		pMsg->hearder.Mark = 0x1234;
		pMsg->hearder.data_length = SourceArray.size();
		pMsg->hearder.messagetype = 0x03;
		memcpy (pMsg->payload.data,SourceArray.constData(),SourceArray.size());
		m_hash_mutex.lock();
		if (m_hash_Name2node.contains(svrName))
			netEng()->SendDataToClient(m_hash_Name2node[svrName]->sock(),array);
		m_hash_mutex.unlock();
	}

向伺服器 svrName傳送 SourceArray, 並響應
void evt_RemoteData_recieved(QString /*svrHandle*/,QByteArray  /*svrHandle*/ );

訊號來接收資料。使用者不用關心傳輸協議的封裝和解析。

     但是,以下問題是不涉及的。

     1、傳輸的資料的具體意義解釋

     2、全域性客戶端的UUID雜湊和同步

     3、客戶端資料是否被真正接收。

     這些部分留給應用相關部分來具體實現。