數據結構與算法--關鍵路徑
關鍵路徑與無環加權有向圖的最長路徑
現在考慮一個這樣的問題:你今天事情比較多,要洗衣服、做作業還要燒水洗澡,之後出去找朋友玩。假設洗衣服要20分鐘,燒水要30分鐘,做作業的話你把朋友做好的帶回來抄,只需要10分鐘。你想能早些去找朋友,但在那之前又必須將那些事做完,你要怎麽安排呢?很容易想到,這三者同時進行:打好水開始燒水,衣服扔進洗衣機,回書桌抄作業...20分鐘後作業寫完了,衣服也洗好了,水還有10分鐘水才燒開,利用這時間把洗好的衣服晾曬好,差不多水也燒開了,好了最後去洗澡。簡直一氣呵成,這是我們能花費的最少時間了,在這個例子中剛好等於所有任務中持續時間最長的那個。(你做完了作業才想起來去燒水,花費不止半小時吧)
由此引申出一個更為廣泛的問題,給定一組需要完成的任務和每個任務所需的時間,以及一組關於任務完成的先後次序的優先級限制。在滿足限制條件的前提下應該如何在若幹相同的處理器上(數量不限,可並行處理多個任務)安排任務並在最短的時間內完成所有的任務?
此問題的提出主要是為了解決並行任務調度,使得完成所有任務的總時間最短。待處理的任務總數可能成百上千,因此需要一個算法幫我們快速規劃一個調度方案:按照怎樣的順序執行這些任務,哪些任務可以同時處理,如何使得耗費的總時間最短?正好存在一種叫做“關鍵路徑”的方法可以證明這個問題與無環加權有向圖的最長路徑問題等價。
關鍵路徑:把路徑上各個任務所持續的時間之和稱為路徑長度,從起點到終點的所有路徑中,具有最長路徑長度的路徑稱為關鍵路徑,關鍵路徑中的各個任務稱為關鍵任務。上面的例子中,燒水就是個關鍵任務。
首先,按照關鍵路徑的順序執行任務,一定能保證所有的任務都能完成,且此時花費的總時間最短。有些活動順序進行,有些活動並行進行。從起點到各個頂點,以至從起點到終點的有向路徑可能不止一條,這些路徑的長度也不盡相同。這若幹條從起點到終點的路徑可以看做一個生產過程的幾條不同的生產線,必須每條生產線都完工,整個生產過程才算結束,也就是不論如何你都得等那條花費時間最長的流水線做完,整個生產才可能完工。現在由於可以同時處理多個任務,在花費時間最長的流水線工作過程中,其他流水線一定會提前完工,因此花費時間最長的流水線做完後,整個工程也隨之竣工了。假設花費時間最長的那條流水線所用的時間是M,這就是說,不管怎麽安排,都需要至少M的時間才能竣工,而這已經是最短時間了。
再舉個例子,你和朋友們約好去某個地方聚餐。有些朋友到的比較早,有些朋友到得比較晚,但是不管怎麽樣,我們都要等到最後一個朋友到目的地,這樣大家才算是聚齊了。
說了半天,求並行任務調度中的關鍵路徑,實際上就是求從起點到終點的最長路徑。
通過求解最長路徑得到關鍵路徑
通過上面的討論,現在只需求最長路徑,就能得到關鍵路徑。我們知道任務調度必須要求圖是無環的,因此可以使用求無環加權有向圖的最短路徑的方法求最長路徑。
具體方法是:復制原圖得到一個副本,將副本的所有邊的權重取相反數,求副本的最短路徑實際上就是原圖的最長路徑。
或者一個更為簡單的方法:修改邊的放松方法。改為distTo[v] + edge.weight() > distTo[w]
(求最短路徑的不等號是<
),即:有比原來到w更長的路徑就更新。同時初始化的時候,distTo[i]從原來的正無窮改成負無窮。
求無環加權有向圖的最短路徑,可以按照拓補排序依次放松頂點。詳細地見我上一遍文章(拓補排序及無環加權有向圖的最短路徑),只需改前述兩個地方,就能求得最長路徑。
package Chap7;
import Java.util.LinkedList;
/**
* 求無環有向圖的最長路徑
*/
public class AcycliLP {
private DiEdge[] edgeTo;
private double[] distTo;
public AcycliLP(EdgeWeightedDiGraph<?> graph, int s) {
edgeTo = new DiEdge[graph.vertexNum()];
distTo = new double[graph.vertexNum()];
for (int i = 0; i < graph.vertexNum(); i++) {
// 1. 改成了負無窮
distTo[i] = Double.NEGATIVE_INFINITY;
}
distTo[s] = 0.0;
// 以上是初始化
TopoSort topo = new TopoSort(graph);
if (!topo.isDAG()) {
throw new RuntimeException("該圖存在有向環,本算法無法處理!");
}
for (int v : topo.order()) {
relax(graph, v);
}
}
private void relax(EdgeWeightedDiGraph<?> graph, int v) {
for (DiEdge edge : graph.adj(v)) {
int w = edge.to();
// 2、若路徑更長就更新
if (distTo[v] + edge.weight() > distTo[w]) {
distTo[w] = distTo[v] + edge.weight();
edgeTo[w] = edge;
}
}
}
public double distTo(int v) {
return distTo[v];
}
public boolean hasPathTo(int v) {
return distTo[v] != Double.NEGATIVE_INFINITY;
}
public Iterable<DiEdge> pathTo(int v) {
if (hasPathTo(v)) {
LinkedList<DiEdge> path = new LinkedList<>();
for (DiEdge edge = edgeTo[v]; edge != null; edge = edgeTo[edge.from()]) {
path.push(edge);
}
return path;
}
return null;
}
}
好,可以求得關鍵路徑了。現在來看一個任務調度的例子,如何利用上面的實現來安排任務。
現在要先將其轉換為圖。由於有些工程不好看出哪個任務是最先開工的,哪個是收尾的任務(比如上圖)。在不同的任務表中,每個任務都可能成為起點或終點。為了可以應付各種任務表,不妨設置虛擬的起點和終點。因為每個任務都可能最先開工,所以設置一個虛擬起點可以指向圖中所有頂點,且權值都為0;因為每個頂點都可能作為收尾任務,因此所有頂點指向一個虛擬的終點,權值是這些頂點代表的任務所持續的時間。這樣我們也不用在乎任務表中哪個任務最先開工、最後收尾的關系夠不夠明確了,設置了虛擬起點和終點後,只要求得從起點到終點的最長路徑,中間走過的路徑的就是各個任務執行的順序。
加入虛擬頂點後,上面的任務表其實就是下圖。
各條從s到t的路徑中(想象成各條生產線),找出最長的那條(費時最長的那條生產線),這條0 -> 9 -> 6 -> 8 -> 2就是關鍵路徑,按照這個順序執行任務就能使得完成整個工程總時間最短。
我們用代碼測試一下。
package Chap7;
public class CPM {
private AcycliLP lp;
private int s; // 虛擬的起點
private int t; // 虛擬的終點
private int jobsNum; // 任務個數
public CPM(double[] jobDuration, int[][] successorAfter) {
jobsNum = jobDuration.length;
// 設置兩個虛擬頂點,代表起點和終點
EdgeWeightedDiGraph<?> graph = new EdgeWeightedDiGraph<>(jobsNum + 2);
s = jobDuration.length; // 起點
t = s + 1; // 終點
for (int i = 0; i < jobsNum; i++) {
// 每個頂點都可能成為最先開工的,所以虛擬起點指向所有頂點,且費時都為0
graph.addDiEdge(new DiEdge(s, i, 0.0));
// 每個頂點都可能成為工程收尾的活動,所有頂點都指向該虛擬終點,費時自然是每個活動所持續的時間
graph.addDiEdge(new DiEdge(i, t, jobDuration[i]));
// 任務i必須在任務j之前完成, 即加入i -> j的有向邊
for (int j = 0; j < successorAfter[i].length; j++) {
int successor = successorAfter[i][j];
graph.addDiEdge(new DiEdge(i, successor, jobDuration[i]));
}
// 找到到每個活動的最長路徑
lp = new AcycliLP(graph, s);
}
}
public void printJobExecuteOrder() {
system.out.println("各任務開始時間表:");
for (int i = 0; i < jobsNum; i++) {
System.out.println(i + ": " + lp.distTo(i));
}
System.out.println("\n按照以下順序執行任務,開始時間相同的任務同時執行。");
for (DiEdge edge : lp.pathTo(t)) {
// 遇到起點不打印箭頭
if (edge.from() == s) {
System.out.print(edge.to());
}
// 最後一個任務在前一個頂點的就打印過了,遇到最後一條邊換行就行
else if (edge.to() == t) {
System.out.println();
} else {
System.out.print(" -> " + edge.to());
}
}
System.out.println("總共需要" + lp.distTo(t));
}
public static void main(String[] args) {
// 每個任務的持續時間
double[] duration = {41.0, 51.0, 50.0, 36.0, 38.0, 45.0, 21.0, 32.0, 32.0, 29.0};
// 必須在這些任務之前完成,如successorAfter[0]表示任務0的後繼任務1、7、9,也就是說0必須在1、7、9之前做完
// {} 表示該任務不要求在哪個任務執行前就得完成,說明它可能是作為收尾的任務
int[][] successorAfter = {{1, 7, 9}, {2}, {}, {}, {}, {}, {3, 8}, {3, 8}, {2}, {4, 6}};
CPM cpm = new CPM(duration, successorAfter);
cpm.printJobExecuteOrder();
}
}
根據任務表,給圖增加邊。
- 虛擬起點到所有頂點的邊,且權值為0;
- 所有頂點到虛擬終點的邊,且權值為頂點任務持續時間。
- 某任務v必須在一些任務之前完成的邊,且權值為任務v的持續時長。比如0必須在1、7、9之前,則增加0 -> 1,0 -> 7,0 -> 9邊,且權值都為任務0的持續時長。
接下來通過AcycliLP
類求得關鍵路徑,distTo[i]就表示i任務的開始時間,distTo[t]表示做完整個工程需要的最短時間;pathTo[i]表示執行到任務i的關鍵路徑,自然pathTo[t]就是整個工程的關鍵路徑。
上面的代碼會打印如下結果:
各任務開始時間表:
0: 0.0
1: 41.0
2: 123.0
3: 91.0
4: 70.0
5: 0.0
6: 70.0
7: 41.0
8: 91.0
9: 41.0
按照以下順序執行任務,開始時間相同的任務同時執行。
0 -> 9 -> 6 -> 8 -> 2
總共需要173.0
可以清楚地看到每個任務應該在什麽時刻開始動工,只要按照0 -> 9 -> 6 -> 8 -> 2這樣順序執行就好。而且,有些任務開始時間一樣,表示我們應該同時執行它們以節省時間。所以最後的方案是:0、5最先並同時執行,只要0做完了,就開始同時執行1、7、9,只要9做完了立刻同時執行4、6,一旦6做完了同時執行3、8,8只要完工緊接著做2,最後等待2也終於做完,整個工程竣工!需要的最短時間為173.0。
關鍵路徑中的任務都是做完後就立刻執行下一個的(比如0執行完立刻執行9,9完工立刻執行6...),不用等待和它一起開工的其他任務也做完,因為這些任務在花費最長時間的這條關鍵路徑上,它們遲早都能做完的。因此關鍵路徑中,各個關鍵任務是不會有空閑和等待時間。如下圖是上面任務表的執行流程
0 -> 9 -> 6 -> 8 -> 2一氣呵成,中間毫無停頓。而且其他任務在這條生產線執行過程中均已完成!
by @sunhaiyu
2017.9.29
Tags: 路徑 任務 時間 燒水 作業 衣服
文章來源: