1. 程式人生 > >NIO結合Socket程式設計實現

NIO結合Socket程式設計實現

基本概念

Socket又稱“套接字”,應用程式通過“套接字”向網路發出請求或者應答網路請求。Socket和SocketServer類庫位於java.net包中,ServerSocket用於伺服器端,Socket是建立網路連結使用的。在連線成功時,應用程式兩端會產生一個Socket例項,操作這個例項,完成所需的會話。對於一個網路連線來說,套接字是平等的,不因為在伺服器端或在客戶端而產生不同的級別。不管是Socket還是ServerSocket它們的工作都是通過SocketImpl類及其子類完成的。套接字之間的連線過程可以分為四個步驟:

  • (1)伺服器監聽:是伺服器端套接字並不定位具體的客戶端套接字,而是出於平等連線的狀態,實時監控網路狀態。
  • (2)客戶端請求:指由客戶端的套接字提出連線請求,要連線的目標是伺服器端的套接字。為此,客戶端的套接字必須首先描述它要連線的伺服器套接字,指出伺服器套接字的地址和埠號,然後向伺服器端套接字提出連線請求。
  • (3)伺服器連線確認:當伺服器端套接字接收到客戶端的套接字的連線請求,它就響應客戶端套接字的請求,建立一個新的執行緒,把伺服器端套接字的描述發給客戶端。
  • (4)客戶端連線確認:一旦客戶端確認了此描述,連線就建立好了,雙方開始進行通訊。而伺服器端套接字出於監聽狀態,繼續接受其他客戶端套接字的連線請求。

IO(BIO)和NIO的區別,其本質就是阻塞和非阻塞的區別。

  • 阻塞:應用程式在獲取網路資料的時候,如果網路傳輸資料很慢,那麼程式就一直等待,直到傳輸完畢為止。
  • 非阻塞:應用程式直接可以獲取已經準備就緒的資料,無須等待。

IO為同步阻塞形式,NIO為同步非阻塞形式。NIO並沒有實現非同步,在JDK1.7之後,升級了NIO庫包,支援非同步非阻塞通訊模型即NIO2.0(AIO)。同步和非同步一般是面向作業系統與應用程式對IO操作的層面來區別。

  • 同步時:應用程式會直接參與IO讀寫操作,並且我們的應用程式會直接阻塞到某一個方法上,直到資料準備就緒;或者採用輪詢的策略實時檢查資料的就緒狀態,如果就緒則獲取資料。
  • 非同步時:則所有的IO讀寫操作交給作業系統處理,與我們的應用程式沒有直接關係,我們程式不需要關係IO讀寫,當作業系統完成IO讀寫操作時,會給我們的應用程式傳送通知,我們的應用程式直接拿走資料即可。

同步說的是server服務端的執行方式,阻塞說的是具體的技術,接收資料的方式、狀態(io,nio)。

public class Server {

	final static int PORT = 8888;
	
	public static void main(String[] args) {
		ServerSocket server = null;
		try{
			server = new ServerSocket(PORT);
			System.out.println("server start...");
			//進行阻塞
			Socket socket = server.accept();
			//新建一個執行緒執行客戶端的任務
			new Thread(new ServerHandler(socket)).start();
		} catch (Exception e){
			e.printStackTrace();
		} finally{
			if(server != null){
				try {
					server.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			server = null;
		}
	}

}
public class ServerHandler implements Runnable {

	private Socket socket;
	
	public ServerHandler(Socket socket){
		this.socket = socket;
	}
	
	@Override
	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		
		try {
			in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
			out = new PrintWriter(this.socket.getOutputStream());
			String body = null;
			while(true){
				body = in.readLine();
				if(null == body) break;
				System.out.println("Server: " + body);
				out.println(body);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				in.close();
				out.close();
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
	}

}
public class Client {

	final static String ADDRESS = "127.0.0.1";
	final static int PORT = 8888;
	
	public static void main(String[] args) {
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		
		try {
			socket = new Socket(ADDRESS, PORT);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(),true);
			
			//向伺服器端傳送資料
			out.println("接受到客戶端的請求資料...");
			String response = in.readLine();
			System.out.println("Client: " + response);
		} catch (UnknownHostException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally{
			try {
				in.close();
				out.close();
				socket.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

}

傳統的BIO(Blocking IO)程式設計

網路程式設計的基本模型是Client/Sever模型,也就是兩個程序直接進行相互通訊,其中服務端提供配置資訊(繫結埠的ip地址和監聽埠),客戶端通過連線操作箱伺服器端監聽的地址發起連線請求,通過三次握手建立連線,如果連線成功,則雙方即可以進行通訊(網路套接字socket)。

偽非同步IO

採用執行緒池和任務佇列可以實現一種偽非同步IO通訊框架。在學習過連線池和佇列的使用,我們將客戶端的socket封裝成一個task任務(實現Runnable介面的類)然後投遞到執行緒池中去,配置相應的佇列進行實現。

/**
 * 修改Server,增加HandlerExecutorPool處理資料
 * */
public class Server {

	final static int PORT = 8888;
	
	public static void main(String[] args) {
		ServerSocket server = null;
		Socket socket = null;
		try{
			server = new ServerSocket(PORT);
			System.out.println("server start...");
			HandlerExecutorPool executorPool = new HandlerExecutorPool(50,1000);
			while(true){
				socket = server.accept();
				executorPool.execute(new ServerHandler(socket));
			}
		} catch (Exception e){
			e.printStackTrace();
		} finally{
			if(server != null){
				try {
					server.close();
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			server = null;
			socket = null;
		}
	}

}
public class HandlerExecutorPool {

	private ExecutorService executor;
	
	public HandlerExecutorPool(int maxPoolSize,int queueSize){
		this.executor = new ThreadPoolExecutor(
				Runtime.getRuntime().availableProcessors(),
				maxPoolSize,
				120L, 
				TimeUnit.SECONDS,
				new ArrayBlockingQueue<Runnable>(queueSize));
	}
	
	public void execute(Runnable task){
		this.executor.execute(task);
	}
}

NIO程式設計

NIO(Non-Block IO),即非阻塞IO。它包含以下幾個概念:

  • Buffer(緩衝區)
  • Channel(管道、通道)
  • Selector(選擇器、多路複用器)

Buffer

Buffer是一個物件,它包含一些要寫入或者讀取的資料。在NIO類庫中加入Buffer對夏寧,體現了新庫與原IO的一個重要區別。在面向流的IO中,可以將資料直接寫入或讀取到Stream物件中。在NIO庫中,所有的資料都是用緩衝區處理的。緩衝區實質上是一個數組,通常它是一個位元組陣列(ByteBuffer),也可以使用其他型別的陣列。這個陣列為緩衝區提供了資料的訪問讀寫等操作屬性,如位置、容量、上限等概念。我們常用的就是ByteBuffer,實際上每一種java基本型別都對應了一種緩衝區(除了Boolean型別)。

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
public class TestBuffer {
	
	public static void main(String[] args) {
		
		// 1 基本操作
		
		//建立指定長度的緩衝區
		IntBuffer buf = IntBuffer.allocate(10);
		buf.put(13);// position位置:0 - > 1
		buf.put(21);// position位置:1 - > 2
		buf.put(35);// position位置:2 - > 3
		//把位置復位為0,也就是position位置:3 - > 0
		buf.flip();
		System.out.println("使用flip復位:" + buf);
		System.out.println("容量為: " + buf.capacity());	//容量一旦初始化後不允許改變(warp方法包裹陣列除外)
		System.out.println("限制為: " + buf.limit());		//由於只裝載了三個元素,所以可讀取或者操作的元素為3 則limit=3
		
		
		System.out.println("獲取下標為1的元素:" + buf.get(1));
		System.out.println("get(index)方法,position位置不改變:" + buf);
		buf.put(1, 4);
		System.out.println("put(index, change)方法,position位置不變:" + buf);;
		
		for (int i = 0; i < buf.limit(); i++) {
			//呼叫get方法會使其緩衝區位置(position)向後遞增一位
			System.out.print(buf.get() + "\t");
		}
		System.out.println("buf物件遍歷之後為: " + buf);
		
		
		// 2 wrap方法使用
		/**
		//  wrap方法會包裹一個數組: 一般這種用法不會先初始化快取物件的長度,因為沒有意義,最後還會被wrap所包裹的陣列覆蓋掉。 
		//  並且wrap方法修改緩衝區物件的時候,陣列本身也會跟著發生變化。                     
		int[] arr = new int[]{1,2,5};
		IntBuffer buf1 = IntBuffer.wrap(arr);
		System.out.println(buf1);
		
		IntBuffer buf2 = IntBuffer.wrap(arr, 0 , 2);
		//這樣使用表示容量為陣列arr的長度,但是可操作的元素只有實際進入快取區的元素長度
		System.out.println(buf2);
		*/
		
		
		// 3 其他方法
		/**
		IntBuffer buf1 = IntBuffer.allocate(10);
		int[] arr = new int[]{1,2,5};
		buf1.put(arr);
		System.out.println(buf1);
		//一種複製方法
		IntBuffer buf3 = buf1.duplicate();
		System.out.println(buf3);
		
		//設定buf1的位置屬性
		//buf1.position(0);
		buf1.flip();
		System.out.println(buf1);
		
		System.out.println("可讀資料為:" + buf1.remaining());
		
		int[] arr2 = new int[buf1.remaining()];
		//將緩衝區資料放入arr2陣列中去
		buf1.get(arr2);
		for(int i : arr2){
			System.out.print(Integer.toString(i) + ",");
		}
		*/
		
	}
}

Channel

通道(Channel),他就像自來水管道一樣,網路資料通過Channel讀取和寫入,通道與流不同之處在於通道是雙向的,而流只是一個方向上流動(一個流必須是InputStream或者OutputStream的子類),而通道可以用於讀、寫或者二者同時進行,最關鍵的是可以與多路複用器結合起來,有多種狀態為,方便多路複用器去識別。事實上通道分為兩大類,一類是網路讀寫的(SelectableChannel),一類是用於檔案操作的(FileChannel),我們使用的SocketChannel和ServerSocketChannel都是SelectableChannel的子類。

Selector

多路複用器(Selector),它是NIO程式設計的基礎,非常重要。多路複用器提供選擇已經就緒的任務的能力。簡單說,就是Selector會不斷地輪詢註冊在其上的通道(Channel),如果某個通道發生了讀寫操作,這個通道就出於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以取得就緒的Channel集合,從而進行後續的IO操作。一個多路複用器(Selector)可以輔助成千上萬的Channel通道,沒有上限,這也是JDK使用了epoll代替了傳統的select實現,獲得連線控制代碼沒有限制。這也就意味著我們只要一個執行緒負責Selector的輪詢,就可以接入成千上萬個客戶端,這是JDK NIO庫的巨大進步。 Selector執行緒就類似一個管理者(Master),管理了成千上萬個管道,然後輪詢哪個管道的資料已經準備好,通知cpu執行IO的讀取或寫入操作。Selector模式:當IO事件(管道)註冊到選擇器以後,Selector會分配個每個管道一個key值,相當於標籤。Selector選擇器是以輪詢的方式進行查詢註冊所有IO事件(管道),當我們的IO事件(管道)準備就緒後,Selector就會識別,會通過key值來找到對應的管道,進行相關的資料處理操作(從管道里讀或寫資料,寫到我們的資料緩衝區中)。 每個管道都會對選擇器進行註冊不同的時間狀態,以便於選擇器查詢:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE
/**
 * 實現Runnable介面是為了註冊到Selector中一直處於輪詢的狀態
 * */
public class Server implements Runnable {

	//1.多路複用器(管理所有的通道)
	private Selector selector;
	//2.建立緩衝區
	private ByteBuffer readBuf = ByteBuffer.allocate(1024);
	
	public Server(int port){
		
		try {
			//1.開啟多路複用器
			this.selector = Selector.open();
			//2.開啟服務端通道
			ServerSocketChannel ssc = ServerSocketChannel.open();
			//3.設定服務端通道為非阻塞模式
			ssc.configureBlocking(false);
			//4.繫結地址
			ssc.bind(new InetSocketAddress(port));
			//5.把服務端通道註冊到多路複用器上,並且監聽阻塞事件
			ssc.register(selector, SelectionKey.OP_ACCEPT);
			
			System.out.println("Server start, prot: " + port);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void run() {
		while(true){
			try {
				//1.讓多路複用器開始監聽
				this.selector.select();
				//2.返回多路複用器已經選擇的結果集
				Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
				//3.進行響應
				while(keys.hasNext()){
					//4.獲取一個選擇元素
					SelectionKey key = keys.next();
					//5.直接從容器中移除
					keys.remove();
					//6.如果key有效
					if(key.isValid()){
						//7.如果為阻塞狀態
						if(key.isAcceptable()){
							this.accept(key);//這裡的key就是伺服器端的Channel的key
						}
						//8.如果為可讀狀態
						if(key.isReadable()){
							this.read(key);
						}
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	private void read(SelectionKey key) {
		try {
			//1.清空緩衝區舊的資料
			this.readBuf.clear();
			//2.獲取之前註冊的socket通道物件
			SocketChannel sc = (SocketChannel) key.channel();
			//3.讀取資料
			int count = sc.read(this.readBuf);
			//4.如果沒有資料
			if(count == -1){
				key.channel().close();
				key.cancel();
				return;
			}
			//5.有資料則進行讀取,讀取之前需要進行復位方法(把position和limit進行復位)
			this.readBuf.flip();
			//6.根據緩衝區的資料長度建立相應大小的byte陣列,接收緩衝區的資料
			byte[] bytes = new byte[this.readBuf.remaining()];
			//7.接收緩衝區資料
			this.readBuf.get(bytes);
			//8.列印結果
			String body = new String(bytes).trim();
			System.out.println("Server: " + body);
			//9.寫回給客戶端
			//...
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void accept(SelectionKey key) {
		try {
			//1.獲取服務端通道
			ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
			//2.執行客戶端Channel的阻塞方法
			SocketChannel sc = ssc.accept();
			//3.設定阻塞模式
			sc.configureBlocking(false);
			//4.註冊到多路複用器上,並設定讀取標識
			sc.register(this.selector, SelectionKey.OP_READ);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	
	public static void main(String[] args) {
		new Thread(new Server(8765)).start();
	}
}
public class Client {

	public static void main(String[] args) {
		//建立連線地址
		InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
		
		//宣告連線通道
		SocketChannel sc = null;
		
		//建立緩衝區
		ByteBuffer buf = ByteBuffer.allocate(1024);
		
		try {
			//開啟通道
			sc = SocketChannel.open();
			//進行連線
			sc.connect(address);
			while(true){
				//定義一個位元組陣列,然後使用系統錄入的功能
				byte[] bytes = new byte[1024];
				System.in.read(bytes);
				
				//把資料放到緩衝區
				buf.put(bytes);
				//復位
				buf.flip();
				//寫出資料
				sc.write(buf);
				//清空緩衝區資料
				buf.clear();
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				sc.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		
	}
}