C# 實現的多執行緒非同步Socket資料包接收器框架
幾天前在博問中看到一個C# Socket問題,就想到筆者2004年做的一個省級交通流量接收伺服器專案,當時的基本求如下:
- 接收自動觀測裝置通過無線網絡卡、Internet和Socket上報的交通量資料包
- 全年365*24執行的自動觀測裝置5分鐘上報一次觀測資料,每筆記錄約2K大小
- 規劃全省將有100個左右的自動觀測裝置(截止2008年10月還只有30個)
當時,VS2003才釋出年多,筆者也是接觸C#不久。於是Google了國內國外網,希望找點應用C#解決Socket通訊問題的思路和程式碼。最後,找到了兩篇幫助最大的文章:一篇是國人寫的Socket接收器框架,應用了獨立的客戶端Socket會話(Session)概念,給筆者提供了一個接收伺服器的總體框架思路;另一篇是美國人寫的,提出了多執行緒、分段接收資料包的技術方案,描述了多執行緒、非同步Socket的許多實現細節,該文堅定了筆者採用多執行緒和非同步方式處理Socket接收器的技術路線。
具體實現和測試時筆者還發現,在Internet環境下的Socket應用中,需要系統有極強的容錯能力:沒有辦法控制異常,就必須允許它們存在(附加原始碼中可以看到,try{}catch{}語句較多)。對此,筆者設計了一個專門的檢查和清理執行緒,完成無效或超時會話的清除和資源釋放工作。
依稀記得,國內框架作者的名稱空間有ibm,認為是IBM公司職員,通過郵件後才知道其人在深圳。筆者向他請教了幾個問題,相互探討了幾個技術關鍵點。可惜,現在再去找,已經查不到原文和郵件了。只好藉此機會,將本文獻給這兩個素未謀面的技術高人和同行,也盼望拙文或原始碼能給讀者一點有用的啟發和幫助。
1、主要技術思路
整個系統由三個核心執行緒組成,並由.NET執行緒池統一管理:
- 偵聽客戶端連線請求執行緒:ListenClientRequest(),迴圈偵聽客戶端連線請求。如果有,檢測該客戶端IP,看是否是同一觀測裝置,然後建立一個客戶端TSession物件,並通過Socket非同步呼叫方法BeginReceive()接收資料包、EndReceive()處理資料包
- 資料包處理執行緒:HandleDatagrams(),迴圈檢測資料包佇列_datagramQueue,完成資料包解析、判斷型別、儲存等工作
- 客戶端狀態檢測執行緒:CheckClientState(),迴圈檢查客戶端會話表_sessionTable,判斷會話物件是否有效,設定超時會話關閉標誌,清楚無效會話物件及釋放其資源
2、主要類簡介
系統主要由3個類組成:
- TDatagramReceiver(資料包接收伺服器):系統的核心程序類,建立Socket連線、處理與儲存資料包、清理系統資源,該類提供全部的public屬性和方法
- TSession(客戶端會話):由每個客戶端的Socket物件組成,有自己的資料緩衝區,清理執行緒根據該物件的最近會話時間判斷是否超時
- TDatagram(資料包類):判斷資料包類別、解析資料包
3、關鍵函式和程式碼
下面簡介核心類TDatagramReceiver的關鍵實現程式碼。
3.1 系統啟動
系統啟動方法StartReceiver()首先清理資源、建立資料庫連線、初始化若干計數值,然後建立伺服器端偵聽Socket物件,最後呼叫靜態方法ThreadPool.QueueUserWorkItem()線上程池中建立3個核心處理執行緒。
///<summary>/// 啟動接收器
///</summary>publicbool StartReceiver()
{
try
{
_stopReceiver =true;
this.Close();
if (!this.ConnectDatabase()) returnfalse;
_clientCount =0;
_datagramQueueCount =0;
_datagramCount =0;
_errorDatagramCount =0;
_exceptionCount =0;
_sessionTable =new Hashtable(_maxAllowClientCount);
_datagramQueue =new Queue<TDatagram>(_maxAllowDatagramQueueCount);
_stopReceiver =false; // 迴圈中均要該標誌if (!this.CreateReceiverSocket()) //建立伺服器端 Socket 物件 {
returnfalse;
}
// 偵聽客戶端連線請求執行緒, 使用委託推斷, 不建 CallBack 物件if (!ThreadPool.QueueUserWorkItem(ListenClientRequest))
{
returnfalse;
}
// 處理資料包佇列執行緒if (!ThreadPool.QueueUserWorkItem(HandleDatagrams))
{
returnfalse;
}
// 檢查客戶會話狀態, 長時間未通訊則清除該物件if (!ThreadPool.QueueUserWorkItem(CheckClientState))
{
returnfalse;
}
_stopConnectRequest =false; // 啟動接收器,則自動允許連線 }
catch
{
this.OnReceiverException();
_stopReceiver =true;
}
return!_stopReceiver;
}
下面是建立偵聽Socket物件的方法程式碼。
///<summary>/// 建立接收伺服器的 Socket, 並偵聽客戶端連線請求
///</summary>privatebool CreateReceiverSocket()
{
try
{
_receiverSocket =new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_receiverSocket.Bind(new IPEndPoint(IPAddress.Any, _tcpSocketPort)); // 繫結埠 _receiverSocket.Listen(_maxAllowListenQueueLength); // 開始監聽returntrue;
}
catch
{
this.OnReceiverException();
returnfalse;
}
}
3.2 偵聽客戶端連線請求
伺服器端迴圈等待客戶端連線請求。一旦有請求,先判斷客戶端連線數是否超限,接著檢測該客戶端IP地址,一切正常後建立TSession物件,並呼叫非同步方法接收客戶端Socket資料包。
程式碼中,Socket讀到資料時的回撥AsyncCallback委託方法EndReceiveData()完成資料接收工作,正常情況下啟動另一個非同步BeginReceive()呼叫。
.NET中,每個非同步方法都有自己的獨立執行緒,非同步處理其實也基於多執行緒機制的。下面程式碼中的非同步套非同步呼叫,既佔用較大的系統資源,也給處理帶來意想不到的結果,更是出現異常時難以控制和處理的關鍵所在。
///<summary>/// 迴圈偵聽客戶端請求,由於要用執行緒池,故帶一個引數
///</summary>privatevoid ListenClientRequest(object state)
{
Socket client =null;
while (!_stopReceiver)
{
if (_stopConnectRequest) // 停止客戶端連線請求 {
if (_receiverSocket !=null)
{
try
{
_receiverSocket.Close(); // 強制關閉接收器 }
catch
{
this.OnReceiverException();
}
finally
{
// 必須為 null,否則 disposed 物件仍然存在,將引發下面的錯誤 _receiverSocket =null;
}
}
continue;
}
else
{
if (_receiverSocket ==null)
{
if (!this.CreateReceiverSocket())
{
continue;
}
}
}
try
{
if (_receiverSocket.Poll(_loopWaitTime, SelectMode.SelectRead))
{
// 頻繁關閉、啟動時,這裡容易產生錯誤(提示套接字只能有一個) client = _receiverSocket.Accept();
if (client !=null&& client.Connected)
{
if (this._clientCount >=this._maxAllowClientCount)
{
this.OnReceiverException();
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
elseif (CheckSameClientIP(client)) // 已存在該 IP 地址 {
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
else
{
TSession session =new TSession(client);
session.LoginTime = DateTime.Now;
lock (_sessionTable)
{
int preSessionID = session.ID;
while (true)
{
if (_sessionTable.ContainsKey(session.ID)) // 有可能重複該編號 {
session.ID =100000+ preSessionID;
}
else
{
break;
}
}
_sessionTable.Add(session.ID, session); // 登記該會話客戶端 Interlocked.Increment(ref _clientCount);
}
this.OnClientRequest();
try// 客戶端連續連線或連線後立即斷開,易在該處產生錯誤,系統忽略之 {
// 開始接受來自該客戶端的資料 session.ClientSocket.BeginReceive(session.ReceiveBuffer, 0,
session.ReceiveBufferLength, SocketFlags.None, EndReceiveData, session);
}
catch
{
session.DisconnectType = TDisconnectType.Exception;
session.State = TSessionState.NoReply;
}
}
}
elseif (client !=null) // 非空,但沒有連線(connected is false) {
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
}
}
catch
{
this.OnReceiverException();
if (client !=null)
{
try
{
client.Shutdown(SocketShutdown.Both);
client.Close();
}
catch { }
}
}
// 該處可以適當暫停若干毫秒 }
// 該處可以適當暫停若干毫秒}
3.3 處理資料包
該執行緒迴圈檢視資料包佇列,完成資料包的解析與儲存等工作。具體實現時,如果佇列中沒有資料包,可以考慮等待若干毫秒,提高CPU利用率。
privatevoid HandleDatagrams(object state)
{
while (!_stopReceiver)
{
this.HandleOneDatagram(); // 處理一個數據包if (!_stopReceiver)
{
// 如果連線關閉,則重新建立,可容許幾個連線錯誤出現if (_sqlConnection.State == ConnectionState.Closed)
{
this.OnReceiverWork();
try
{
_sqlConnection.Open();
}
catch
{
this.OnReceiverException();
}
}
}
}
}
///<summary>/// 處理一個包資料,包括:驗證、儲存
///</summary>privatevoid HandleOneDatagram()
{
TDatagram datagram =null;
lock (_datagramQueue)
{
if (_datagramQueue.Count >0)
{
datagram = _datagramQueue.Dequeue(); // 取佇列資料 Interlocked.Decrement(ref _datagramQueueCount);
}
}
if (datagram ==null) return;
datagram.Clear();
datagram =null; // 釋放物件}
3.4 檢查與清理會話
本執行緒負責處理建立連線後的客戶端會話TSession或Socket物件的關閉與資源清理工作,其它方法中出現異常等情況,儘可能標記相關TSession物件的屬性NoReply=true,表示該會話已經無效、需要清理。
檢查會話佇列並清理資源分3步:第一步,Shutdown()客戶端Socket,此時可能立即觸發某些Socket的非同步方法EndReceive();第二步,Close()客戶端Socket,釋放佔用資源;第三步,從會話表中清除該會話物件。其中,第一步完成後,某個TSession也許不會立即到第二步,因為可能需要處理其非同步結束方法。
需要指出, 由於涉及多執行緒處理,需要頻繁加解鎖操作,清理工作前先建立一個會話佇列列副本sessionTable2,檢查與清理該隊副本列列的TSession物件。
///<summary>/// 檢查客戶端狀態(掃描方式,若長時間無資料,則斷開)
///</summary>privatevoid CheckClientState(object state)
{
while (!_stopReceiver)
{
DateTime thisTime = DateTime.Now;
// 建立一個副本 ,然後對副本進行操作 Hashtable sessionTable2 =new Hashtable();
lock (_sessionTable)
{
foreach (TSession session in _sessionTable.Values)
{
if (session !=null)
{
sessionTable2.Add(session.ID, session);
}
}
}
foreach (TSession session in sessionTable2.Values) // 對副本進行操作 {
Monitor.Enter(session);
try
{
if (session.State == TSessionState.NoReply) // 分三步清除一個 Session {
session.State = TSessionState.Closing;
if (session.ClientSocket !=null)
{
try
{
// 第一步:shutdown session.ClientSocket.Shutdown(SocketShutdown.Both);
}
catch { }
}
}
elseif (session.State == TSessionState.Closing)
{
session.State = TSessionState.Closed;
if (session.ClientSocket !=null)
{
try
{
// 第二步: Close session.ClientSocket.Close();
}
catch { }
}
}
elseif (session.State == TSessionState.Closed)
{
lock (_sessionTable)
{
// 第三步:remove from table _sessionTable.Remove(session.ID);
Interlocked.Decrement(ref _clientCount);
}
this.OnClientRequest();
session.Clear(); // 清空緩衝區 }
elseif (session.State == TSessionState.Normal) // 正常的會話 {
TimeSpan ts = thisTime.Subtract(session.LastDataReceivedTime);
if (Math.Abs(ts.TotalSeconds) > _maxSocketDataTimeout) // 超時,則準備斷開連線 {
session.DisconnectType = TDisconnectType.Timeout;
session.State = TSessionState.NoReply; // 標記為將關閉、準備斷開 }
}
}
finally
{
Monitor.Exit(session);
}
} // end foreach
sessionTable2.Clear();
} // end while}
4 、結語
基於多執行緒處理的系統代價是比較大的,需要經常呼叫加/解鎖方法lock()或Monitor.Enter(),需要經常建立處理執行緒等。從實際執行效果看,筆者的實現方案有較好的穩定性:2005年4月到5月間,在一個普通PC機器上連續執行30多天不出一點故障。同時,筆者採用了時序區間判重等演算法,有效地提高了系統處理與響應速度。測試表明,在普通的PC機器(P4 2.0)上,可以做到0.5秒處理一個數據包,如果優化程式碼和伺服器,還有較大的效能提升空間。
上面的程式碼是筆者實現的省級公路交通流量資料服務中心(DSC)專案中的接收伺服器框架部分,整個系統還包括:資料轉發交通部的轉發伺服器、資料遠端查詢客戶端、綜合報表資料處理系統、資料線上釋出系統、系統執行監控系統等。
實際的接收伺服器類及其輔助類超過3K行,整個系統則超過了60K。因為是早期實現的程式,難免有程式碼粗糙、方法欠妥的感覺,只有留待下個版本完善擴充了。由於與甲方有保密合同和版權保護等,不可能公開全部原始碼,刪減也有不當之處,讀者發現時請不吝指正。下面是帶詳細註釋的程式碼下載URL。
相關推薦
C# 實現的多執行緒非同步Socket資料包接收器框架
幾天前在博問中看到一個C# Socket問題,就想到筆者2004年做的一個省級交通流量接收伺服器專案,當時的基本求如下: 接收自動觀測裝置通過無線網絡卡、Internet和Socket上報的交通量資料包 全年365*24執行的自動觀測裝置5分鐘上報一次觀測資料,每筆記錄約
利用web work實現多執行緒非同步機制,打造頁面單步除錯IDE
我們已經完成了整個編譯器的開發,現在我們做一個能夠單步除錯的頁面IDE,完成本章程式碼後,我們可以實現下面如圖所示功能: 頁面IDE可以顯示每行程式碼所在的行,單擊某一行,在改行前面會出現一個紅點表示斷點,點選Parsing按鈕後,進入單步除錯模式,然後每點一次step按鈕,頁
Linux平臺上用C++實現多執行緒互斥鎖
在上篇用C++實現了Win32平臺上的多執行緒互斥鎖,這次寫個Linux平臺上的,同樣參考了開源專案C++ Sockets的程式碼,在此對這些給開源專案做出貢獻的鬥士們表示感謝! 下邊分別是互斥鎖類和測試程式碼,已經在Fedora 13虛擬機器上測試通過。
用C++實現多執行緒Mutex鎖(Win32)
本文目的:用C++和Windows的互斥物件(Mutex)來實現執行緒同步鎖。 準備知識:1,核心物件互斥體(Mutex)的工作機理,WaitForSingleObject函式的用法,這些可以從MSDN獲取詳情; 2,當兩個或更多執行緒需要同時訪問一個共享資
gRPC 實現多執行緒非同步服務端
// AsyncServer_Demo.cpp : 定義控制檯應用程式的入口點。 // #include "stdafx.h" #include <memory> #include <iostream> #include <string> #include <thr
《Unity 3D遊戲客戶端基礎框架》多執行緒非同步 Socket 框架構建
引言: 之前寫過一個 demo 案例大致講解了 Socket 通訊的過程,並和自建的伺服器完成連線和簡單的資料通訊,詳細的內容可以檢視 Unity3D —— Socket通訊(C#)。但是在實際專案應用的過程中,這個 demo 的實現方式顯得異常簡陋,而且對應
windows程式設計 使用C++實現多執行緒類
本文簡單介紹如何在windows程式設計中實現多執行緒類,供大家學習參考,也希望大家指正。 有時候我們想在一個類中實現多執行緒,主執行緒在某些時刻獲得資料,可以“通知”子執行緒去處理,然後把結果返回。下面的例項是主執行緒每隔2s產生10個隨機數,將這10隨機數傳給多執行緒
C++實現多執行緒物件記憶體池帶垃圾回收機制
#include <Windows.h> #include <iostream> #include <map> #include <string> #include <assert.h> #include <
QT C++實現多執行緒通訊--示例程式碼
</pre><p></p><pre name="code" class="cpp">先看測試程式碼:main.cpp#include "mainwindow.h" #include <QApplication> /
可擴充套件多執行緒非同步Socket伺服器框架EMTASS 2.0
0 前言 >>[前言]、[第1節]、[第2節]、[第3節]、[第4節]、[第5節]、[第6節] 在程式設計與實際應用中,Socket資料包接收伺服器夠得上一個經典問題了:需要計算機與網路程式設計知識(主要是Socket),與業務處理邏輯密切(如:包組成
C# 利用執行緒安全資料結構BlockingCollection實現多執行緒
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using Danny.Infrastructure.Helper; names
C++11 多執行緒執行緒共享資料
共享資料的問題 這些在作業系統中都有詳細的介紹,可以回顧作業系統課程。。很典型的就是資料競爭問題。 互斥量保護資料 最原始的方式:使用std::mutex建立互斥量,使用成員lock()加鎖,使用成員unlock()解鎖。但是這種方式需要我們在每個函數出口都呼叫一次unloc
Java併發(十八):阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者
C# 多執行緒 非同步
一、基本概念 1、程序 首先開啟工作管理員,檢視當前執行的程序: 從工作管理員裡面可以看到當前所有正在執行的程序。那麼究竟什麼是程序呢? 程序(Process)是Windows系統中的一個基本概念,它包含著一個執行程式所需要的資源。一個正在執行的應用程式在作業
C++11多執行緒程式設計 第十章: 使用packaged_task優雅的讓同步函式非同步執行
C++11 Multithreading – Part 10: packaged_task<> Example and Tutorial Varun July 2, 2017 C++11 Multithreading – Part 10: packaged_tas
C++11多執行緒程式設計 第四章: 共享資料和競態條件
C++11 Multithreading – Part 4: Data Sharing and Race Conditions Varun February 21, 2015C++11 Multithreading – Part 4: Data Sharing and Race Con
C++之多執行緒(C++11 thread.h檔案實現多執行緒)
轉載自: 與 C++11 多執行緒相關的標頭檔案 C++11 新標準中引入了四個標頭檔案來支援多執行緒程式設計,他們分別是<atomic> ,<thread>,<mutex>,<condition_variable>和&l
深入理解javascript非同步程式設計障眼法&&h5 web worker實現多執行緒
0.從一道題說起 var t = true; setTimeout(function(){ t = false; }, 1000); while(t){ } alert('end'); 問,以上程式碼何時alert“end”呢? 測試一下:答案是:
callable介面配合ExecutorService實現多執行緒處理資料,並接收返回值(2018-08-23)
/** * @author chenzhen * Created by chenzhen on 2018/8/22. */ @Data public class QuickPullGit implements Callable<ArrayList&l
c語言 多執行緒的簡單實現 執行緒鎖
#include <stdio.h> #include <math.h> #include <pthread.h> #include <stdlib.h> #include <string.h> #i