1. 程式人生 > >偽非同步I/O網路程式設計模型

偽非同步I/O網路程式設計模型

       為了解決同步阻塞I/O面臨的一個鏈路需要一個執行緒處理的問題,後來有人對它的執行緒模型進行了優化—後端通過一個執行緒池來處理多個客戶端等請求接入,形成客戶端個數M;執行緒池最大執行緒數N的比例關係,其中M可以遠遠大於N。通過執行緒池可以靈活地調配執行緒資源,設定執行緒的最大值,防止由於海量併發接入導致執行緒耗盡。採用執行緒池和任務佇列可以實現一種叫做偽非同步的I/O通訊框架,它的模型圖如圖示。

          

       當有新的客戶端接入時,將客戶端的Socket封裝成一個Task(該任務實現java.lang.Runnable介面)投遞到後端的執行緒池中進行處理,JDK的執行緒池維護一個訊息佇列和N個活躍執行緒,對訊息佇列中的任務進行處理。由於執行緒池可以設定訊息佇列的大小和最大執行緒數,因此,它的資源佔用是可控的,無論多少個客戶端併發訪問,都不會導致資源的耗盡和宕機,相比於傳統的一連線一執行緒模型,是一種改良。接下來展示該模型下的模型客戶端和伺服器端的實現。

(1)伺服器端

package cn.edu.hust.app.pio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 
 * @author chenqiang
 * 伺服器端
 */
public class TimeServer {

	public static void main(String[] args) {
		int port = 8080;
		if (args != null && args.length > 0) {
			try {
				port = Integer.valueOf(args[0]);
			}catch (Exception e) {
				// TODO: handle exception
			}
		}
		
		ServerSocket server = null;
		try {
			server = new ServerSocket(port);
			System.out.println("Th time server is starting in port: " + port);
			Socket socket = null;
			TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50, 10000); //執行緒池建立
			while (true) {
				socket = server.accept();
				singleExecutor.execute(new TimeServerHandle(socket)); //由執行緒池來執行服務執行緒
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if (server != null) {
				System.out.println("The time server close");
				try {
					server.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				server = null;
			}
		}
	}
}
package cn.edu.hust.app.pio;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;

/**
 * 
 * @author chenqiang
 * 服務執行緒
 */
public class TimeServerHandle implements Runnable{

	private Socket socket;
	
	
	
	public TimeServerHandle(Socket socket) {
		super();
		this.socket = socket;
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
			out = new PrintWriter(this.socket.getOutputStream(), true);
			String currentTime = null;
			String body = null;
			while (true) {
				body = in.readLine();
				if (body == null) {
					break;
				}
				System.out.println("The time server receive order: " + body);
				currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
				out.println(currentTime);
			}
		}catch (Exception e) {
			// TODO: handle exception
			if (in != null) {
				try {
					in.close();
				}catch (Exception e1) {
					// TODO: handle exception
					e.printStackTrace();
				}
			}
			
			if (out != null) {
				out.close();
				out = null;
			}
			
			if(this.socket != null) {
				try {
					this.socket.close();
				}catch (Exception e2) {
					// TODO: handle exception
					e.printStackTrace();
				}
				this.socket = null;
			}
		}
		
	}

}
package cn.edu.hust.app.pio;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 
 * @author chenqiang
 * 執行緒池
 */
public class TimeServerHandlerExecutePool {

	private ExecutorService executor;

	public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
		executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, 
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
	}
	
	public void execute(Runnable task) {
		executor.execute(task);
	}
	
}

(2)客戶端

package cn.edu.hust.app.pio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * 
 * @author chenqiang
 *
 */
public class TimeClient {

	public static void main(String[] args) {
		int port = 8080;
		if(args != null && args.length > 0) {
			try {
				port = Integer.valueOf(args[0]);
			}catch (Exception e) {
				// TODO: handle exception
			}
		}
		
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		
		try {
			socket = new Socket("127.0.0.1", port);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(), true);
			out.println("QUERY TIME ORDER");
			System.out.println("Send order 2 server succeed.");
			String resp = in.readLine();
			System.out.println("Now is: " + resp);
		}catch (Exception e) {
			// TODO: handle exception
		}finally {
			if (out != null) {
				out.close();
				out = null;
			}
			
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			in = null;
			
			if (socket != null) {
				try {
					socket.close();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			socket = null;
		}
	}
}

       偽非同步I/O通訊框架採用了執行緒池實現,因此避免了為每個請求建立一個獨立執行緒造成的執行緒資源耗盡問題。但是由於它底層的通訊依然採用同步阻塞模型,因此無法從根本上解決問題。

       偽非同步I/O的問題根源是讀寫操作均為同步阻塞的,這樣一來阻塞的時間取決於雙方I/O執行緒的處理速度和網路I/O的傳輸速度。本質上,我們無法保證生產環節的網路狀況和對端的應用程式能足夠快,如果我們的應用程式依賴對方的處理速度,它的可靠性就非常差。

       偽非同步I/O實際上僅僅是對之前I/O執行緒模型的一個簡單優化,它無法從根本上解決同步I/O導致的通訊執行緒阻塞問題。下面就簡單分析下通訊雙方返回應答時間過長會引起的級聯故障:

(1)服務端處理緩慢,返回應答訊息耗費60s,平時只需要10ms

(2)採用偽非同步I/O的執行緒正在讀取故障服務節點的響應,由於讀取輸入流是阻塞的,它將會被同步阻塞60s

(3)假如所有的可用執行緒都被故障伺服器阻塞,那後續所有的I/O訊息都將在佇列中排隊

(4)由於執行緒池採用阻塞佇列實現,當佇列積滿之後,後續入隊的操作將被阻塞

(5)由於前端只有一個Acceptor執行緒接收客戶端接入,它被阻塞線上程池的同步阻塞佇列之後,新的客戶端請求訊息將被拒絕,客戶端會發生大量的連線超時

(6)由於幾乎所有的連線都超時,呼叫者會認為系統已經崩潰,無法接受新的請求訊息