1. 程式人生 > >【多執行緒高併發】多執行緒的設計模式

【多執行緒高併發】多執行緒的設計模式

多執行緒的設計模式

程式碼在Multi_004當中
並行設計模式屬於設計優化的一部分,他是對一些常用的多執行緒結構的總結和抽象,與序列程式相比,並行程式的結構通常更為複雜,因此合理的使用並行模式在多執行緒開發中更具有意義,在這裡主要介紹Future, Master-Worker和生產者-消費者模型。

Future模式

Future模式類似於非同步請求
Future模式Java實現舉例(其實JDK自帶有實現,這裡我先用java來實現)
程式碼在: com.kaishun.height.design014 中
java_future模式

main方法

public class Main {

    public
static void main(String[] args) throws InterruptedException { FutureClient fc = new FutureClient(); Data data = fc.request("請求引數"); System.out.println("請求傳送成功!"); System.out.println("做其他的事情..."); String result = data.getRequest(); System.out.println(result); } }

FutureClient類 先返回一個futureData物件,不讓主方法阻塞,然後再讓這個引用去得到耗時的操作的結果

public class FutureClient {

    public Data request(final String queryStr){
        //1 我想要一個代理物件(Data介面的實現類)先返回給傳送請求的客戶端,告訴他請求已經接收到,可以做其他的事情
        final FutureData futureData = new FutureData();
        //2 啟動一個新的執行緒,去載入真實的資料,傳遞給這個代理物件
new Thread(new Runnable() { @Override public void run() { //3 這個新的執行緒可以去慢慢的載入真實物件,然後傳遞給代理物件 RealData realData = new RealData(queryStr); futureData.setRealData(realData); } }).start(); return futureData; } }

RealData類,構造方法是一個耗時的類的操作

public class RealData implements Data{

    private String result ;

    public RealData (String queryStr){
        System.out.println("根據'" + queryStr + "'進行查詢,這是一個很耗時的操作..");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("操作完畢,獲取結果");
        result = "'查詢結果'";
    }

    @Override
    public String getRequest() {
        return result;
    }

}

FutureData類 setRealData(RealData realData)和getRequest()加了同步程式碼塊synchronized, 執行緒之間通過wait/notify進行通訊

public class FutureData implements Data{

    private RealData realData ;

    private boolean isReady = false;

    public synchronized void setRealData(RealData realData) {
        //如果已經裝載完畢了,就直接返回
        if(isReady){
            return;
        }
        //如果沒裝載,進行裝載真實物件
        this.realData = realData;
        isReady = true;
        //進行通知
        notify();
    }

    @Override
    public synchronized String getRequest() {
        //如果沒裝載好 程式就一直處於阻塞狀態
        while(!isReady){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //裝載好直接獲取資料即可
        return this.realData.getRequest();
    }

}

Data介面

public interface Data {

    String getRequest();

}

最終輸出

請求傳送成功!
做其他的事情...
根據'請求引數'進行查詢,這是一個很耗時的操作..
操作完畢,獲取結果
'查詢結果'

Master-Worker模式

Master-Worker模式是常用的平行計算模式。他的核心思想是系統由兩類程序協作工作:Master程序和Worker程序.Maseter負責接收和分配任務, Worker負責處理子任務。當各個Worker子進行處理完成後,會將結果返回給Master,由Msster做歸納總結,好處是能將一個大任務分解成若干個小任務,並行執行,從而提高系統的吞吐量
java併發Master-Worker模式
舉例com.kaishun.height.design015

Master-Worker模式編寫步驟

Task任務類

public class Task {

    private int id;
    private int price ;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    } 

}

Master類

    //1 有一個盛放任務的容器
    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

    //2 需要有一個盛放worker的集合
    private HashMap<String, Thread> workers = new HashMap<String, Thread>();

    //3 需要有一個盛放每一個worker執行任務的結果集合
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    //4 構造方法
    public Master(Worker worker , int workerCount){
        worker.setWorkQueue(this.workQueue);
        worker.setResultMap(this.resultMap);

        for(int i = 0; i < workerCount; i ++){
            this.workers.put(Integer.toString(i), new Thread(worker));
        }

    }

    //5 需要一個提交任務的方法
    public void submit(Task task){
        this.workQueue.add(task);
    }

    //6 需要有一個執行的方法,啟動所有的worker方法去執行任務
    public void execute(){
        for(Map.Entry<String, Thread> me : workers.entrySet()){
            me.getValue().start();
        }
    }

    //7 判斷是否執行結束的方法
    public boolean isComplete() {
        for(Map.Entry<String, Thread> me : workers.entrySet()){
            if(me.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
        }       
        return true;
    }

    //8 計算結果方法
    public int getResult() {
        int priceResult = 0;
        for(Map.Entry<String, Object> me : resultMap.entrySet()){
            priceResult += (Integer)me.getValue();
        }
        return priceResult;
    }

}

Work類

public class Worker implements Runnable {

    private ConcurrentLinkedQueue<Task> workQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while(true){
            Task input = this.workQueue.poll();
            if(input == null) break;
            Object output = handle(input);
            this.resultMap.put(Integer.toString(input.getId()), output);
        }
    }

    private Object handle(Task input) {
        Object output = null;
        try {
            //處理任務的耗時。。 比如說進行操作資料庫。。。
            Thread.sleep(500);
            output = input.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return output;
    }
}

Main方法

public class Main {

    public static void main(String[] args) {

        Master master = new Master(new Worker(), 20);

        Random r = new Random();
        // 100 個任務
        for(int i = 1; i <= 100; i++){
            Task t = new Task();
            t.setId(i);
            t.setPrice(r.nextInt(1000));
            master.submit(t);
        }

        master.execute();
        long start = System.currentTimeMillis();

        while(true){
            if(master.isComplete()){
                long end = System.currentTimeMillis() - start;
                int priceResult = master.getResult();
                System.out.println("最終結果:" + priceResult + ", 執行時間:" + end);
                break;
            }
        }

    }
}

最終輸出結果

最終結果:48098, 執行時間:2500

1.4 生產者-消費者模式

生產者和消費者也是一個非常經典的多執行緒模式,我們在實際中開發應用非常廣泛的思想理念。在生產-消費模式中:通常由兩類執行緒,即若干個生產者和若干個消費者的執行緒。生產者負責提交使用者資料,消費者負責具體處理生產者提交的任務,在生產者和消費者之間通過共享記憶體快取區進行通訊。

高併發生產者-消費者模式

示例:
現在就來模擬一下上面的圖示
main方法

public class Main {

    public static void main(String[] args) throws Exception {
        //記憶體緩衝區
        BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
        //生產者
        Provider p1 = new Provider(queue);
        Provider p2 = new Provider(queue);
        Provider p3 = new Provider(queue);
        //消費者
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        //建立執行緒池執行,這是一個快取的執行緒池,可以建立無窮大的執行緒,沒有任務的時候不建立執行緒。空閒執行緒存活時間為60s(預設值)

        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(p1);
        cachePool.execute(p2);
        cachePool.execute(p3);
        cachePool.execute(c1);
        cachePool.execute(c2);
        cachePool.execute(c3);

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        p1.stop();
        p2.stop();
        p3.stop();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }       
//      cachePool.shutdown(); 
//      cachePool.shutdownNow();

    }

}

Provider

public class Provider implements Runnable{

    //共享快取區
    private BlockingQueue<Data> queue;
    //多執行緒間是否啟動變數,有強制從主記憶體中重新整理的功能。即時返回執行緒的狀態
    private volatile boolean isRunning = true;
    //id生成器
    private static AtomicInteger count = new AtomicInteger();
    //隨機物件
    private static Random r = new Random(); 

    public Provider(BlockingQueue queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while(isRunning){
            try {
                //隨機休眠0 - 1000 毫秒 表示獲取資料(產生資料的耗時) 
                Thread.sleep(r.nextInt(1000));
                //獲取的資料進行累計...
                int id = count.incrementAndGet();
                //比如通過一個getData方法獲取了
                Data data = new Data(Integer.toString(id), "資料" + id);
                System.out.println("當前執行緒:" + Thread.currentThread().getName() + ", 獲取了資料,id為:" + id + ", 進行裝載到公共緩衝區中...");
                if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
                    System.out.println("提交緩衝區資料失敗....");
                    //do something... 比如重新提交
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stop(){
        this.isRunning = false;
    }

}

Consumer

public class Consumer implements Runnable{

    private BlockingQueue<Data> queue;

    public Consumer(BlockingQueue queue){
        this.queue = queue;
    }

    //隨機物件
    private static Random r = new Random(); 

    @Override
    public void run() {
        while(true){
            try {
                //獲取資料
                Data data = this.queue.take();
                //進行資料處理。休眠0 - 1000毫秒模擬耗時
                Thread.sleep(r.nextInt(1000));
                System.out.println("當前消費執行緒:" + Thread.currentThread().getName() + ", 消費成功,消費資料為id: " + data.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Data類

public final class Data {

    private String id;
    private String name;

    public Data(String id, String name){
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString(){
        return "{id: " + id + ", name: " + name + "}";
    }

}

輸出

當前執行緒:pool-1-thread-2, 獲取了資料,id為:1, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-5, 消費成功,消費資料為id: 1
當前執行緒:pool-1-thread-3, 獲取了資料,id為:2, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-2, 獲取了資料,id為:3, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-6, 消費成功,消費資料為id: 2
當前執行緒:pool-1-thread-1, 獲取了資料,id為:4, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-2, 獲取了資料,id為:5, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-5, 消費成功,消費資料為id: 4
當前消費執行緒:pool-1-thread-4, 消費成功,消費資料為id: 3
當前執行緒:pool-1-thread-1, 獲取了資料,id為:6, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-3, 獲取了資料,id為:7, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-6, 消費成功,消費資料為id: 5
當前消費執行緒:pool-1-thread-4, 消費成功,消費資料為id: 7
當前執行緒:pool-1-thread-2, 獲取了資料,id為:8, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-2, 獲取了資料,id為:9, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-1, 獲取了資料,id為:10, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-5, 消費成功,消費資料為id: 6
當前執行緒:pool-1-thread-3, 獲取了資料,id為:11, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-1, 獲取了資料,id為:12, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-5, 消費成功,消費資料為id: 10
當前消費執行緒:pool-1-thread-6, 消費成功,消費資料為id: 8
當前執行緒:pool-1-thread-1, 獲取了資料,id為:13, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-3, 獲取了資料,id為:14, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-6, 消費成功,消費資料為id: 12
當前執行緒:pool-1-thread-1, 獲取了資料,id為:15, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-4, 消費成功,消費資料為id: 9
當前執行緒:pool-1-thread-2, 獲取了資料,id為:16, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-6, 消費成功,消費資料為id: 13
當前消費執行緒:pool-1-thread-5, 消費成功,消費資料為id: 11
當前執行緒:pool-1-thread-3, 獲取了資料,id為:17, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-6, 消費成功,消費資料為id: 15
當前消費執行緒:pool-1-thread-4, 消費成功,消費資料為id: 14
當前執行緒:pool-1-thread-2, 獲取了資料,id為:18, 進行裝載到公共緩衝區中...
當前執行緒:pool-1-thread-1, 獲取了資料,id為:19, 進行裝載到公共緩衝區中...
當前消費執行緒:pool-1-thread-5, 消費成功,消費資料為id: 16
當前消費執行緒:pool-1-thread-6, 消費成功,消費資料為id: 17
當前消費執行緒:pool-1-thread-5, 消費成功,消費資料為id: 19
當前消費執行緒:pool-1-thread-4, 消費成功,消費資料為id: 18

特別感謝網際網路架構師白鶴翔老師,本文大多出自他的講解。
筆者主要是記錄筆記,正所謂好記性不如爛筆頭,爛筆頭不如雲筆記