1. 程式人生 > >C# 實現的多執行緒非同步Socket資料包接收器框架

C# 實現的多執行緒非同步Socket資料包接收器框架

幾天前在博問中看到一個C# Socket問題,就想到筆者2004年做的一個省級交通流量接收伺服器專案,當時的基本求如下:

  • 接收自動觀測裝置通過無線網絡卡、Internet和Socket上報的交通量資料包
  • 全年365*24執行的自動觀測裝置5分鐘上報一次觀測資料,每筆記錄約2K大小
  • 規劃全省將有100個左右的自動觀測裝置(截止2008年10月還只有30個)

      當時,VS2003才釋出年多,筆者也是接觸C#不久。於是Google了國內國外網,希望找點應用C#解決Socket通訊問題的思路和程式碼。最後,找到了兩篇幫助最大的文章:一篇是國人寫的Socket接收器框架,應用了獨立的客戶端Socket會話(Session)概念,給筆者提供了一個接收伺服器的總體框架思路;另一篇是美國人寫的,提出了多執行緒、分段接收資料包的技術方案,描述了多執行緒、非同步Socket的許多實現細節,該文堅定了筆者採用多執行緒和非同步方式處理Socket接收器的技術路線。

     具體實現和測試時筆者還發現,在Internet環境下的Socket應用中,需要系統有極強的容錯能力:沒有辦法控制異常,就必須允許它們存在(附加原始碼中可以看到,try{}catch{}語句較多)。對此,筆者設計了一個專門的檢查和清理執行緒,完成無效或超時會話的清除和資源釋放工作。

     依稀記得,國內框架作者的名稱空間有ibm,認為是IBM公司職員,通過郵件後才知道其人在深圳。筆者向他請教了幾個問題,相互探討了幾個技術關鍵點。可惜,現在再去找,已經查不到原文和郵件了。只好藉此機會,將本文獻給這兩個素未謀面的技術高人和同行,也盼望拙文或原始碼能給讀者一點有用的啟發和幫助。

1、主要技術思路


     整個系統由三個核心執行緒組成,並由.NET執行緒池統一管理:

  • 偵聽客戶端連線請求執行緒:ListenClientRequest(),迴圈偵聽客戶端連線請求。如果有,檢測該客戶端IP,看是否是同一觀測裝置,然後建立一個客戶端TSession物件,並通過Socket非同步呼叫方法BeginReceive()接收資料包、EndReceive()處理資料包
  • 資料包處理執行緒:HandleDatagrams(),迴圈檢測資料包佇列_datagramQueue,完成資料包解析、判斷型別、儲存等工作
  • 客戶端狀態檢測執行緒:CheckClientState(),迴圈檢查客戶端會話表_sessionTable,判斷會話物件是否有效,設定超時會話關閉標誌,清楚無效會話物件及釋放其資源

2、主要類簡介

     系統主要由3個類組成:

  • TDatagramReceiver(資料包接收伺服器):系統的核心程序類,建立Socket連線、處理與儲存資料包、清理系統資源,該類提供全部的public屬性和方法
  • TSession(客戶端會話):由每個客戶端的Socket物件組成,有自己的資料緩衝區,清理執行緒根據該物件的最近會話時間判斷是否超時
  • TDatagram(資料包類):判斷資料包類別、解析資料包

3、關鍵函式和程式碼

     下面簡介核心類TDatagramReceiver的關鍵實現程式碼。

3.1  系統啟動

      系統啟動方法StartReceiver()首先清理資源、建立資料庫連線、初始化若干計數值,然後建立伺服器端偵聽Socket物件,最後呼叫靜態方法ThreadPool.QueueUserWorkItem()線上程池中建立3個核心處理執行緒。

ExpandedBlockStart.gif
///<summary>///  啟動接收器
///</summary>publicbool StartReceiver()
{
    
try
    {
        _stopReceiver 
=true;

        
this.Close();

        
if (!this.ConnectDatabase()) returnfalse;

        _clientCount 
=0;
        _datagramQueueCount 
=0;
        _datagramCount 
=0;
        _errorDatagramCount 
=0;
        _exceptionCount 
=0;

        _sessionTable 
=new Hashtable(_maxAllowClientCount);
        _datagramQueue 
=new Queue<TDatagram>(_maxAllowDatagramQueueCount);

        _stopReceiver 
=false;  // 迴圈中均要該標誌if (!this.CreateReceiverSocket())  //建立伺服器端 Socket 物件        {
            
returnfalse;
        }

        
// 偵聽客戶端連線請求執行緒, 使用委託推斷, 不建 CallBack 物件if (!ThreadPool.QueueUserWorkItem(ListenClientRequest))
        {
            
returnfalse;
        }

        
// 處理資料包佇列執行緒if (!ThreadPool.QueueUserWorkItem(HandleDatagrams))
        {
            
returnfalse;
        }

        
// 檢查客戶會話狀態, 長時間未通訊則清除該物件if (!ThreadPool.QueueUserWorkItem(CheckClientState))
        {
            
returnfalse;
        }

        _stopConnectRequest 
=false;  // 啟動接收器,則自動允許連線    }
    
catch
    {
        
this.OnReceiverException();
        _stopReceiver 
=true;
    }
    
return!_stopReceiver;
}

      下面是建立偵聽Socket物件的方法程式碼。

ExpandedBlockStart.gif
///<summary>/// 建立接收伺服器的 Socket, 並偵聽客戶端連線請求
///</summary>privatebool CreateReceiverSocket()
{
    
try
    {
        _receiverSocket 
=new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _receiverSocket.Bind(
new IPEndPoint(IPAddress.Any, _tcpSocketPort));  // 繫結埠        _receiverSocket.Listen(_maxAllowListenQueueLength);  // 開始監聽returntrue;
    }
    
catch
    {
        
this.OnReceiverException();
        
returnfalse;
    }
}

3.2  偵聽客戶端連線請求

      伺服器端迴圈等待客戶端連線請求。一旦有請求,先判斷客戶端連線數是否超限,接著檢測該客戶端IP地址,一切正常後建立TSession物件,並呼叫非同步方法接收客戶端Socket資料包。

      程式碼中,Socket讀到資料時的回撥AsyncCallback委託方法EndReceiveData()完成資料接收工作,正常情況下啟動另一個非同步BeginReceive()呼叫。

      .NET中,每個非同步方法都有自己的獨立執行緒,非同步處理其實也基於多執行緒機制的。下面程式碼中的非同步套非同步呼叫,既佔用較大的系統資源,也給處理帶來意想不到的結果,更是出現異常時難以控制和處理的關鍵所在。

ExpandedBlockStart.gif
///<summary>/// 迴圈偵聽客戶端請求,由於要用執行緒池,故帶一個引數
///</summary>privatevoid ListenClientRequest(object state)
{
    Socket client 
=null;
    
while (!_stopReceiver)
    {
        
if (_stopConnectRequest)  //  停止客戶端連線請求        {
            
if (_receiverSocket !=null)
            {
                
try
                {
                    _receiverSocket.Close();  
// 強制關閉接收器                }
                
catch
                {
                    
this.OnReceiverException();
                }
                
finally
                {
                    
// 必須為 null,否則 disposed 物件仍然存在,將引發下面的錯誤                    _receiverSocket =null;
                }
            }
            
continue;
        }
        
else
        {
            
if (_receiverSocket ==null)
            {
                
if (!this.CreateReceiverSocket())
                {
                    
continue;
                }
            }
        }

        
try
        {
            
if (_receiverSocket.Poll(_loopWaitTime, SelectMode.SelectRead))
            {
                
// 頻繁關閉、啟動時,這裡容易產生錯誤(提示套接字只能有一個)                client = _receiverSocket.Accept();

                
if (client !=null&& client.Connected)
                {
                    
if (this._clientCount >=this._maxAllowClientCount)
                    {
                        
this.OnReceiverException();

                        
try
                        {
                            client.Shutdown(SocketShutdown.Both);
                            client.Close();
                        }
                        
catch { }
                    }
                    
elseif (CheckSameClientIP(client))  // 已存在該 IP 地址                    {
                        
try
                        {
                            client.Shutdown(SocketShutdown.Both);
                            client.Close();
                        }
                        
catch { }
                    }
                    
else
                    {
                        TSession session 
=new TSession(client);
                        session.LoginTime 
= DateTime.Now;

                        
lock (_sessionTable)
                        {
                            
int preSessionID = session.ID;
                            
while (true)
                            {
                                
if (_sessionTable.ContainsKey(session.ID))  // 有可能重複該編號                                {
                                    session.ID 
=100000+ preSessionID;
                                }
                                
else
                                {
                                    
break;
                                }
                            }
                            _sessionTable.Add(session.ID, session);  
// 登記該會話客戶端                            Interlocked.Increment(ref _clientCount);
                        }

                        
this.OnClientRequest();

                        
try// 客戶端連續連線或連線後立即斷開,易在該處產生錯誤,系統忽略之                        {
                            
// 開始接受來自該客戶端的資料                            session.ClientSocket.BeginReceive(session.ReceiveBuffer, 0
                                session.ReceiveBufferLength, SocketFlags.None, EndReceiveData, session);
                        }
                        
catch
                        {
                            session.DisconnectType 
= TDisconnectType.Exception;
                            session.State 
= TSessionState.NoReply;
                        }
                    }
                }
                
elseif (client !=null)  // 非空,但沒有連線(connected is false)                {
                    
try
                    {
                        client.Shutdown(SocketShutdown.Both);
                        client.Close();
                    }
                    
catch { }
                }
            }
        }
        
catch
        {
            
this.OnReceiverException();

            
if (client !=null)
            {
                
try
                {
                    client.Shutdown(SocketShutdown.Both);
                    client.Close();
                }
                
catch { }
            }
        }
        
// 該處可以適當暫停若干毫秒    }
    
// 該處可以適當暫停若干毫秒}

3.3  處理資料包

      該執行緒迴圈檢視資料包佇列,完成資料包的解析與儲存等工作。具體實現時,如果佇列中沒有資料包,可以考慮等待若干毫秒,提高CPU利用率。

ExpandedBlockStart.gif
privatevoid HandleDatagrams(object state)
{
    
while (!_stopReceiver)
    {
        
this.HandleOneDatagram();  // 處理一個數據包if (!_stopReceiver)
        {
            
// 如果連線關閉,則重新建立,可容許幾個連線錯誤出現if (_sqlConnection.State == ConnectionState.Closed)
            {
                
this.OnReceiverWork();

                
try
                {
                    _sqlConnection.Open();
                }
                
catch
                {
                    
this.OnReceiverException();
                }
            }
        }
    }
}

///<summary>/// 處理一個包資料,包括:驗證、儲存
///</summary>privatevoid HandleOneDatagram()
{
    TDatagram datagram 
=null;

    
lock (_datagramQueue)
    {
        
if (_datagramQueue.Count >0)
        {
            datagram 
= _datagramQueue.Dequeue();  // 取佇列資料            Interlocked.Decrement(ref _datagramQueueCount);
        }
    }

    
if (datagram ==nullreturn;

    datagram.Clear();
    datagram 
=null;  // 釋放物件}

3.4  檢查與清理會話

      本執行緒負責處理建立連線後的客戶端會話TSession或Socket物件的關閉與資源清理工作,其它方法中出現異常等情況,儘可能標記相關TSession物件的屬性NoReply=true,表示該會話已經無效、需要清理。

       檢查會話佇列並清理資源分3步:第一步,Shutdown()客戶端Socket,此時可能立即觸發某些Socket的非同步方法EndReceive();第二步,Close()客戶端Socket,釋放佔用資源;第三步,從會話表中清除該會話物件。其中,第一步完成後,某個TSession也許不會立即到第二步,因為可能需要處理其非同步結束方法。

      需要指出, 由於涉及多執行緒處理,需要頻繁加解鎖操作,清理工作前先建立一個會話佇列列副本sessionTable2,檢查與清理該隊副本列列的TSession物件。

ExpandedBlockStart.gif
///<summary>/// 檢查客戶端狀態(掃描方式,若長時間無資料,則斷開)
///</summary>privatevoid CheckClientState(object state)
{
    
while (!_stopReceiver)
    {
        DateTime thisTime 
= DateTime.Now;

        
// 建立一個副本 ,然後對副本進行操作        Hashtable sessionTable2 =new Hashtable();
        
lock (_sessionTable)
        {
            
foreach (TSession session in _sessionTable.Values)
            {
                
if (session !=null)
                {
                    sessionTable2.Add(session.ID, session);
                }
            }
        }

        
foreach (TSession session in sessionTable2.Values)  // 對副本進行操作        {
            Monitor.Enter(session);
            
try
            {
                
if (session.State == TSessionState.NoReply)  // 分三步清除一個 Session                {
                    session.State 
= TSessionState.Closing;
                    
if (session.ClientSocket !=null)
                    {
                        
try
                        {
                            
// 第一步:shutdown                            session.ClientSocket.Shutdown(SocketShutdown.Both);
                        }
                        
catch { }
                    }
                }
                
elseif (session.State == TSessionState.Closing)
                {
                    session.State 
= TSessionState.Closed;
                    
if (session.ClientSocket !=null)
                    {
                        
try
                        {
                            
// 第二步: Close                            session.ClientSocket.Close();
                        }
                        
catch { }
                    }
                }
                
elseif (session.State == TSessionState.Closed)
                {

                    
lock (_sessionTable)
                    {
                        
// 第三步:remove from table                        _sessionTable.Remove(session.ID);
                        Interlocked.Decrement(
ref _clientCount);
                    }

                    
this.OnClientRequest();
                    session.Clear();  
// 清空緩衝區                }
                
elseif (session.State == TSessionState.Normal)  // 正常的會話                 {

                    TimeSpan ts 
= thisTime.Subtract(session.LastDataReceivedTime);
                    
if (Math.Abs(ts.TotalSeconds) > _maxSocketDataTimeout)  // 超時,則準備斷開連線                    {
                        session.DisconnectType 
= TDisconnectType.Timeout;
                        session.State 
= TSessionState.NoReply;  // 標記為將關閉、準備斷開                    }
                }
            }
            
finally
            {
                Monitor.Exit(session);
            }
        }  
// end foreach
        sessionTable2.Clear();
    }  
// end while}

4 、結語

     基於多執行緒處理的系統代價是比較大的,需要經常呼叫加/解鎖方法lock()或Monitor.Enter(),需要經常建立處理執行緒等。從實際執行效果看,筆者的實現方案有較好的穩定性:2005年4月到5月間,在一個普通PC機器上連續執行30多天不出一點故障。同時,筆者採用了時序區間判重等演算法,有效地提高了系統處理與響應速度。測試表明,在普通的PC機器(P4 2.0)上,可以做到0.5秒處理一個數據包,如果優化程式碼和伺服器,還有較大的效能提升空間。

     上面的程式碼是筆者實現的省級公路交通流量資料服務中心(DSC)專案中的接收伺服器框架部分,整個系統還包括:資料轉發交通部的轉發伺服器、資料遠端查詢客戶端、綜合報表資料處理系統、資料線上釋出系統、系統執行監控系統等。

     實際的接收伺服器類及其輔助類超過3K行,整個系統則超過了60K。因為是早期實現的程式,難免有程式碼粗糙、方法欠妥的感覺,只有留待下個版本完善擴充了。由於與甲方有保密合同和版權保護等,不可能公開全部原始碼,刪減也有不當之處,讀者發現時請不吝指正。下面是帶詳細註釋的程式碼下載URL。

相關推薦

C# 實現執行非同步Socket資料接收器框架

幾天前在博問中看到一個C# Socket問題,就想到筆者2004年做的一個省級交通流量接收伺服器專案,當時的基本求如下: 接收自動觀測裝置通過無線網絡卡、Internet和Socket上報的交通量資料包 全年365*24執行的自動觀測裝置5分鐘上報一次觀測資料,每筆記錄約

利用web work實現執行非同步機制,打造頁面單步除錯IDE

我們已經完成了整個編譯器的開發,現在我們做一個能夠單步除錯的頁面IDE,完成本章程式碼後,我們可以實現下面如圖所示功能: 頁面IDE可以顯示每行程式碼所在的行,單擊某一行,在改行前面會出現一個紅點表示斷點,點選Parsing按鈕後,進入單步除錯模式,然後每點一次step按鈕,頁

Linux平臺上用C++實現執行互斥鎖

     在上篇用C++實現了Win32平臺上的多執行緒互斥鎖,這次寫個Linux平臺上的,同樣參考了開源專案C++ Sockets的程式碼,在此對這些給開源專案做出貢獻的鬥士們表示感謝!     下邊分別是互斥鎖類和測試程式碼,已經在Fedora 13虛擬機器上測試通過。

C++實現執行Mutex鎖(Win32)

    本文目的:用C++和Windows的互斥物件(Mutex)來實現執行緒同步鎖。     準備知識:1,核心物件互斥體(Mutex)的工作機理,WaitForSingleObject函式的用法,這些可以從MSDN獲取詳情; 2,當兩個或更多執行緒需要同時訪問一個共享資

gRPC 實現執行非同步服務端

// AsyncServer_Demo.cpp : 定義控制檯應用程式的入口點。 // #include "stdafx.h" #include <memory> #include <iostream> #include <string> #include <thr

《Unity 3D遊戲客戶端基礎框架執行非同步 Socket 框架構建

引言: 之前寫過一個 demo 案例大致講解了 Socket 通訊的過程,並和自建的伺服器完成連線和簡單的資料通訊,詳細的內容可以檢視 Unity3D —— Socket通訊(C#)。但是在實際專案應用的過程中,這個 demo 的實現方式顯得異常簡陋,而且對應

windows程式設計 使用C++實現執行

本文簡單介紹如何在windows程式設計中實現多執行緒類,供大家學習參考,也希望大家指正。 有時候我們想在一個類中實現多執行緒,主執行緒在某些時刻獲得資料,可以“通知”子執行緒去處理,然後把結果返回。下面的例項是主執行緒每隔2s產生10個隨機數,將這10隨機數傳給多執行緒

C++實現執行物件記憶體池帶垃圾回收機制

#include <Windows.h> #include <iostream> #include <map> #include <string> #include <assert.h> #include <

QT C++實現執行通訊--示例程式碼

</pre><p></p><pre name="code" class="cpp">先看測試程式碼:main.cpp#include "mainwindow.h" #include <QApplication> /

可擴充套件執行非同步Socket伺服器框架EMTASS 2.0

0 前言 >>[前言]、[第1節]、[第2節]、[第3節]、[第4節]、[第5節]、[第6節] 在程式設計與實際應用中,Socket資料包接收伺服器夠得上一個經典問題了:需要計算機與網路程式設計知識(主要是Socket),與業務處理邏輯密切(如:包組成

C# 利用執行安全資料結構BlockingCollection實現執行

using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using Danny.Infrastructure.Helper; names

C++11 執行執行共享資料

共享資料的問題 這些在作業系統中都有詳細的介紹,可以回顧作業系統課程。。很典型的就是資料競爭問題。 互斥量保護資料 最原始的方式:使用std::mutex建立互斥量,使用成員lock()加鎖,使用成員unlock()解鎖。但是這種方式需要我們在每個函數出口都呼叫一次unloc

Java併發(十八):阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者

C# 執行 非同步

一、基本概念 1、程序 首先開啟工作管理員,檢視當前執行的程序: 從工作管理員裡面可以看到當前所有正在執行的程序。那麼究竟什麼是程序呢? 程序(Process)是Windows系統中的一個基本概念,它包含著一個執行程式所需要的資源。一個正在執行的應用程式在作業

C++11執行程式設計 第十章: 使用packaged_task優雅的讓同步函式非同步執行

C++11 Multithreading – Part 10: packaged_task<> Example and Tutorial Varun July 2, 2017 C++11 Multithreading – Part 10: packaged_tas

C++11執行程式設計 第四章: 共享資料和競態條件

C++11 Multithreading – Part 4: Data Sharing and Race Conditions Varun February 21, 2015C++11 Multithreading – Part 4: Data Sharing and Race Con

C++之執行C++11 thread.h檔案實現執行

轉載自: 與 C++11 多執行緒相關的標頭檔案 C++11 新標準中引入了四個標頭檔案來支援多執行緒程式設計,他們分別是<atomic> ,<thread>,<mutex>,<condition_variable>和&l

深入理解javascript非同步程式設計障眼法&&h5 web worker實現執行

0.從一道題說起 var t = true; setTimeout(function(){ t = false; }, 1000); while(t){ } alert('end'); 問,以上程式碼何時alert“end”呢? 測試一下:答案是:

callable介面配合ExecutorService實現執行處理資料,並接收返回值(2018-08-23)

/** * @author chenzhen * Created by chenzhen on 2018/8/22. */ @Data public class QuickPullGit implements Callable<ArrayList&l

c語言 執行的簡單實現 執行

#include <stdio.h> #include <math.h> #include <pthread.h> #include <stdlib.h> #include <string.h> #i