1. 程式人生 > >Java 使用阻塞佇列 BlockingQueue 多執行緒搜尋目錄及子目錄下包含關鍵字所有檔案

Java 使用阻塞佇列 BlockingQueue 多執行緒搜尋目錄及子目錄下包含關鍵字所有檔案

Java 使用阻塞佇列 BlockingQueue 多執行緒在一個目錄及它的所以子目錄下搜尋所有檔案,打印出包含關鍵字的行

阻塞佇列( blocking queue )

生產者執行緒向佇列插人元素, 消費者執行緒則取出它們。使用佇列,可以安全地從一個執行緒向另一個執行緒傳遞資料。 工作者執行緒可以週期性地將中間結果儲存在阻塞佇列中。其他的工作者執行緒移出中間結果並進一步加以修改。佇列會自動地平衡負載。如果第一個執行緒集執行得比第二個慢, 第二個執行緒集在等待結果時會阻塞。 如果第一個執行緒集執行得快, 它將等待第二個佇列集趕上來。

java.util.concurrent

包提供了阻塞佇列的幾個變種。
- LinkedBlockingQueue 的容量是沒有上邊界的但是,也可以選擇指定最大容量。LinkedBlockingDeque 是一個雙端的版本。
- ArrayBlockingQueue 在構造時需要指定容量,並且有一個可選的引數來指定是否需要公平性。若設定了公平引數, 則那麼等待了最長時間的執行緒會優先得到處理。通常,公平性會降低效能,只有在確實非常需要時才使用它。
- PriorityBlockingQueue 是一個帶優先順序的佇列, 而不是先進先出佇列。元素按照它們的優先順序順序被移出。該佇列是沒有容量上限,但是 ,如果佇列是空的, 取元素的操作會阻塞。

常用的方法有兩個,put方法新增元素,take方法取出元素。如果佇列滿, 則 put 方法阻塞 ; 如果佇列空, 則 take 方法阻塞。

以下程式展示瞭如何使用阻塞佇列來控制一組執行緒。程式在一個目錄及它的所有子目錄下搜尋所有檔案, 打印出包含指定關鍵字的行

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * 使用BlockingQueue多執行緒技術,在一個目錄及它的所以子目錄下搜尋所有檔案,打印出包含關鍵字的檔案及行號
 *
 */
public class BlcokingQueueTest
{
private static final int FILE_QUEUE_SIZE = 10; private static final int SEARCH_THREADS = 100; // 搜尋執行緒數 private static final File DUMMY = new File(""); // 虛擬空資料夾,作為執行緒結束標誌 // 阻塞佇列,先進先出 private static BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE); public static void main(String[] args) { try (Scanner in = new Scanner(System.in)) { System.out.print("Enter base directory (e.g. /opt/jdk1.8.0/src): "); String directory = in.nextLine(); System.out.print("Enter keyword (e.g. BlockingQueue): "); String keyword = in.nextLine(); // 開啟一個執行緒,遞迴列舉給定目錄及其所有子目錄下所有檔案的檔名 Runnable enumerator = () -> { try { enumerate(new File(directory)); queue.put(DUMMY); // 最後新增進空檔案,以空檔案作為結束標誌 } catch (InterruptedException e) { } }; new Thread(enumerator).start(); // 開啟 SEARCH_THREADS(100) 個執行緒,查詢所有檔案中具有keyword的行 for (int i = 1; i <= SEARCH_THREADS; i++) { Runnable searcher = () -> { try { boolean done = false; while (!done) { File file = queue.take(); // 取出佇列中的檔名 if (file == DUMMY) { queue.put(file); // 空檔案則說明已經結束 done = true; } else search(file, keyword); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { } }; new Thread(searcher).start(); } } } /** * 遞迴列舉給定目錄及其所有子目錄下所有檔案的檔名 * * @param directory * 開始搜尋的目錄 * @throws InterruptedException */ public static void enumerate(File directory) throws InterruptedException { File[] files = directory.listFiles(); // 返回目錄下所有檔名和目錄的list迭代器 for (File file : files) { if (file.isDirectory()) // 判斷是否的目錄 enumerate(file); // 對於目錄繼續遞迴列舉 else queue.put(file); // 檔案則將檔名加入佇列 } } /** * 開啟檔案,搜尋檔案中包含keyword的行 * * @param file * 搜尋關鍵字的檔案 * @param keyword * 用於搜尋的關鍵字 * @throws IOException */ public static void search(File file, String keyword) throws IOException { try (Scanner in = new Scanner(file, "UTF-8")) { int lineNumber = 0; while (in.hasNextLine()) { lineNumber ++; String line = in.nextLine(); if (line.contains(keyword)) // 列印檔案中包含 keyword 的行的資訊 System.out.printf("%s:%d:%s%n", file.getPath(), lineNumber, line); } } } }

以上程式碼來自 Java 核心技術卷一第14章。