1. 程式人生 > >java設計模式之執行緒池模式

java設計模式之執行緒池模式

前序:
  Thread-Per-Message Pattern,是一種對於每個命令或請求,都分配一個執行緒,由這個執行緒執行工作。它將“委託訊息的一端”和“執行訊息的一端”用兩個不同的執行緒來實現。該執行緒模式主要包括三個部分:
  1,Request參與者(委託人),也就是訊息傳送端或者命令請求端
  2,Host參與者,接受訊息的請求,負責為每個訊息分配一個工作執行緒。
  3,Worker參與者,具體執行Request參與者的任務的執行緒,由Host參與者來啟動。
  由於常規呼叫一個方法後,必須等待該方法完全執行完畢後才能繼續執行下一步操作,而利用執行緒後,就不必等待具體任務執行完畢,就可以馬上返回繼續執行下一步操作。
  背景:
  由於在Thread-Per-Message Pattern中對於每一個請求都會生成啟動一個執行緒,而執行緒的啟動是很花費時間的工作,所以鑑於此,提出了Worker Thread,重複利用已經啟動的執行緒。
  執行緒池:
  Worker Thread,也稱為工人執行緒或背景執行緒,不過一般都稱為執行緒池。該模式主要在於,事先啟動一定數目的工作執行緒。當沒有請求工作的時候,所有的工人執行緒都會等待新的請求過來,一旦有工作到達,就馬上從執行緒池中喚醒某個執行緒來執行任務,執行完畢後繼續線上程池中等待任務池的工作請求的到達。
  任務池:主要是儲存接受請求的集合,利用它可以緩衝接受到的請求,可以設定大小來表示同時能夠接受最大請求數目。這個任務池主要是供執行緒池來訪問。
  執行緒池:這個是工作執行緒所在的集合,可以通過設定它的大小來提供併發處理的工作量。對於執行緒池的大小,可以事先生成一定數目的執行緒,根據實際情況來動態增加或者減少執行緒數目。執行緒池的大小不是越大越好,執行緒的切換也會耗時的。
  存放池的資料結構,可以用陣列也可以利用集合,在集合類中一般使用Vector,這個是執行緒安全的。
  Worker Thread的所有參與者:
  1,Client參與者,傳送Request的參與者
  2,Channel參與者,負責快取Request的請求,初始化啟動執行緒,分配工作執行緒
  3,Worker參與者,具體執行Request的工作執行緒
  4,Request參與者
  注意:將在Worker執行緒內部等待任務池非空的方式稱為正向等待。
  將在Channel執行緒提供Worker執行緒來判斷任務池非空的方式稱為反向等待。
  執行緒池例項1:
  利用同步方法來實現,使用陣列來作為任務池的存放資料結構。在Channel有快取請求方法和處理請求方法,利用生成者與消費者模式來處理儲存請求,利用反向等待來判斷任務池的非空狀態。

  Channel參與者:

//用到了生產者與消費者模式
//生成執行緒池,接受客戶端執行緒的請求,找到一個工作執行緒分配該客戶端請求
public class Channel {
    private static final int MAX_REQUEST = 100;// 併發數目,就是同時可以接受多少個客戶端請求
    //利用陣列來存放請求,每次從陣列末尾新增請求,從開頭移除請求來處理
    private final Request[] requestQueue;// 儲存接受客戶執行緒的數目
    private int tail;//下一次存放Request的位置
    private int head;//下一次獲取Request的位置
    private int count;// 當前request數量
    private final WorkerThread[] threadPool;// 儲存執行緒池中的工作執行緒
    // 運用陣列來儲存
    public Channel(int threads) {
        this.requestQueue = new Request[MAX_REQUEST];
        this.head = 0;
        this.head = 0;
        this.count = 0;
        threadPool = new WorkerThread[threads];
        // 啟動工作執行緒
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i] = new WorkerThread("Worker-" + i, this);
        }
    }
    public void startWorkers() {
        for (int i = 0; i < threadPool.length; i++) {
            threadPool[i].start();
        }
    }
    // 接受客戶端請求執行緒
    public synchronized void putRequest(Request request) {
        // 當Request的數量大於或等於同時接受的數目時候,要等待
        while (count >= requestQueue.length)
            try {
                wait();
            } catch (InterruptedException e) {
            }
        requestQueue[tail] = request;
        tail = (tail + 1) % requestQueue.length;
        count++;
        notifyAll();
    }
    // 處理客戶端請求執行緒
    public synchronized Request takeRequest() {
        while (count <= 0)
            try {
                wait();
            } catch (InterruptedException e) {
            }
        Request request = requestQueue[head];
        head = (head + 1) % requestQueue.length;
        count--;
        notifyAll();
        return request;
    }
}
客戶端請求執行緒:
import java.util.Random;
//向Channel傳送Request請求的
public class ClientThread extends Thread{
    private final Channel channel;
    private static final Random random=new Random();
                                                               
    public ClientThread(String name,Channel channel)
    {
        super(name);
        this.channel=channel;
    }
    public void run()
    {
        try{
            for(int i=0;true;i++)
            {
                Request request=new Request(getName(),i);
                channel.putRequest(request);
                Thread.sleep(random.nextInt(1000));
            }
        }catch(InterruptedException e)
        {
        }
    }
}
工作執行緒:
<span style="font-family:宋體;">//具體工作執行緒
public class WorkerThread extends Thread{
                                                      
    private final Channel channel;
    public WorkerThread(String name,Channel channel)
    {
      super(name);
      this.channel=channel;
    }
                                                      
    public void run()
    {
        while(true)
        {
            Request request=channel.takeRequest();
            request.execute();
        }
    }
}</span>
Request任務體:
<span style="font-family:宋體;">public class Request {
	private int id;
	private String name;

	public Request(String name, int id) {
		super();
		this.id = id;
		this.name = name;
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	/**
	 * 執行耗時任務
	 */
	public void execute() {
		System.out.println(toString());
	}

	@Override
	public String toString() {
		return "Request [id=" + id + ", name=" + name + "]";
	}

}</span>
執行測試:
<span style="font-family:宋體;">public class WorkTest {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		Channel channel = new Channel(5);
		channel.startWorkers();

		new ClientThread("clientthread", channel).start();
	}

}</span>

以上是對利用反向等待來處理任務池的非空狀態的介紹,下面開始介紹正向等待的思路。

執行緒池例項2:
  工作執行緒:
  利用同步塊來處理,利用Vector來儲存客戶端請求。在Channel有快取請求方法和處理請求方法,利用生成者與消費者模式來處理儲存請求,利用正向等待來判斷任務池的非空狀態。
  這種例項,可以借鑑到網路ServerSocket處理使用者請求的模式中,有很好的擴充套件性與實用性。
  利用Vector來儲存,依舊是每次集合的最後一個位置新增請求,從開始位置移除請求來處理。

Channel參與者:

import java.util.Vector;



/*
 * 反向等待:執行緒池模式
 * 這個主要的作用如下
 * 0,緩衝客戶請求執行緒(利用生產者與消費者模式)
 * 1,儲存客戶端請求的執行緒
 * 2,初始化啟動一定數量的執行緒
 * 3,主動來喚醒處於任務池中wait set的一些執行緒來執行任務
 */
public class Channel {
	public final static int THREAD_COUNT = 4;

	public static void main(String[] args) {
		// 定義兩個集合,一個是存放客戶端請求的,利用Vector,
		// 一個是儲存執行緒的,就是執行緒池中的執行緒數目

		// Vector是執行緒安全的,它實現了Collection和List
		// Vector 類可以實現可增長的物件陣列。與陣列一樣,
		// 它包含可以使用整數索引進行訪問的元件。但Vector 的大小可以根據需要增大或縮小,
		// 以適應建立 Vector 後進行新增或移除項的操作。
		// Collection中主要包括了list相關的集合以及set相關的集合,Queue相關的集合
		// 注意:Map不是Collection的子類,都是java.util.*下的同級包
		Vector pool = new Vector();
		// 工作執行緒,初始分配一定限額的數目
		WorkerThread[] workers = new WorkerThread[THREAD_COUNT];

		// 初始化啟動工作執行緒
		for (int i = 0; i < workers.length; i++) {
			workers[i] = new WorkerThread(pool);
			workers[i].start();
		}

		// 接受新的任務,並且將其儲存在Vector中
		Request task = new Request("test request", 1);// 模擬的任務實體類
		// 此處省略具體工作
		// 在網路程式設計中,這裡就是利用ServerSocket來利用ServerSocket.accept接受一個Socket從而喚醒執行緒

		// 當有具體的請求達到
		synchronized (pool) {
			pool.add(pool.size(), task);
			pool.notifyAll();// 通知所有在pool wait set中等待的執行緒,喚醒一個執行緒進行處理
		}
		// 注意上面這步驟新增任務池請求,以及通知執行緒,都可以放在工作執行緒內部實現
		// 只需要定義該方法為static,在方法體用同步塊,且共享的執行緒池也是static即可

		// 下面這步,可以有可以沒有根據實際情況
		// 取消等待的執行緒
		for (int i = 0; i < workers.length; i++) {
			workers[i].interrupt();
		}
	}
}
工作執行緒
public class WorkerThread extends Thread {
	private List pool;// 任務請求池
	private static int fileCompressed = 0;// 所有例項共享的

	public WorkerThread(List pool) {
		this.pool = pool;
	}

	// 利用靜態synchronized來作為整個synchronized類方法,僅能同時一個操作該類的這個方法
	private static synchronized void incrementFilesCompressed() {
		fileCompressed++;
	}

	public void run() {
		// 保證無限迴圈等待中
		while (true) {
			Request request = null;
			// 共享互斥來訪問pool變數
			synchronized (pool) {
				// 利用多執行緒設計模式中的
				// Guarded Suspension Pattern,警戒條件為pool不為空,否則無限的等待中
				while (pool.isEmpty()) {
					try {
						pool.wait();// 進入pool的wait set中等待著,釋放了pool的鎖
					} catch (InterruptedException e) {
					}
				}
				// 當執行緒被喚醒,需要重新獲取pool的鎖,
				// 再次繼續執行synchronized程式碼塊中其餘的工作
				// 當不為空的時候,繼續再判斷是否為空,如果不為空,則跳出迴圈
				// 必須先從任務池中移除一個任務來執行,統一用從末尾新增,從開始處移除
				request = (Request) pool.remove(0);// 獲取任務池中的任務,並且要進行轉換
			}
			// 下面是執行緒所要處理的具體工作
			if(request!=null){
				request.execute();
			}else{
				System.out.println("null task");
			}
		}
	}
}

轉載自:http://computerdragon.blog.51cto.com/6235984/1205324


相關推薦

Java執行設計模式執行模式

前序:   Thread-Per-Message Pattern,是一種對於每個命令或請求,都分配一個執行緒,由這個執行緒執行工作。它將“委託訊息的一端”和“執行訊息的一端”用兩個不同的執行緒來實現。該執行緒模式主要包括三個部分:   1,Request參

java設計模式執行模式

前序:   Thread-Per-Message Pattern,是一種對於每個命令或請求,都分配一個執行緒,由這個執行緒執行工作。它將“委託訊息的一端”和“執行訊息的一端”用兩個不同的執行緒來實現。該執行緒模式主要包括三個部分:   1,Request參與者(委託人),也

Java併發程式設計執行(三)

一.介紹 Java通過Executors提供四種執行緒池,分別為: (1)newCachedThreadPool:建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。 (2)newFixedThreadPool: 建立一個定長執行緒池,可控制

java併發程式設計執行

前言 本文介紹幾種java常用的執行緒池如:FixedThreadPool,ScheduledThreadPool,CachedThreadPool等執行緒池,並分析介紹Executor框架,做到“知其然”:會用執行緒池,正確使用執行緒池。並且“知其所以然”:

Java併發程式設計執行、Callable和Future使用

知識儲備 收藏幾篇好文章: 目錄結構 Callable和Future使用 執行緒池使用 Callable和Future使用 多執行緒實現方式很多,分為兩類:1、沒有返回值的;2、有返回值的。 針對“沒有返回值的”這類可以參

Java併發程式設計執行的使用

1. 為什麼要使用多執行緒? 隨著科技的進步,現在的電腦及伺服器的處理器數量都比較多,以後可能會越來越多,比如我的工作電腦的處理器有8個,怎麼檢視呢? 計算機右鍵--屬性--裝置管理器,開啟屬性視窗,然後點選“裝置管理器”,在“處理器”下可看到所有的處理器: 也可以通過以下Java程式碼獲取到處理器的

Java入門系列執行ThreadPoolExecutor原理分析思考(十五)

前言 關於執行緒池原理分析請參看《http://objcoding.com/2019/04/25/threadpool-running/》,建議對原理不太瞭解的童鞋先看下此文然後再來看本文,這裡通過對原理的學習我談談對執行緒池的理解,若有錯誤之處,還望批評指正。 執行緒池思考 執行緒池我們可認為是準備好執行應

設計模式4--工廠模式 (Executors 執行用到)

工廠模式 主要是為建立物件提供過渡介面,以便將建立物件的具體過程遮蔽隔離起來,達到提高靈活性的目的。  工廠方法模式: 一個抽象產品類,可以派生出多個具體產品類。    一個抽象工廠類,可以派生出多個具體工廠類。    每個具體工廠類只能建立一個具體產品類的例

java 單例模式執行安全的餓漢模式和懶漢模式

單例模式 解決的問題:保證一個類在記憶體中的物件唯一性. 比如:多程式讀取一個配置檔案時,建議配置檔案封裝成物件。會方便操作其中資料,又要保證多個程式讀到的是同一個配置檔案物件, 就需要該配置檔案物件在記憶體中是唯一的。 如何保證物件唯一性呢? 思想: 1,不讓其他程式建立

設計模式】單例模式執行

好記性,不如爛筆頭。對於單例模式的理解和應用還是需要多多實踐,這次有感而發,寫份基於執行緒執行的單例模式。 單例模式該怎樣去實現:建構函式宣告為private或protect防止被外部函式例項化,內部

Java執行原始碼深入理解

在前面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題: 如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行

java執行面試題

面試官:執行緒池有哪些?分別的作用是什麼? 常用的執行緒池有: newSingleThreadExecutor newFixedThreadExecutor newCacheThreadExecutor newScheduleThreadExecutor 1、newSingleTh

Java執行-併發執行

執行緒池有了解嗎? 答: java.util.concurrent.ThreadPoolExecutor 類就是一個執行緒池。客戶端呼叫ThreadPoolExecutor.submit(Runnable task) 提交任務,執行緒池內部維護的工作者執行緒的數量就是該執行緒池的執行

Java基礎執行

一、執行緒池概念 執行緒池:其實就是一個容納多個執行緒的容器,其中的執行緒可以反覆使用,省去了頻繁建立執行緒物件的操作,無需反覆建立執行緒而消耗過多資源。 二、工作原理 三、合理利用執行緒池的好處 降低資源消耗。減少了建立和銷燬執行緒的次數,每個工作執行緒都可以

Java基礎學習——多執行執行

1.執行緒池介紹     執行緒池是一種執行緒使用模式。執行緒由於具有空閒(eg:等待返回值)和繁忙這種不同狀態,當數量過多時其建立、銷燬、排程等都會帶來開銷。執行緒池維護了多個執行緒,當分配可併發執行的任務時,它負責排程執行緒執行工作,執行完畢後執行緒不關閉而是返回執行緒池,

單列模式執行安全實現

單例模式有五種寫法:懶漢、餓漢、雙重檢驗鎖、靜態內部類、列舉   懶漢式執行緒不安全 public class Singleton { private static Singleton instance; private Singleton (){} publ

典型C/S模式___執行實現

一、多執行緒和執行緒池的區別 執行緒池 多執行緒 同時啟動若干個執行緒,持續存在,時刻準備處理請求 來一個請求啟動一個執行緒 響應時間短 響應時間較長 二、執行緒池

理解Java併發工具包執行設計

為什麼需要執行緒池? 答:主要原因是因為建立一個執行緒開銷太大,尤其是對大量的小任務需要執行這種場景。 在Java裡面建立一個執行緒,需要包含的東西: (1)它為一個執行緒堆疊分配記憶體,該堆疊為每個執行緒方法呼叫儲存一個幀 (2)每個幀由區域性變數陣列,返回值,運算

Java執行系列--“JUC執行”01 執行架構

概要 前面分別介紹了”Java多執行緒基礎”、”JUC原子類”和”JUC鎖”。本章介紹JUC的最後一部分的內容——執行緒池。內容包括: 執行緒池架構圖 執行緒池示例 執行緒池架構圖 執行緒池的架構圖如下: 1、Executor

Java執行系列--“JUC執行”05 執行原理(四)

概要 本章介紹執行緒池的拒絕策略。內容包括: 拒絕策略介紹 拒絕策略對比和示例 拒絕策略介紹 執行緒池的拒絕策略,是指當任務新增到執行緒池中被拒絕,而採取的處理措施。 當任務新增到執行緒池中之所以被拒絕,可能是由於:第一,執行緒池異常關閉。第二,任務數量