純Socket(BIO)長鏈接編程的常見的坑和填坑套路
阿新 • • 發佈:2017-12-01
狀態 記錄 操作系統 傳遞 p s 一件事 先來 else 線程池 本文章純屬個人經驗總結,偽代碼也是寫文章的時候順便白板編碼的,可能有邏輯問題,請幫忙指正,謝謝。
上面啰嗦了那麽多,現在開始步入正題了 正文
實現自定義的應用層協議,也就是意味著要針對傳輸層協議進行開發,傳輸層有TCP、UDP兩種協議,兩者的區別和適用場景請自行seach,TCP傳輸具有可靠性,UDP傳輸不管數據是否送達,一般選擇TCP,這篇文章也是講的TCP方式。
上面說過了TCP/IP是一種協議,也就是一種約定的東西,那怎麽針對這個約定編程呢?其實操作系統已經做了這件事了,並且很有風度的為我們提供了方便的使用方式(Socket API),也就是我們常說的Socket。用C/C++可以直接調用操作系統的API進行操作,JAVA等需要虛擬機的語言可調用SDK提供的API進行開發。
嗯不說沒用的了,相信很多人已經不耐煩了。我們先來講講套路。用到代碼的地方用偽代碼描述,偽代碼可以方便的傳遞思想邏輯等,嗯,好吧其實是懶(寫博文新手詢問:此處是不是需要賣個萌?)。
真正正文
常見的坑
1、收到的信息不完整,或者比預期更多(半包,粘包)
2、BIO讀寫阻塞導致線程掛起
3、物理鏈路意外斷開,程序不能發覺異常導致掛起
4、多線程共享同一socket導致數據錯亂
5、長時間占用大量空閑socket
使用阻塞式的Socket通信有一些弊端,因此我總結了幾個套路,應該也是一個通訊中間件應該做的事情。
常用套路
1、制定消息格式(解決半粘包)
2、規定通訊工作流程(合理使用隊列、線程池、連接池)
3、加入心跳檢測機制(解決異常斷開導致連接不可用)
4、加入鏈接回收機制(按空閑或超時時間等規則終止鏈接)
5、異常處理(不可復用異常發生時及時關閉連接)
6、即時重發(連接不可用時,即時選用另一條連接重發)
我們針對每一步套路進行構思設計,充分考慮其中潛在的問題和可擴展性。
1、制定消息格式(報文)
說到通訊,就必須要有協議,就像人或者其他動物溝通一樣,人有語言規則,其他動物也有他們的語言規則,機器溝通也需要“語言”規則。
制定報文就是制定機器的語言規則。機器溝通的目的無非就是:獲取數據、發送數據、指令,這兩者都是一方發起請求,一方處理並響應。首先,機器不是人,他們不是那麽智能,不能理解你啥時候能說完話,所以要讓機器B知道機器A的數據發送結束了沒有,就需要讓他們先約定好,說話前,先告訴對方要說多少內容。然後機器A先告訴機器B,我要說一句話(一行),或者是我要說10個字母(10字節),這個時候機器B就可以根據機器A告知的長度去接受機器A的消息了,可解決半包粘包問題。
上面說到了他們要先約定好的一件事情,就是要告訴對方這次要說多少內容。這個約定的規則之一,就是消息長度。
既然有長度這一個信息了,有一就有二,我們順便約定個其他的東西吧。比如,我這次要找你獲取數據,還是發送給你一點數據,還是要讓你執行一個指令還是其他事情。那麽這個約定的規則中又加入一個信息,就是行為標識。
消息長度、行為標識、都是一條信息的基本屬性,那麽還有其他屬性嗎?當然有,這個就看想把這份規則制定的多詳細了,不過也不是越長越好,而是在能解決基本事情後,越簡單越好,畢竟東西多了,一是解析慢,二是數據包也會變大。
說到這裏大家應該也都明白了報文應該怎麽制定,下面給出一個簡單的Socket通訊基本報文格式,大家參考一下。可同時用於請求和響應。
Internet(全球互聯網)是無數臺機器基於TCP/IP協議族相互通信產生的。TCP/IP協議族分了四層實現,鏈路層、網絡層、傳輸層、應用層。
與我們應用開發者接觸最多的應該是應用層了,例如web應用普遍使用HTTP協議,HTTP協議幫助我們開發者做了非常多的事情,通過HTTP足以完成大部分的通信工作了,但是有時候會有一些特殊的場景出現,使得HTTP協議並不能得心應手的完成工作,這個時候就需要我們尋找其他的應用層協議去適應場景了。 在項目啟動初期就要基於業務場景和運行環境選擇適當的傳輸協議,例如常見的發布/訂閱場景也就是推送業務可以使用MQTT等協議,文件傳輸可以用FTP等協議。不過我們這次要說的不是如何選擇通訊協議,而是如何自己實現一套自定的通訊協議。上面啰嗦了那麽多,現在開始步入正題了 正文
順序 | 字段名 | 長度(字節) | 字段類型 | 描述 |
1 | 消息長度 | 4(32bit) | int | socket報文的長度最長2^31-1字節,大文件傳輸不使用此字段 |
2 | 行為標識 | 1(8bit) | byte | 用於分支處理數據1字節可標識256種行為,一般夠用 |
3 | 加密標識 | 1(8bit) | byte | 區分加密方式0不加密 |
4 | 時間戳 | 8(64bit) | long | 消息時間戳,其實也沒啥用,加著玩的,忽視掉吧 |
5 | 消息體 | String |
長度為消息長度-10字節,建議使用json,具體解析行為由行為標識字段定義
|
2、規定通信工作流程
因為BIO通訊不是那麽靈活,所以我建議使用的時候一個Socket連接同時只被一個線程操作,並且同一個ServerSocket只做主動請求或者只做被動接收,這樣能減少網絡因素帶來的一些亂七八糟我也不知道會變成啥玩意的¥@U!%1#% fa23 &%3 9&+……事情發生。其實也可以用消息分包或對socket對象加鎖來用於多線程使用,但我是懶得去處理這樣的事情,沒必要嘛(好吧,其實還是懶)。 工作流程如下: 1、主動端發送數據,發送完後進入讀取狀態,等待響應。 2、被動端線程阻塞等待數據,讀取到長度等前14個字節後進行初步解析,並根據行為標識或加密標識等字段進行處理,處理結束後,響應一個報文,然後繼續等待數據。 代碼思路如下: 關於性能方面可以 使用隊列+線程池+連接池相互配合,這次先不討論這些,想要討論的可以私信我或評論,一起討論。 1、基本封裝/** 消息包(報文) **/ class SocketPackage { int length;// 長度 byte action;// 行為標識 byte encryption;// 加密標識 long timestamp;// 時間戳 String data;// 消息體 /** TODO:將此消息包轉換為適當的byte數組 **/ byte[] toBytes() { byte[] lengthBytes = int2bytes(length); // ...將各個字段都做了轉換成bytes的操作後,合並byte數組並返回 } /** TODO:讀取輸入流轉換成一個消息包 **/ static SocketPackage parse(InputStream in) throws IOException { SocketPackage sp = new SocketPackage(); byte[] lengthBytes = new byte[4]; in.read(lengthBytes);// 未收到信息時此步將會阻塞 sp.length = bytes2int(lengthBytes); // .....其他字段讀取就不寫了,這裏要控制好異常,不要隨意catch住,如果發生異常,不是socket壞了就是報文異常了,應當采用拒絕連接的形式向對方跑出異常 } }
/** 封裝下socket,使其可以保存更多的連接信息,不要糾結名字,我糾結了好一會兒不知道怎麽命名,反正是偽代碼,就這樣寫著吧 **/ class NiuxzSocket { Socket socket; volatile long lastUse;// 上次使用時間 // ...這裏還可以再加其他屬性,比如是否是寫狀態,寫操作開始時間,上次非心跳包時間等 NiuxzSocket(Socket socket) { this.socket = socket; this.lastUse = System.currentTimeMillis(); } InputStream getIn() { return socket.getInputStream(); } void write(byte[] bytes) throws IOException { this.socket.getOutputStream().write(bytes); } }
2、主動端: 主動端的核心是連接池SocketPool和SocketClient服務 大概流程是調用SocketClient發送數據包,SocketClient從連接池中獲取一個可用連接,如果沒有可用連接,就創建一個。SocketClient根據業務類型或消息類型分別對NiuxzSocket進行操作。
/** 封裝一個發送信息的接口,提供常用的發送信息方法。 **/ interface SocketClient { SocketPackage sendData(SocketPackage sp);// 發送一個消息包,並等待返回的消息包 // TODO:還可以根據雙方的業務和協議添加幾個更方便使用的接口方法。比如只返回消息體字段,或者直接返回json內容的 void sendHeartBeat(NiuxzSocket socket);// 發送一個心跳包,這個方法後面講心跳包時會用到 } class DefaultSocketClient implements SocketClient { SocketPool socketPool;// 先假裝有一個socket連接池,用來管理socket。不使用連接池的話,在這裏直接註入一個NiuxzSocket就可以了。下面代碼中也直接使用socket,但是一定要在使用時進行加鎖操作。否則就會造成多線程訪問同一個socket導致數據錯亂了。 /** 此方法就是主動端工作入口了,業務代碼可以直接調用這裏進行發送數據 **/ SocketPackage sendData(SocketPackage sp){ NiuxzSocket niuxzSocket = socketPool.get();//獲取一個socket,這裏可以看到獲取的socket並不是原生的socket,其實是我們自己封裝後的socket try{ niuxzSocket.write(sp.toBytes());//阻塞持續寫到緩存中 niuxzSocket.lastUse = System.currentTimeMillis();//根據業務方法更新socket的狀態信息 SocketPackage sp = SocketPackage.parse(niuxzSocket.getIn());//阻塞讀,等待消息的返回,因為是單線程操作socket所以不存在消息插隊的情況。 return sp; }catch(Exception e){ LOG.error("發送消息包失敗",e); socketPool.destroy(niuxzSocket) //在發生不可復用的異常時才關閉socket,並銷毀這個NiuxzSocke。不可復用異常意思是IO操作到了一半不知道具體到哪了所以整個socket都不可用了。 } finally{ if(socketPool!=null){ socketPool.recycle(niuxzSocket );//使用完這個socket後我們不要關閉,因為還要復用,讓連接池回收這個socket。recycle內要判斷socket是否是銷毀狀態。 } } } }
/** 定義一個連接池接口SocketPool **/ interface SocketPool { /** 獲取一個連接 **/ NiuxzSocket get(); /** 回收Socket **/ void recycle(NiuxzSocket ns); /** 銷毀Socket **/ void destroy(NiuxzSocket ns); } /** 實現連接池 **/ class DefaultSocketPool implements SocketPool { BlockingQueue<NiuxzSocket> sockets;// 存放socket的容器,也可以使用數組 NiuxzSocket get() { // TODO:池裏有就獲取,沒有就開一個線程去創建 並且等待創建完成,可使用synchronized/wait或Lock/condition } // TODO:實現socketPool,實現連接池是屬於性能可靠性優化,要做的事情會比較多。偷個懶,大家懂就好,具體實現,等有時間我把我的連接池代碼整理後再寫一篇文章,有想了解的可以給我評論討論下。 }3、被動端 被動端的核心是NiuxzServer和Worker和SocketHandler 大概流程是開啟端口等待連接、接受連接創建線程、達到線程最大數,拒絕連接、連接進入開始讀取數據、讀取到數據後進行分支處理,處理完後把結果響應到主動端,完成一次交互。繼續讀取。
/**開啟一個ServerSocket並等待連接,聯入後開啟一個線程進行處理**/ class NiuxzServer{ ServerSocket serverSocket; HashMap<NiuxzSocket> sockets = new HashMap<NiuxzSocket>(); public static AtomicInteger workerCount = 0; public Object waitLock = new Object(); int maxWorkerCount = 100;//允許100個連接進入 int port;//配置一個端口號 /**工作入口**/ void work(){ serverSocket = new ServerSocket(port); while(true){ Socket socekt = serverSocket.accept();//阻塞等待連接 NiuxzSocket niuxzSocket = new NiuxzSocket(socket); sockets.put(niuxzSocket ,1);//將連接放入map中 Worker worker = new Worker(niuxzSocket );//創建一個工作線程 worker.start();//開始線程 while(true){ if(workerCount.incrementAndGet()>=maxWorkerCount){//如果超過了規定的最大線程數,就進入等待,等待其他連接銷毀 synchronized(waitLock){ if(workerCount.incrementAndGet()>=maxWorkerCount){//double check 確定進入等待前沒有正在斷開的socket waitLock.wait(); }else{ break; } } }else{ break; } } } } /**銷毀一個連接**/ void destroy(NiuxzSocket socket){ synchronized(waitLock){ sockets.remove(socket);//從池子裏刪除 workerCount.decrementAndGet();//當前連接數減一 waitLock.notify();//通知work方法 可以繼續接受請求了 } } /**創建一個工作者線程類,處理連入的socket**/ class Worker extends Thread{ HashMap<Integer,SocketHandler> handlers;//針對每種行為標識做的消息處理器。 NiuxzSocket socket; Worker(NiuxzSocketsocket){//構造函數 this.socket = socket; } void run(){ try{ while(true){ SocketPackage sp = SocketPackage.parse(socket.getIn());//阻塞讀,直到讀完一個消息包未知,這樣可以解決粘包或半包的問題 SocketHandler handler = handlers.get(sp.getAction());//根據行為標識獲取響應的處理器 handler.handle(sp,socket);//處理結果和響應信息都在handler中回寫 } }cache(Exception e){ LOG.error("連接異常中斷",e); NiuxzServer.destroy(socket); } } } }
/** 創建一個消息處理器 SocketHandler 接收所有內容後 回顯 **/ class EchoSocketHandler implements SocketHandler { /** 處理socket請求 **/ void handle(SocketPackage sp, NiuxzSocket socket) { sp.setAction(10);// 比如協議中的行為標識10是響應成功的意思 socket.write(sp.toBytes());// 直接回寫 } }
至此兩端的工作代碼已經初步完成。socket可以按照相互制定的通訊方式進行通訊了。
3、心跳機制:
心跳機制socket長鏈接通訊中不可或缺的一個機制。主動端可以檢測socket是否存活,被動端可以檢測對方是否還在線。因為有時候網絡並不一定那麽完美,會出現鏈路上的異常,此時應用層可能並不能發現問題,等下次再用這個連接的時候就會拋出異常了,如果是被動端,還會白白占用著一個線程,不如在那之前就發現一部分異常,並銷毀連接,下次通訊時出錯的概率就降低了很多,被動端也會釋放線程,釋放資源。
代碼可以這樣實現: 主動端: 做一個定時任務遍歷判斷連接池中所有連接的上次使用時間是否超過心跳包間隔時間,超過了就取出這個socket並開啟一個線程(最好使用使用線程池),在線程中發送一個心跳包。@Scheduled(fixedDelay=30*1000)//延時30秒執行一次 void HeartBeat(){ for(NiuxzSocket socket:socketPool.getAllSocket()){ if(System.curTime() - socket.getLastUse() > 30*1000){//如果系統時間減上次使用時間大於30秒 //開啟線程,從連接池中取出這個連接remove(socket)移除成功再繼續操作,保證不會有其他線程同時使用這個socket。發送一個SocketPackage,socketClient.sendHeartBeat() if(socketPool.remove(socket)){ socketClient.snedHeartBeat(socket);//socketClient.snedHeartBeat這個方法實現:行為標識設置為心跳包,比如規定1就是心跳包。完事回收這個鏈接socketPool.recycle(socket),但當中間反生異常,則代表這個連接不可用了,就銷毀socketPool.destroy(socket)。 } } } }被動端: 跟主動端一樣,定時掃描連接池,但是發現超過規定的空閑超時時間的連接時不發送心跳包而是直接銷毀,關閉socket後,正在read的線程就會讀取到EOF(-1),停止線程。規定的超時時間一定要大於約定的心跳包的間隔時間。 4、即時重發: 優化SocketClient,在每次發送的時候,如果發生異常,銷毀當前socket後,再次執行一次或兩次即可。重試幾次後如果不行再把異常拋出。 5、完善填坑: 通過上面的工作,我們其實已經解決了問題1、3、4了。通過報文制定信息長度解決半包粘包問題,通過客戶端的連接池或操作socket加鎖的方式解決多線程訪問socket時會造成數據錯亂的問題(好吧,加鎖誰不會呢。。所以推薦使用連接池的方式,提高吞吐量)。 還有問題2、5。其實我們可以通過一個Sokcet健康檢測任務(也可與心跳檢測任務合並,把心跳任務的延遲時間改為100ms或者更低)去遍歷連接池,判斷每個連接的信息,挨個判斷每個狀態是否異常,然後再決定要不要關閉socket。 比如問題1,可能在讀寫操作時對方卡死,導致很久不處理任務或者對方掛起了,壓根不會繼續接收或回寫信息了,這時如果有一個超時機制就比較好了,幸運的是java的socket是有setSoTimeout方法的,可以設置read的超時時間,給主動端設置個30s,被動端遲遲不響應,就會拋出超時異常,這時候我們就銷毀這個socket了。 但是,java的socket沒有提供write的超時設置,那給被動端寫數據時,被動端接收巨緩慢或者出了什麽問題導致壓根不接收數據了,就會導致這個寫入線程一直掛起。我們當然不希望發生這樣的事情,那麽我們可以在write之前記錄下當前時間並把socket變為正在寫出狀態,然後在Sokcet健康檢測任務中判斷這個socket是否是寫出狀態並且時間是否超過xx秒,來決定是否關閉這個socket。 空閑socket關閉就更簡單了,在NiuxzSocket再加一個上次非心跳包發送時間,然後在健康檢測任務中進行判斷就可以了。
以上便是我用同步socket實現第一版分布式文件系統時總結的經驗,有些問題其實在NIO中變得不是問題了。NIO和AIO更適合會持有大量連接的服務器端。
純Socket(BIO)長鏈接編程的常見的坑和填坑套路