java並發之同步輔助類Phaser
阿新 • • 發佈:2018-03-20
java 多線程 Phaser含義:
isTerminated():判斷Phaser是否終止。
register():將一個新的參與者註冊到Phaser中,這個新的參與者將被當成沒有執行完本階段的線程。
forceTermination():強制Phaser進入終止態。
... ...
更加復雜和強大的同步輔助類。它允許並發執行多階段任務。當我們有並發任務並且需要分解成幾步執行時,(CyclicBarrier是分成兩步),就可以選擇使用Phaser。Phaser類機制是在每一步結束的位置對線程進行同步,當所有的線程都完成了這一步,才允許執行下一步。
跟其他同步工具一樣,必須對Phaser類中參與同步操作的任務數進行初始化,不同的是,可以動態的增加或者減少任務數。
函數:
arriveAndAwaitAdvance():類似於CyclicBarrier的await()方法,等待其它線程都到來之後同步繼續執行。
arriveAndDeregister():把執行到此的線程從Phaser中註銷掉。
register():將一個新的參與者註冊到Phaser中,這個新的參與者將被當成沒有執行完本階段的線程。
forceTermination():強制Phaser進入終止態。
... ...
例子
使用Phaser類同步三個並發任務。這三個任務將在三個不同的文件夾及其子文件夾中查找過去24小時內修改過擴展為為.log的文件。這個任務分成以下三個步驟:
1、在執行的文件夾及其子文件夾中獲取擴展名為.log的文件
2、對每一步的結果進行過濾,刪除修改時間超過24小時的文件
3、將結果打印到控制臺
在第一步和第二步結束的時候,都會檢查所查找到的結果列表是不是有元素存在。如果結果列表是空的,對應的線程將結束執行,並從Phaser中刪除。(也就是動態減少任務數)
文件查找類:
import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; public class FileSearch implements Runnable { private String initPath; private String end; private List<String> results; private Phaser phaser; public FileSearch(String initPath, String end, Phaser phaser) { this.initPath = initPath; this.end = end; this.phaser=phaser; results=new ArrayList<>(); } @Override public void run() { phaser.arriveAndAwaitAdvance();//等待所有的線程創建完成,確保在進行文件查找的時候所有的線程都已經創建完成了 System.out.printf("%s: Starting.\n",Thread.currentThread().getName()); // 1st Phase: 查找文件 File file = new File(initPath); if (file.isDirectory()) { directoryProcess(file); } // 如果查找結果為false,那麽就把該線程從Phaser中移除掉並且結束該線程的運行 if (!checkResults()){ return; } // 2nd Phase: 過濾結果,過濾出符合條件的(一天內的)結果集 filterResults(); // 如果過濾結果集結果是空的,那麽把該線程從Phaser中移除,不讓它進入下一階段的執行 if (!checkResults()){ return; } // 3rd Phase: 顯示結果 showInfo(); phaser.arriveAndDeregister();//任務完成,註銷掉所有的線程 System.out.printf("%s: Work completed.\n",Thread.currentThread().getName()); } private void showInfo() { for (int i=0; i<results.size(); i++){ File file=new File(results.get(i)); System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath()); } // Waits for the end of all the FileSearch threads that are registered in the phaser phaser.arriveAndAwaitAdvance(); } private boolean checkResults() { if (results.isEmpty()) { System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase()); System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase()); //結果為空,Phaser完成並把該線程從Phaser中移除掉 phaser.arriveAndDeregister(); return false; } else { // 等待所有線程查找完成 System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size()); phaser.arriveAndAwaitAdvance(); return true; } } private void filterResults() { List<String> newResults=new ArrayList<>(); long actualDate=new Date().getTime(); for (int i=0; i<results.size(); i++){ File file=new File(results.get(i)); long fileDate=file.lastModified(); if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){ newResults.add(results.get(i)); } } results=newResults; } private void directoryProcess(File file) { // Get the content of the directory File list[] = file.listFiles(); if (list != null) { for (int i = 0; i < list.length; i++) { if (list[i].isDirectory()) { // If is a directory, process it directoryProcess(list[i]); } else { // If is a file, process it fileProcess(list[i]); } } } } private void fileProcess(File file) { if (file.getName().endsWith(end)) { results.add(file.getAbsolutePath()); } } }
測試主類:
import java.util.concurrent.Phaser;
public class PhaserMain {
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);
Thread systemThread = new Thread(system, "System");
systemThread.start();
Thread appsThread = new Thread(apps, "Apps");
appsThread.start();
Thread documentsThread = new Thread(documents, "Documents");
documentsThread.start();
try {
systemThread.join();
appsThread.join();
documentsThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Terminated: %s\n", phaser.isTerminated());
}
}
海量視頻獲取 vue angular
java並發之同步輔助類Phaser