1. 程式人生 > >QT 執行緒池 + TCP 小試(二)實現通訊功能

QT 執行緒池 + TCP 小試(二)實現通訊功能

*免分資源連結點選開啟連結http://download.csdn.net/detail/goldenhawking/4492378

        有了執行緒池,我們下一步就利用 QTcpServer 搭建一個伺服器,接受客戶端的連線,並把資料傳送到執行緒池上。由於 QTcpServer 資料太多了,這裡不在贅述。唯一值得注意的是,當客戶端退出時,如果執行緒池佇列中還有該客戶的資訊,這個資訊還會被處理,只是無法再發送回去而已。其實,還可實現成客戶端退出,就發一個訊號到執行緒池,刪除自己的所有任務。這個也很簡單,但之所以沒有做,因為這些資料的處理結果可能還會被其他消費者(而非生產者自己)使用,最典型的例子是從工業感測器上採集的資料,其生成的影象需要儲存到裝置中去。

       QTcpSocket的 Write 方法預設是支援大體積資料的,即使一次發了500MB的資料,只要硬體資源可以承受,呼叫也會成功並立刻返回。接受者會以一定的載荷大小不停的觸發readyRead,直到傳送全部成功。但是,為了能夠觀察到並控制收發佇列中的資料包的大小、體積,我們在外層實現了一個傳送佇列,每次以 payLoad為大小發送資料包。這是從MFC中帶來的習慣,很難說好壞。

 qghtcpserver.h

#ifndef QGHTCPSERVER_H
#define QGHTCPSERVER_H

#include <QTcpServer>
#include <QMap>
#include <QList>

class QGHTcpServer : public QTcpServer
{
	Q_OBJECT

public:
	QGHTcpServer(QObject *parent,int nPayLoad = 4096);
	~QGHTcpServer();
	//踢出所有客戶
	void KickAllClients();
	QList <QObject *> clientsList();
	void SetPayload(int nPayload);
private:
	QMap<QObject *,QList<QByteArray> > m_buffer_sending;
	QMap<QObject *,QList<qint64> > m_buffer_sending_offset;
	QMap<QObject*,int> m_clientList;
	int m_nPayLoad;
public slots:
	//新的客戶連線到來
	void new_client_recieved();
	//客戶連線被關閉
	void client_closed();
	//新的資料到來
	void new_data_recieved();
	//一批資料傳送完畢
	void some_data_sended(qint64);
	//客戶端錯誤
	void displayError(QAbstractSocket::SocketError socketError);
	//向客戶端傳送資料
	void SendDataToClient(QObject * objClient,const QByteArray &  dtarray);
	//向客戶端廣播資料,不包括 objFromClient
	void BroadcastData(QObject * objFromClient,const QByteArray &  dtarray);
signals:
	//錯誤資訊
	void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
	//新的客戶端連線
	void evt_NewClientConnected(QObject * client);
	//客戶端退出
	void evt_ClientDisconnected(QObject * client);
	//收到一批資料
	void evt_Data_recieved(QObject * ,const QByteArray &  );
	//一批資料被髮送
	void evt_Data_transferred(QObject * client,qint64);
};

#endif // QGHTCPSERVER_H

qghtcpserver.cpp
#include "qghtcpserver.h"
#include <assert.h>
#include <QTcpSocket>
QGHTcpServer::QGHTcpServer(QObject *parent,int nPayLoad )
	: QTcpServer(parent),
	m_nPayLoad(nPayLoad)
{
	assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
	connect(this, SIGNAL(newConnection()), this, SLOT(new_client_recieved()));
}

QGHTcpServer::~QGHTcpServer()
{

}
QList <QObject *> QGHTcpServer::clientsList()
{
	return m_clientList.keys();
}
void QGHTcpServer::SetPayload(int nPayload)
{
	m_nPayLoad = nPayload;
	assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
}

void QGHTcpServer::new_client_recieved()
{
	QTcpSocket * sock_client = nextPendingConnection();
	while (sock_client)
	{
		connect(sock_client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));
		connect(sock_client, SIGNAL(disconnected()),this,SLOT(client_closed()));
		connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
		connect(sock_client, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));
		m_clientList[sock_client] = 0;
		emit evt_NewClientConnected(sock_client);
		sock_client = nextPendingConnection();
	}
}
void QGHTcpServer::client_closed()
{
	QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
	if (pSock)
	{
		emit evt_ClientDisconnected(pSock);
		m_buffer_sending.remove(pSock);
		m_buffer_sending_offset.remove(pSock);
		m_clientList.remove(pSock);
		pSock->deleteLater();
	}
}
void QGHTcpServer::new_data_recieved()
{
	QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
	if (pSock)
		emit evt_Data_recieved(pSock,pSock->readAll());
}
void QGHTcpServer::some_data_sended(qint64 wsended)
{
	QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
	if (pSock)
	{
		emit evt_Data_transferred(pSock,wsended);
		QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];
		QList<qint64> & list_offset = m_buffer_sending_offset[pSock];
		while (list_sock_data.empty()==false)
		{
			QByteArray & arraySending = *list_sock_data.begin();
			qint64 & currentOffset = *list_offset.begin();
			qint64 nTotalBytes = arraySending.size();
			assert(nTotalBytes>=currentOffset);
			qint64 nBytesWritten = pSock->write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));
			currentOffset += nBytesWritten;
			if (currentOffset>=nTotalBytes)
			{
				list_offset.pop_front();
				list_sock_data.pop_front();
			}
			else
				break;
		}
	}
}
void QGHTcpServer::displayError(QAbstractSocket::SocketError socketError)
{
	QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
	if (pSock)
	{
		emit evt_SocketError(pSock,socketError);
		pSock->disconnectFromHost();
	}
}

void QGHTcpServer::SendDataToClient(QObject * objClient,const QByteArray &  dtarray)
{
	if (m_clientList.find(objClient)==m_clientList.end())
		return;
	QTcpSocket * pSock = qobject_cast<QTcpSocket*>(objClient);
	if (pSock&&dtarray.size())
	{
		QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];
		QList<qint64> & list_offset = m_buffer_sending_offset[pSock];
		if (list_sock_data.empty()==true)
		{
			qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));
			if (bytesWritten < dtarray.size())
			{
				list_sock_data.push_back(dtarray);
				list_offset.push_back(bytesWritten);
			}
		}
		else
		{
			list_sock_data.push_back(dtarray);
			list_offset.push_back(0);
		}			
	}	
}
void QGHTcpServer::BroadcastData(QObject * objClient,const QByteArray &  dtarray)
{
	for(QMap<QObject *,int>::iterator p = m_clientList.begin();p!=m_clientList.end();p++)
	{
		QTcpSocket * pSock = qobject_cast<QTcpSocket*>(p.key());
		if (pSock&&dtarray.size()&&pSock!=objClient)
		{
			QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];
			QList<qint64> & list_offset = m_buffer_sending_offset[pSock];
			if (list_sock_data.empty()==true)
			{
				qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));
				if (bytesWritten < dtarray.size())
				{
					list_sock_data.push_back(dtarray);
					list_offset.push_back(bytesWritten);
				}
				else
				{
					list_sock_data.push_back(dtarray);
					list_offset.push_back(0);
				}			
			}
		}	
	}
}
void QGHTcpServer::KickAllClients()
{
	QList<QObject *> clientList = m_clientList.keys();
	foreach(QObject * obj,clientList)
	{
		QTcpSocket * pSock = qobject_cast<QTcpSocket*>(obj);
		if (pSock)
		{
			pSock->disconnectFromHost();
		}	
	}

}
下一次,我會介紹最後的實現功能。