1. 程式人生 > >使用Java多執行緒實現任務分發

使用Java多執行緒實現任務分發

多執行緒下載由來已久,如 FlashGet、NetAnts 等工具,它們都是依懶於 HTTP 協議的支援(Range 欄位指定請求內容範圍),首先能讀取出請求內容 (即欲下載的檔案) 的大小,劃分出若干區塊,把區塊分段分發給每個執行緒去下載,執行緒從本段起始處下載資料及至段尾,多個執行緒下載的內容最終會寫入到同一個檔案中。

    只研究有用的,工作中的需求:要把多個任務分派給Java的多個執行緒去執行,這其中就會有一個任務列表指派到執行緒的策略思考:已知:1. 一個待執行的任務列表,2. 指定要啟動的執行緒數;問題是:每個執行緒實際要執行哪些任務。

    使用Java多執行緒實現這種任務分發的策略是:任務列表連續按執行緒數分段,先保證每執行緒平均能分配到的任務數,餘下的任務從前至後依次附加到執行緒中——只是數量上,實際每個執行緒執行的任務都還是連續的。如果出現那種僧多(執行緒) 粥(任務) 少的情況,實際啟動的執行緒數就等於任務數,一挑一。這裡只實現了每個執行緒各掃自家門前雪,動作快的完成後眼見別的執行緒再累都是愛莫能助。

    實現及演示程式碼如下:由三個類實現,寫在了一個 Java 檔案中:TaskDistributor 為任務分發器,Task 為待執行的任務,WorkThread 為自定的工作執行緒。程式碼中運用了命令模式,如若能配以監聽器,用上觀察者模式來控制 UI 顯示就更絕妙不過了,就能實現像下載中的區塊著色跳躍的動感了,在此定義下一步的著眼點了。

    程式碼中有較為詳細的註釋,看這些註釋和執行結果就很容易理解的。main() 是測試方法

package com.alpha.thread;

import java.util.ArrayList;
import java.util.List;

/**
 * 指派任務列表給執行緒的分發器
 */
public class TaskDistributor {
	
	/**
	 * 測試方法
	 * @param args
	 */
	@SuppressWarnings("unchecked")
	public static void main(String[] args) {
		// 初始化要執行的任務列表
		List taskList = new ArrayList();
		for (int i = 0; i < 100; i++) {
			taskList.add(new Task(i));
		}
		// 設定要啟動的工作執行緒數為 4 個
		int threadCount = 4;
		List[] taskListPerThread = distributeTasks(taskList, threadCount);
		System.out.println("實際要啟動的工作執行緒數:" + taskListPerThread.length);
		for (int i = 0; i < taskListPerThread.length; i++) {
			Thread workThread = new WorkThread(taskListPerThread[i], i);
			workThread.start();
		}
	}

	/**
	 * 把 List 中的任務分配給每個執行緒,先平均分配,剩於的依次附加給前面的執行緒 返回的陣列有多少個元素 (List) 就表明將啟動多少個工作執行緒
	 * 
	 * @param taskList
	 *            待分派的任務列表
	 * @param threadCount
	 *            執行緒數
	 * @return 列表的陣列,每個元素中存有該執行緒要執行的任務列表
	 */
	@SuppressWarnings("unchecked")
	public static List[] distributeTasks(List taskList, int threadCount) {
		// 每個執行緒至少要執行的任務數,假如不為零則表示每個執行緒都會分配到任務
		int minTaskCount = taskList.size() / threadCount;
		// 平均分配後還剩下的任務數,不為零則還有任務依個附加到前面的執行緒中
		int remainTaskCount = taskList.size() % threadCount;
		// 實際要啟動的執行緒數,如果工作執行緒比任務還多
		// 自然只需要啟動與任務相同個數的工作執行緒,一對一的執行
		// 畢竟不打算實現了執行緒池,所以用不著預先初始化好休眠的執行緒
		int actualThreadCount = minTaskCount > 0 ? threadCount
				: remainTaskCount;
		// 要啟動的執行緒陣列,以及每個執行緒要執行的任務列表
		List[] taskListPerThread = new List[actualThreadCount];
		int taskIndex = 0;
		// 平均分配後多餘任務,每附加給一個執行緒後的剩餘數,重新宣告與 remainTaskCount
		// 相同的變數,不然會在執行中改變 remainTaskCount 原有值,產生麻煩
		int remainIndces = remainTaskCount;
		for (int i = 0; i < taskListPerThread.length; i++) {
			taskListPerThread[i] = new ArrayList();
			// 如果大於零,執行緒要分配到基本的任務
			if (minTaskCount > 0) {
				for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {
					taskListPerThread[i].add(taskList.get(j));
				}
				taskIndex += minTaskCount;
			}
			// 假如還有剩下的,則補一個到這個執行緒中
			if (remainIndces > 0) {
				taskListPerThread[i].add(taskList.get(taskIndex++));
				remainIndces--;
			}
		}
		// 列印任務的分配情況
		for (int i = 0; i < taskListPerThread.length; i++) {
			System.out.println("執行緒 "
					+ i
					+ " 的任務數:"
					+ taskListPerThread[i].size()
					+ " 區間["
					+ ((Task) taskListPerThread[i].get(0)).getTaskId()
					+ ","
					+ ((Task) taskListPerThread[i].get(taskListPerThread[i].size() - 1))
							.getTaskId() + "]");
		}
		return taskListPerThread;
	}
}
package com.alpha.thread;

/**
 * 要執行的任務,可在執行時改變它的某個狀態或呼叫它的某個操作 例如任務有三個狀態,就緒,執行,完成,預設為就緒態 要進一步完善,可為 Task
 * 加上狀態變遷的監聽器,因之決定UI的顯示
 */
class Task {
	public static final int READY = 0;
	public static final int RUNNING = 1;
	public static final int FINISHED = 2;
	@SuppressWarnings("unused")
	private int status;
	// 宣告一個任務的自有業務含義的變數,用於標識任務
	private int taskId;

	// 任務的初始化方法
	public Task(int taskId) {
		this.status = READY;
		this.taskId = taskId;
	}

	/**
	 * 執行任務
	 */
	public void execute() {
		// 設定狀態為執行中
		setStatus(Task.RUNNING);
		System.out.println("當前執行緒 ID 是:" + Thread.currentThread().getName()
				+ " | 任務 ID 是:" + this.taskId);
		// 附加一個延時
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// 執行完成,改狀態為完成
		setStatus(FINISHED);
	}

	public void setStatus(int status) {
		this.status = status;
	}

	public int getTaskId() {
		return taskId;
	}
}
package com.alpha.thread;

import java.util.List;

/**
 * 自定義的工作執行緒,持有分派給它執行的任務列表
 */
class WorkThread extends Thread {
	// 本執行緒待執行的任務列表,你也可以指為任務索引的起始值
	private List<Task> taskList = null;
	@SuppressWarnings("unused")
	private int threadId;

	/**
	 * 構造工作執行緒,為其指派任務列表,及命名執行緒 ID
	 * 
	 * @param taskList
	 *            欲執行的任務列表
	 * @param threadId
	 *            執行緒 ID
	 */
	@SuppressWarnings("unchecked")
	public WorkThread(List taskList, int threadId) {
		this.taskList = taskList;
		this.threadId = threadId;
	}

	/**
	 * 執行被指派的所有任務
	 */
	public void run() {
		for (Task task : taskList) {
			task.execute();
		}
	}
}


執行結果如下,注意觀察每個Java多執行緒分配到的任務數量及區間。直到所有的執行緒完成了所分配到的任務後程序結束:

執行緒 0 的任務數:25 區間[0,24]
執行緒 1 的任務數:25 區間[25,49]
執行緒 2 的任務數:25 區間[50,74]
執行緒 3 的任務數:25 區間[75,99]
實際要啟動的工作執行緒數:4
當前執行緒 ID 是:Thread-0 | 任務 ID 是:0
當前執行緒 ID 是:Thread-3 | 任務 ID 是:75
當前執行緒 ID 是:Thread-1 | 任務 ID 是:25
當前執行緒 ID 是:Thread-2 | 任務 ID 是:50
當前執行緒 ID 是:Thread-1 | 任務 ID 是:26
當前執行緒 ID 是:Thread-3 | 任務 ID 是:76
當前執行緒 ID 是:Thread-0 | 任務 ID 是:1
當前執行緒 ID 是:Thread-2 | 任務 ID 是:51

上面坦白來只算是基本功夫,貼出來還真見笑了。還有更為複雜的功能。

    像Java多執行緒的下載工具的確更充分利用了網路資源,而且像 FlashGet、NetAnts 都實現了:假如某個執行緒下載完了欲先所分配段的內容之後,會幫其他執行緒下載未完成資料,直到任務完成;或某一下載執行緒的未完成段區間已經很小了,用不著別人來幫忙時,這就涉及到任務的進一步分配。再如,以上兩個工具都能動態增加、減小或中止執行緒,越說越複雜了,它們原本比這複雜多了,這些實現可能定義各種佇列來實現,如未完成任務佇列、下載中任務佇列和已完成佇列等。