1. 程式人生 > >Socket服務端,長連線,心跳包,自動釋放掉線資源,可擴充套件處理資料介面

Socket服務端,長連線,心跳包,自動釋放掉線資源,可擴充套件處理資料介面

頭文:

最近看網上寫Sokect程式設計,好多都寫的沒法擴充套件,不能複用,既然是面向物件,就多多少少應該有面向物件的思想,所以自己下午寫了一個,沒優化,有不好的地方請指出.

解析:

Socket的基本原理這裡就不講了,只貼程式碼和解釋程式碼,不會講很細.

大致分為四個部分,3個部分都是一個單獨的執行緒.1個部分為雜湊表共享資源

服務模組:SocketService,用於接受客戶端連線.

檢測模組:Sendheart,用於傳送心跳包,客戶端超時或掉線,此模組會自動資源清除.

資料處理模組:DealDataThread,用於所以客戶端資料,可以實現SocketDealDate來實現自己處理邏輯,若不實現,這會發什麼就返回客戶端ID加發送資料.

共享資源:Hashtable,三個模組共享,儲存Socket連線資源.

程式碼部分:

服務模組:沒寫註釋,sorry.

public class SocketService extends Thread{
	
	final int port = 8081;
	
	ServerSocket server;
	
	Hashtable<String,ClientThead> Clients;
	
	private int number;
	
	//資料接收佇列
	BlockingQueue<byte[]> ReviceQueue;
	
	DealDataThread DealData;
	
	Sendheart sendheart;
	
	private int length;
	
	public SocketService(int limit,int length,int sleep,DealDataThread DealData) throws IOException{
		 
		try {
			
		 server  = new ServerSocket(port);
		 
		 Clients = new Hashtable<String,ClientThead>();
		 
		 this.length = length;
		 
		 ReviceQueue = new ArrayBlockingQueue(limit);
		 
		 DealData  = new DealDataThread();
		 
		 DealData.setQueue(ReviceQueue);
		 
		 DealData.setClients(Clients);
		 
		 DealData.start();
		 
		 sendheart = new Sendheart(Clients,sleep);
		 
		 sendheart.start();
		 
		}
		catch(Exception ex) {
			
			throw  ex;
			
		}
	}
	
	 public void run() {
		 
		 while(true) {
			 
			 try {
			 
			   Socket client =  server.accept();
			   
			   ClientThead thread = new ClientThead(String.valueOf(number),ReviceQueue,client,length);
			   
			   Clients.put(String.valueOf(number), thread);
			   
			   thread.start();
			   
			   number++;
			   
			 }catch(Exception ex) {
				 
				 System.out.print( ex.toString() );
				 
			 }
			 
			 
		 }
		 
	 }
}

檢測模組:沒寫註釋,sorry.

public class Sendheart extends Thread  {
	
	private Hashtable<String,ClientThead> clients;
	
	private int  sleep;
	
	public Sendheart(Hashtable<String,ClientThead> clients,int sleep) {
		
		this.clients = clients;
		
		this.sleep =sleep;
		
	}
	
   public void run() {
    	
    	try {
    		while(true) {
    			
    			ArrayList<String> List = new ArrayList(); 
    			for(java.util.Iterator<String> bean = clients.keySet().iterator();bean.hasNext();) {
    				
    				String  key   =   bean.next();
    				try {
    					
    					ClientThead value =   clients.get(key);
    					
    					value.send(new byte[] {1,2,3,4,5,6,7,8,9});
    					
    				}catch(Exception ex){
    					List.add(key);
    				}
    			}
    			
    			for(String Id:List) {
    				
    				ClientThead value =   clients.get(Id);
    				
    				value.close();
    				
    				clients.remove(Id);
    				
    				
    			}
    			
    			Thread.sleep(sleep);
    		}
			 	
    	}catch(Exception ex) {
    		
    	}
   }

}

資料處理模組:沒寫註釋,sorry.

public class DealDataThread extends Thread{
	

	private SocketDealDate  dateDeal;
	


	private BlockingQueue<byte[]> Queue;
	
	private Hashtable<String,ClientThead> clients;
		
	public void run() {
    	
    	try {
    		
    		while(true) {
    		
		      byte[] data = Queue.take();
		      
		      if(dateDeal == null) {
		    	  
		          ClientThead client = clients.get(String.valueOf(data[0]));
		      
		          client.send(data);
		      }else {
		    	  dateDeal.dealDate(data);
		      }
		      
    		}
			 	
    	}catch(Exception ex) {
    		
    	}
    }
	
	public void setQueue(BlockingQueue<byte[]> queue) {
		Queue = queue;
	}

	public void setClients(Hashtable<String, ClientThead> clients) {
		this.clients = clients;
	}
	
	public void setDateDeal(SocketDealDate dateDeal) {
		this.dateDeal = dateDeal;
	}

}

每個socket客戶端:

public class ClientThead extends Thread{
	
	//客戶連線ID
	private String clientID;
	
	//傳送資料失敗次數,連續傳送三次
	private int sendfailSum;
	
	private Socket client; 
    
	private InputStream  reader; 
    
	private OutputStream out;
	
	private BlockingQueue Queue;
	
	private int length;
    
    
    public ClientThead(String ID,BlockingQueue<byte[]> Queue,Socket socket,int length) throws IOException{
    	
    	try {
    	  this.clientID = ID;
    	
    	  this.Queue    = Queue;
    	
      	  this.client   = socket;
      	  
      	  this.length = length;
    	
    	  reader  = socket.getInputStream();
    	
    	  out     = socket.getOutputStream();
    	  
    	}catch(IOException ex) {
    		
    		throw ex;
    		
    	}

    }
    
    public void run() {
    	
    	
    		while(true) {
    			try {
    				
                   byte[] b=new byte[length]; 
			 
			       reader.read(b);
			   
			       byte[] head = new byte[] { (byte)Integer.parseInt(clientID) };
			   
			       Queue.put(Tool.addBytes(head, b));
    			}
    	        catch(InterruptedException ex) {
    		     try {
    			    if(reader != null) {
    				  
    				  reader.close();
    				  
    			    }
    			  if(out != null) {
    				  out.close();
    			  }
    			  if(client != null) {
    				  
    				  client.close();
    			  }
    			  System.out.print(clientID+ "號客戶端斷開資源已釋放");
    			}catch(IOException exOne) {
    				System.out.print(exOne.toString());
    			}
    		    break;
    	        }
    			
    	    	catch(Exception e)
    			{
    				System.out.println(e.toString());
    				
    			}
    		}
    }
    
    public void send(byte[] data) throws Exception{
    	
    	try {
    	  synchronized(out) {
    		  
    		out.write(data);
    		
    		sendfailSum = 0;
    	  }
    	}catch(IOException ex) {
    		
    		sendfailSum++;
    		
    		if(sendfailSum > 3) {
    			
    			throw new Exception("客戶端已關閉");
    			
    		}
    	}

    }

    
	public String getClientID() {
		return clientID;	
	}

	public void setClientID(String clientID) {
		clientID = clientID;
	}
	
	
	
	public void close() {
		this.interrupt();
	}

}

思路:

大致說一下思路:

​
	public static void main(String[] args) throws Exception {
		
		DealDataThread dealDate = new DealDataThread();
		
		SocketService Service= new SocketService(1000*100,2048,1000,dealDate,8081);
		
		Service.start();
	}
​

先建立一個處理資料執行緒,可以呼叫     DealDataThread.setDateDeal(SocketDealDate dateDeal) ; SocketDealDate 是一個介面,自己實現方法的dealDate的方法,把自己實現的處理類注入到DealDataThread中,可以處理自己邏輯了.

public interface SocketDealDate {
	
	   void  dealDate(byte[] date);
	   
}

在SocketService 的構造方法中,五個引數分別是:

接受資料佇列大小,

接受客戶端最大包的長度,

傳送心跳包時間間隔,

處理資料實現模組,

服務端的埠號.

這五個引數可以根據自己的實際情況去調整.

將SocketService執行緒開啟之後,當新的客戶端接入之後,就會開啟一個新的客戶端執行緒,用來接受資料.客戶端接受的資料都會在資料之前加上客戶端分配的ID,與雜湊表的鍵值相同,

而客戶端需要注意的就是資源釋放,因為Socket是阻塞型資料接受,所以要採用異常觸發,當異常觸發時,跳出while(true){} ,釋放輸入,輸出,Sokcet資源,然後執行緒結束,垃圾回收.(看不明白,對照前面完成程式碼看.)

    	        catch(InterruptedException ex) {
    		     try {
    			    if(reader != null) {
    				  
    				  reader.close();
    				  
    			    }
    			  if(out != null) {
    				  out.close();
    			  }
    			  if(client != null) {
    				  
    				  client.close();
    			  }
    			  System.out.print(clientID+ "號客戶端斷開資源已釋放");
    			}catch(IOException exOne) {
    				System.out.print(exOne.toString());
    			}
    		    break;
    	        }
    			
    	    	catch(Exception e)
    			{
    				System.out.println(e.toString());
    				
    			}

然後就是資源釋放模組:當連續傳送資料3次失敗後,就會將hashTable資源清除.

我採用的都是byte型資料,與底層裝置做對接時,底層大部分都只能處理byte型資料,採用別的資料格式型別,有可能會造成底層無法解析.與電腦通訊不用注意這些.

*****重點,Java的byte型範圍是-128-127之間,當底層發大於127的byte的資料時,Java接受都會變為複數,自己可以將其轉為int資料進行處理,處理方法這裡不做詳解

最後一個就是資料處理模組,關鍵問題就是資料佇列中哪個資料是哪個客戶端傳送過來的問題,雜湊表的鍵值就是客戶端的ID值,資料第一位就是客戶端的ID,最大隻能表示到127,所以理論上做多隻能連線127個裝置,若想要連線更多裝置,用前四位儲存ID.

總結:

本次程式碼名字起的不好,寫的急,各位看官請見諒.