1. 程式人生 > >所執行緒設計模式(五)

所執行緒設計模式(五)

並行設計模式屬於優化的一部分,它是對一些常用的多執行緒結構的總結和抽象。與序列相比,並行程式的結構通常更為複雜。因此合理的使用並行模式在多執行緒開發中更具有意義。

Future模式

Future模式有點類似商品訂單。比如網購時,當看重某一件商品時,就可以提交訂單,當訂單處理完成後,在家裡等待商品送貨上門即可。或者說更形象的我們的Ajax請求的時候,頁面是非同步的進行後臺處理,使用者無須一直等待請求結果,可以繼續瀏覽或操作其他內容。

public class Main {

	public static void main(String[] args) {
		FutureClient fc = new FutureClient();
		Data data = fc.request("請求引數");
		System.out.println("請求傳送成功!");
		System.out.println("FutureClient做其他的業務操作");
		
		String result = data.getRequest();
		System.out.println(result);
	}

}
public class FutureClient {

	public Data request(final String queryStr){
		//1.生成一個代理物件(Data介面的實現類)先返回給客戶端,告訴客戶端請求已經收到,客戶端可以繼續做其他事
		final FutureData futureData = new FutureData();
		//2.啟動一個新的執行緒,去載入真實的資料,並傳給這個帶你物件
		new Thread(new Runnable(){
			@Override
			public void run() {
				//3.載入真實的資料,然後傳遞給代理物件
				RealData realData = new RealData(queryStr);
				futureData.setRealData(realData);
			}}).start();
		return futureData;
	}
}
public interface Data {

	String getRequest();
}
public class FutureData implements Data{

	private RealData realData;
	
	private boolean isReady = false;
	
	public synchronized void setRealData(RealData realData) {
		//如果載入完畢,則直接返回
		if(isReady){
			return;
		}
		//如果沒又裝載,進行裝載真是物件
		this.realData = realData;
		isReady = true;
		//進行通知
		notify();
	}

	@Override
	public String getRequest() {
		//如果沒裝載好,程式就一直處於阻塞狀態
		while(!isReady){
			try{
				wait();
			} catch (InterruptedException e){
				e.printStackTrace();
			}
		}
		//裝載好之後直接返回資料即可
		return this.realData.getRequest();
	}

}
public class RealData implements Data{

	private String result;
	
	public RealData(String queryStr) {
		System.out.println("根據" + queryStr + "進行查詢,這是一個耗時的操作...");
		try{
			Thread.sleep(5000);
		} catch (InterruptedException e){
			e.printStackTrace();
		}
		System.out.println("操作完畢,獲取結果");
		result = "查詢結果";
	}

	@Override
	public String getRequest() {
		return result;
	}
}

Master-Worker模式

Master-Worker模式是常用的平行計算模式。它的核心思想是系統有兩類程序協作工作:Master程序和Worker程序。Master負責接收和分配任務,Worker負責處理子任務。當各個Worker子程序處理完畢後,會將結果返回給Master,有Master做歸納總結。其好處就是將一個大任務分解成若干和小任務,並行執行,從而提高系統的吞吐量。

public class Main {

	public static void main(String[] args) {
		Master master = new Master(new Worker(), 10);
		Random r = new Random();
		for(int i = 0; i < 100; i++){
			Task task = new Task();
			task.setId(i);
			task.setName("任務"+i);
			task.setPrice(r.nextInt(1000));
			master.submit(task);
		}
		master.execute();
		
		long start = System.currentTimeMillis();
		
		while(true){
			if(master.isComplate()){
				long end = System.currentTimeMillis();
				int ret = master.getResult();
				System.out.println("執行結果 " + ret + ",執行耗時 " + (end-start));
				break;
			}
		}
	}

}
public class Master {

	//1.宣告承載任務的容器
	private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>();
	
	//2.使用HashMap去承載所有的Worker物件
	private HashMap<String,Thread> workers = new HashMap<>();
	
	//3.使用一個容器承載每一個Worker併發執行任務的結果集
	private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>();
	
	//4.構造方法
	public Master(Worker worker,int workerCount){
		
		//每一個Worker物件都需要有Master節點的引用,workQueue用於任務的領取,resultMap用於任務的提交
		worker.setWorkQueue(this.workQueue);
		worker.setResultMap(this.resultMap);
		
		for(int i = 0; i < workerCount; i++){
			//key表示每一個worker的名字,value表示執行緒執行物件
			workers.put("子節點" + Integer.toString(i), new Thread(worker));
		}
	}
	
	//5.提交方法
	public void submit(Task task){
		this.workQueue.add(task);
	}
	
	//6.啟動worker的方法(啟動應用程式,讓所有的Worker工作)
	public void execute(){
		for(Map.Entry<String, Thread> worker : workers.entrySet()){
			worker.getValue().start();
		}
	}
	
	//7.判斷執行緒是否執行完畢
	public boolean isComplate(){
		for(Map.Entry<String, Thread> worker : workers.entrySet()){
			if(worker.getValue().getState() == Thread.State.TERMINATED)
				return true;
		}
		return false;
	}
	
	//8.返回結果集資料
	public int getResult(){
		int result = 0;
		for(Map.Entry<String, Object> res : resultMap.entrySet()){
			//彙總邏輯
			result += (Integer)res.getValue();
		}
		return result;
	}
}
public class Task {

	private int id;
	
	private String name;
	
	private int price;

	//getter&setter...
}
public class Worker implements Runnable{

	//用於接收master節點的ConcurrentLinkedQueue的引用
	private ConcurrentLinkedQueue<Task> workQueue;
	
	//用於接收master節點的ConcurrentHashMap的引用
	private ConcurrentHashMap<String, Object> resultMap;
	
	public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
		this.workQueue = workQueue;
	}
	
	public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
		this.resultMap = resultMap;
	}
	
	@Override
	public void run() {
		while(true){
			Task input = this.workQueue.poll();
			if(null == input) break;
			//Task處理任務邏輯
			Object output = handle(input); 
			this.resultMap.put(Integer.toString(input.getId()), output);
		}
	}

	private Object handle(Task input) {
		Object output = null;
		try {
			//表示task處理業務邏輯耗時 
			Thread.sleep(500);
			output = input.getPrice();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return output;
	}
}