1. 程式人生 > >java並發之同步輔助類Phaser

java並發之同步輔助類Phaser

java 多線程

Phaser含義:

更加復雜和強大的同步輔助類。它允許並發執行多階段任務。當我們有並發任務並且需要分解成幾步執行時,(CyclicBarrier是分成兩步),就可以選擇使用Phaser。Phaser類機制是在每一步結束的位置對線程進行同步,當所有的線程都完成了這一步,才允許執行下一步。
跟其他同步工具一樣,必須對Phaser類中參與同步操作的任務數進行初始化,不同的是,可以動態的增加或者減少任務數。

函數:
arriveAndAwaitAdvance():類似於CyclicBarrier的await()方法,等待其它線程都到來之後同步繼續執行。
arriveAndDeregister():把執行到此的線程從Phaser中註銷掉。

isTerminated():判斷Phaser是否終止。
register():將一個新的參與者註冊到Phaser中,這個新的參與者將被當成沒有執行完本階段的線程。
forceTermination():強制Phaser進入終止態。
... ...

例子
使用Phaser類同步三個並發任務。這三個任務將在三個不同的文件夾及其子文件夾中查找過去24小時內修改過擴展為為.log的文件。這個任務分成以下三個步驟:
1、在執行的文件夾及其子文件夾中獲取擴展名為.log的文件
2、對每一步的結果進行過濾,刪除修改時間超過24小時的文件
3、將結果打印到控制臺
在第一步和第二步結束的時候,都會檢查所查找到的結果列表是不是有元素存在。如果結果列表是空的,對應的線程將結束執行,並從Phaser中刪除。(也就是動態減少任務數)

文件查找類:

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class FileSearch implements Runnable {

private String initPath;

private String end;

private List<String> results;

private Phaser phaser;

public FileSearch(String initPath, String end, Phaser phaser) {
    this.initPath = initPath;
    this.end = end;
    this.phaser=phaser;
    results=new ArrayList<>();
}
@Override
public void run() {

    phaser.arriveAndAwaitAdvance();//等待所有的線程創建完成,確保在進行文件查找的時候所有的線程都已經創建完成了

    System.out.printf("%s: Starting.\n",Thread.currentThread().getName());

    // 1st Phase: 查找文件
    File file = new File(initPath);
    if (file.isDirectory()) {
        directoryProcess(file);
    }

    // 如果查找結果為false,那麽就把該線程從Phaser中移除掉並且結束該線程的運行
    if (!checkResults()){
        return;
    }

    // 2nd Phase: 過濾結果,過濾出符合條件的(一天內的)結果集
    filterResults();

    // 如果過濾結果集結果是空的,那麽把該線程從Phaser中移除,不讓它進入下一階段的執行
    if (!checkResults()){
        return;
    }

    // 3rd Phase: 顯示結果
    showInfo();
    phaser.arriveAndDeregister();//任務完成,註銷掉所有的線程
    System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
}
private void showInfo() {
    for (int i=0; i<results.size(); i++){
        File file=new File(results.get(i));
        System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
    }
    // Waits for the end of all the FileSearch threads that are registered in the phaser
    phaser.arriveAndAwaitAdvance();
}
private boolean checkResults() {
    if (results.isEmpty()) {
        System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
        System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
        //結果為空,Phaser完成並把該線程從Phaser中移除掉
        phaser.arriveAndDeregister();
        return false;
    } else {
        // 等待所有線程查找完成
        System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
        phaser.arriveAndAwaitAdvance();
        return true;
    }        
}
private void filterResults() {
    List<String> newResults=new ArrayList<>();
    long actualDate=new Date().getTime();
    for (int i=0; i<results.size(); i++){
        File file=new File(results.get(i));
        long fileDate=file.lastModified();

        if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
            newResults.add(results.get(i));
        }
    }
    results=newResults;
}
private void directoryProcess(File file) {
    // Get the content of the directory
    File list[] = file.listFiles();
    if (list != null) {
        for (int i = 0; i < list.length; i++) {
            if (list[i].isDirectory()) {
                // If is a directory, process it
                directoryProcess(list[i]);
            } else {
                // If is a file, process it
                fileProcess(list[i]);
            }
        }
    }
}
private void fileProcess(File file) {
    if (file.getName().endsWith(end)) {
        results.add(file.getAbsolutePath());
    }
}
}

測試主類:

import java.util.concurrent.Phaser;

public class PhaserMain {

public static void main(String[] args) {

    Phaser phaser = new Phaser(3);

    FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
    FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
    FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);

    Thread systemThread = new Thread(system, "System");
    systemThread.start();
    Thread appsThread = new Thread(apps, "Apps");
    appsThread.start();        
    Thread documentsThread = new Thread(documents, "Documents");
    documentsThread.start();
    try {
        systemThread.join();
        appsThread.join();
        documentsThread.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.printf("Terminated: %s\n", phaser.isTerminated());
}

}

海量視頻獲取 vue angular
技術分享圖片

java並發之同步輔助類Phaser