1. 程式人生 > >並行模式與演算法(二)

並行模式與演算法(二)

1.矩陣演算法

在矩陣乘法中,第一個矩陣的列數和第二個矩陣的行數必須是相同的。如果需要進行平行計算,一種簡單的策略是可以將A矩陣進行水平分割,得到子矩陣A1和A2,B矩陣進行垂直分割,得到子矩陣B1和B2。此時,我們只要分別計算這些子矩陣的乘積,將結果進行拼接,就能得到原始矩陣A和B的乘積。當然這個過程是可額予以反覆進行的。為了計算A1*A2,我們可以進一步將A1和B1進行分解,直到我們認為子矩陣的大小已經在可接受的範圍內。

在這裡我們使用FockJoin框架來實現這個並行矩陣相乘的想法。為了方便矩陣計算,我們使用jMatrces開源軟體,作為矩陣計算的工具。其中,使用的主要API如下:

  • Matrix:代表一個矩陣
  • MatrixOperator.multiply(Matrix, Matrix):矩陣相乘
  • Matrix.row():獲得矩陣的行數
  • Matrix.getSubMatrix():獲得矩陣的子矩陣
  • MatrixOperator.horizontalConcatenation(Matrix, Matrix):將兩個矩陣進行水平連線
  • MatrixOperator.verticalConcatenation(Matrix, Matrix):將兩個矩陣進行垂直連線

定義一個任務類計算矩陣乘法,如果輸入的矩陣粒度太大,則會再次進行任務分解:

public class MatrixMulTask extends
RecursiveTask<Matrix> { Matrix m1; Matrix m2; String pos; /** * 建構函式 * @parm m1 矩陣1 * @parm m2 矩陣2 * @parm pos 乘積結果在父矩陣相乘結果中所處的位置 */ public MatrixMulTask(Matrix m1,Matrix m2,String pos) { this.m1 = m1; this.m2 = m2; this.pos = pos; } @Override protected Matrix compute()
{ if(m1.rows() <= MatrixMulTask.granularity || m2.cols()<=MatrixMulTask.granularity) { Matrix mRe = MatrixOperator.multiply(m1, m2); return mRe; } else { //繼續分割矩陣 int rows; rows = m1.rows(); //左乘矩陣橫向分割 Matrix m11 = m1.getSubMatrix(1, 1, rows/2, m1.cols()); Matrix m12 = m1.getSubMatrix(rows/2+1, 1, m1.rows(), m1.cols()); //右乘矩陣縱向分割 Matrix m21 = m2.getSubMatrix(1, 1, m2.rows(), m12.cols()/2); Matrix m22 = m2.getSubMatrix(1, m2.cols()/2+1, m2.rows(), m2.cols()); ArrayList<MatrixMulTask> subTasks = new ArrayList<MatrixMulTask>(); MatrixMulTask tmp = null; tmp = new MatrixMulTask(m11, m21, "m1"); subTasks.add(tmp); tmp = new MatrixMulTask(m11, m22, "m2"); subTasks.add(tmp); tmp = new MatrixMulTask(m12, m21, "m3"); subTasks.add(tmp); tmp = new MatrixMulTask(m12, m22, "m4"); subTasks.add(tmp); for(MatrixMulTask t : subTasks) { t.fork(); } Map<String, Matrix> matrixMap = new HashMap<String,Matrix>(); for(MatrixMulTask t :subTasks) { matrixMap.put(t.pos, t.join()); } Matrix tmp1 = MatrixOperator.horizontalConcatenation(matrixMap.get("m1"), matrixMap.get("m2")); Matrix tmp2 = MatrixOperator.horizontalConcatenation(matrixMap.get("m3"), matrixMap.get("m4")); Matrix reM = MatrixOperator.verticalConcatenation(tmp1, tmp2); return reM; } } }

MatrixMulTask中的成員變數m1和m2表示要相乘的兩個矩陣,pos表示這個乘積結果在父矩陣相乘結果中所處的位置,有m1,m2,m3,和m4等四種。先對矩陣進行分割,分割後得到m11、m12、m21和m22等四個任務,並將它們進行子任務的建立。然後計算這些子任務,最後將m1,m2,m3,和m4拼接成新的矩陣作為最終結果。

主函式:

public static final int granularity = 3;
public static void main(String[] args) throws InterruptedException, ExecutionException {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		//建立兩個300*300的隨機矩陣
		Matrix m1 = MatrixFactory.getRandomMatrix(100, 100, null);
		Matrix m2 = MatrixFactory.getRandomMatrix(100, 100, null);
		MatrixMulTask task = new MatrixMulTask(m1, m2, null);
		ForkJoinTask<Matrix> result = forkJoinPool.submit(task);
		Matrix pr = result.get();
		System.out.println(pr);
	}

二、網路NIO

Java NIO是New IO的簡稱。其涉及到的基礎內容有通道(Channel)和緩衝區(Buffer)、檔案IO和網路IO。

2.1 基於Socket的服務端的多執行緒模式

這裡,以Echo伺服器為例。對於Echo伺服器,它會讀取客戶端的一個輸入,並將這個輸入原封不動地返回給客戶端。伺服器會為每一個客戶端連線啟用一個執行緒,這個新的執行緒將全心全意為這個客戶端服務。為了接受客戶端連線,伺服器還會額外使用一個派發執行緒。下面是服務端程式碼:

public class MultiThreadEchoServer {
	// 使用執行緒池處理每個客戶端連線
	private static ExecutorService tp = Executors.newCachedThreadPool();
	//定義了HandleMsg執行緒,它由一個客戶端Socket構成,它的任務是讀取這個Socket的內容並
	//將其進行返回,返回成功後,任務完成,客戶端Socket就被正常關閉
	static class HandleMsg implements Runnable {
		Socket clientSocket;
		public HandleMsg(Socket clientSocket) {
			this.clientSocket = clientSocket;
		}

		@Override
		public void run() {
			BufferedReader is = null;
			PrintWriter os = null;
			try {
				is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
				os = new PrintWriter(clientSocket.getOutputStream(),true);
				String inputLine = null;
				long b = System.currentTimeMillis();
				while((inputLine = is.readLine()) != null) {
					os.println(inputLine);
				}
				// 統計並輸出了服務端執行緒處理一次客戶端請求所花費的時間(包括讀取資料和回寫資料的時間)
				long e = System.currentTimeMillis();
				System.out.println("spend:" + (e-b)+"ms");

			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				try {
					if(is!=null) is.close();
					if(os!=null) os.close();
					clientSocket.close();
				} catch (Exception e2) {
					e2.printStackTrace();
				}
			}
		}
	}

	public static void main(String[] args) {
		ServerSocket echoServer = null;
		Socket clientSocket = null;
		try {
			echoServer = new ServerSocket(8000);
		} catch (Exception e) {
			System.out.println(e);
		}

		while(true) {
			try {
				//客戶端連線,建立HandleMsg執行緒進行處理
				clientSocket = echoServer.accept();
				System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
				tp.execute(new HandleMsg(clientSocket));
			} catch (Exception e) {
				System.out.println(e);
			}
		}
	}
}

定義一個重量級的客戶端:


public class HeavySocketClient {
	private static ExecutorService tp = Executors.newCachedThreadPool();
	private static final int sleep_time = 1000*1000*1000;
	public static class EchoClient implements Runnable {
		@Override
		public void run() {
			Socket client = null;
			PrintWriter writer = null;
			BufferedReader reader = null;
			try {
				client = new Socket();
				client.connect(new InetSocketAddress("localhost",8000));
				writer = new PrintWriter(client.getOutputStream(),true);
				writer.write("H");
				LockSupport.parkNanos(sleep_time);
				writer.write("e");
				LockSupport.parkNanos(sleep_time);
				writer.write("l");
				LockSupport.parkNanos(sleep_time);
				writer.write("l");
				LockSupport.parkNanos(sleep_time);
				writer.write("o");
				LockSupport.parkNanos(sleep_time);
				writer.write("!");
				LockSupport.parkNanos(sleep_time);
				writer.println();
				writer.flush();

				reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
				System.out.println("from server:" + reader.readLine());
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				try {
					if(writer!=null)writer.close();
					if(reader!=null)reader.close();
					if(client!=null)client.close();
				} catch (Exception e2) {
				}
			}
		}
	}
	public static void main(String[] args) {
		EchoClient ec = new EchoClient();
		for(int i=0; i<10; i++) {
			tp.execute(ec);
		}
	}
}

上述程式碼定義了的客戶端,它會進行10次請求。每一次請求都會訪問8000埠。連線成功後,會向伺服器輸出“Hello!”字串,但是在這一次互動中,客戶端會延時進行輸出,每次只輸出一個字元,之後進行1秒的等待。因此,整個過程會持續6秒。

對於服務端來說,每一個請求的處理時間都在6秒左右。這很容易理解,因為伺服器要先讀入客戶端的輸入,而客戶端緩慢的處理速度(也可能是一個擁塞的網路環境)使得伺服器花費了不少等待時間。在這個案例中,伺服器處理請求之所以慢,並不是因為在伺服器端有繁重的任務,而僅僅是因為服務執行緒在等待IO。

2.2 使用NIO進行網路程式設計

在NIO中的一個關鍵元件Channel(通道)。Channel有點類似於流,一個Channel可以和檔案或者網路Socket對應。如果Channel對應著一個Socket,那麼往這個Channel中寫資料,就等同於向Socket中寫入資料。

和Channel一起使用的另一個重要元件就是Buffer。可以簡單地把Buffer理解成一個記憶體區域或者byte陣列。資料需要包裝成Buffer的形式才能和Channel互動(寫入或者讀取)。

另一個與Channel密切相關的是Selector(選擇器)。在Channel的眾多實現中,有一個SelectableChannel實現,表示可被選擇的通道。任何一個SelectableChannel都可以將自己註冊到一個Selector中。這樣,這個Channel就能被Selector所管理。而一個Selector可以管理多個SelectableChannel。當SelectableChannel的資料準備好時,Selector就會接到通知,得到那些已經準備好的資料。而SocketChannel就是SelectableChannel的一種。

一個Selector可以由一個執行緒進行管理,而一個SelectableChannel則可以表示一個客戶端連線,因此這就構成由一個或者極少數執行緒,來處理大量客戶端連線的結構。當與客戶端連線的資料沒有準備好時,Selector會處於等待狀態,而一旦有任何一個SelectableChannel準備好了資料,Selector就能立即得到通知,獲取資料進行處理。

下面是NIO伺服器的核心程式碼:

//處理所有的網路連線
private Selector selector;
//執行緒池針對每個客戶端進行相應處理
private ExecutorService tp = Executors.newCachedThreadPool();
//用於統計伺服器執行緒在一個客戶端上花費了多少時間
public static Map<Socket, Long> time_stat = new HashMap<Socket, Long>(10240);
// 啟動NIO Server
private void startServer() throws Exception {
	// 通過工廠方法獲得一個Selector物件的例項
	selector = SelectorProvider.provider().openSelector();
	// 獲得表示服務端的SocketChannel例項
	ServerSocketChannel ssc = ServerSocketChannel.open();
	// 將這個SocketChannel設定為非阻塞模式
	// 在這種模式下,我們才可以向Channel註冊感興趣的事件,並且在資料準備好時,得到必要的通知
	ssc.configureBlocking(false);

	InetSocketAddress isa = new InetSocketAddress("localhost", 8000);
	// 將這個CHannel繫結到8000埠
	ssc.socket().bind(isa);
	// 將這個ServerSocketChannel繫結到Selector上,並註冊它感興趣的時間為Accept
	// 當Selector發現ServerSocketChannel有新的客戶端連線時,就會通知ServerSocketChannel進行處理。
	// 方法register()返回值是一個SelectionKey,SelectionKey表示一對Selector和Channel的關係。
	// 當Channel註冊到Selector上時,就相當於確定了兩者的服務關係,那麼SelectionKey就是這個契約。
	// 當Selector或者Channel被關閉時,它們對應的SelectionKey就會失敗
	SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
	//無窮迴圈,它的主要任務就是等待-分發網路訊息
	for(;;) {
		// select()方法是一個阻塞方法。如果當前沒有任何資料準備好,它就會等待。一旦有資料可讀,
		// 它就會返回。它的返回值是已經準備就緒的SelectionKey的數量。這裡簡單地將其忽略。
		selector.select();
		// 獲取那些準備好的SelectionKey。因為Selector同時為多個Channel服務,因此已經準備就緒的Channel就有可能是多個。
		Set readyKeys = selector.selectedKeys();
		Iterator i = readyKeys.iterator();
		long e = 0;
		// 使用迭代器遍歷整個集合
		while(i.hasNext()) {
			// 根據迭代器獲得一個集合內的SelectionKey例項
			SelectionKey sk = (SelectionKey)i.next();
			// 將這個元素移除!注意,這個非常重要,否則就會重複處理相同的SelectionKey。
			i.remove();
			//當前SelectionKey所代表的Channel是否在Acceptable狀態
			if(sk.isAcceptable()) {
				// 客戶端的接收
				doAccept(sk);
			}
			//判斷Channel是否已經可以讀了
			else if(sk.isValid() && sk.isReadable()) {
				// 為了統計系統處理每一個連線的時間,記錄在讀取資料之前的一個時間戳。
				if(!time_stat.containsKey(((SocketChannel)sk.channel()).socket()))
					time_stat.put(((SocketChannel)sk.channel()).socket(), System.currentTimeMillis());
					// 讀取
				doRead(sk);
			}
			// 判斷通道是否準備好進行寫
			else if(sk.isValid() && sk.isWritable()) {
				// 寫
				doWrite(sk);
				e = System.currentTimeMillis();
				long b = time_stat.remove(((SocketChannel)sk.channel()).socket());
				System.out.println("spend:" + (e-b) +"ms");
			}
		}
	}
}

doAccept()方法,它與客戶端建立連線:

private void doAccept(SelectionKey sk) {
	ServerSocketChannel server = (ServerSocketChannel)sk.channel();
	SocketChannel clientChannel;
	try {
		// 生成的clientChannel就表示和客戶端通訊的通道
		clientChannel = server.accept();
		// 將這個Channel配置為非阻塞模式,也就是要求系統在準備好IO後,再通知我們的執行緒來讀取或者寫入。
		clientChannel.configureBlocking(false);
		//將新生成的Channel註冊到selector選擇器上,並告訴Selector,我現在對讀(OP_READ)操作感興趣。這樣,
		//當Selector發現這個Channel已經準備好讀時,就能給執行緒一個通知。
		SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
		//新建一個物件例項,一個EchoClient例項代表一個客戶端
    EchoClient echoClient = new EchoClient();
		// 我們將這個客戶端例項作為附件,附加到表示這個連線的SelectionKey上。這樣在整個連線的處理過程中,
		// 我們都可以共享這個EchoClient例項
		clientKey.attach(echoClient);

		InetAddress clientAddress = clientChannel.socket().getInetAddress();
		System.out.println("Accepted connection from " + clientAddress.getHostAddress() + ".");
	} catch (Exception e) {
		System.out.println("False to accept new client.");
		e.printStackTrace();
	}
}

EchoClient的定義很簡單,它封裝了一個佇列,儲存在需要回復給這個客戶端的所有資訊,這樣,再進行回覆時,只要outq物件中彈出元素即可。

public