1. 程式人生 > >Oracle官方併發教程之高階併發物件

Oracle官方併發教程之高階併發物件

原文地址譯文地址

譯者:李任

目前為止,該教程重點講述了最初作為Java平臺一部分的低級別API。這些API對於非常基本的任務來說已經足夠,但是對於更高階的任務就需要更高階的API。特別是針對充分利用了當今多處理器和多核系統的大規模併發應用程式。 本節,我們將著眼於Java 5.0新增的一些高階併發特徵。大多數特徵已經在新的java.util.concurrent包中實現。Java集合框架中也定義了新的併發資料結構。

  • 鎖物件提供了可以簡化許多併發應用的鎖的慣用法。
  • Executors為載入和管理執行緒定義了高階API。Executors的實現由java.util.concurrent包提供,提供了適合大規模應用的執行緒池管理。
  • 併發集合簡化了大型資料集合管理,且極大的減少了同步的需求。
  • 原子變數有減小同步粒度和避免記憶體一致性錯誤的特徵。
  • 併發隨機數(JDK7)提供了高效的多執行緒生成偽隨機數的方法。

鎖物件

譯者:李任

同步程式碼依賴於一種簡單的可重入鎖。這種鎖使用簡單,但也有諸多限制。java.util.concurrent.locks包提供了更復雜的鎖。我們不會詳細考察這個包,但會重點關注其最基本的介面,鎖。 鎖物件作用非常類似同步程式碼使用的隱式鎖。如同隱式鎖,每次只有一個執行緒可以獲得鎖物件。通過關聯Condition物件,鎖物件也支援wait/notify機制。 鎖物件之於隱式鎖最大的優勢在於,它們有能力收回獲得鎖的嘗試。如果當前鎖物件不可用,或者鎖請求超時(如果超時時間已指定),tryLock方法會收回獲取鎖的請求。如果在鎖獲取前,另一個執行緒傳送了一箇中斷,lockInterruptibly方法也會收回獲取鎖的請求。 讓我們使用鎖物件來解決我們在

活躍度中見到的死鎖問題。Alphonse和Gaston已經把自己訓練成能注意到朋友何時要鞠躬。我們通過要求Friend物件在雙方鞠躬前必須先獲得鎖來模擬這次改善。下面是改善後模型的原始碼,Safelock。為了展示其用途廣泛,我們假設Alphonse和Gaston對於他們新發現的穩定鞠躬的能力是如此入迷,以至於他們無法不相互鞠躬。

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Random;

public class Safelock {
    static class Friend {
        private final String name;
        private final Lock lock = new ReentrantLock();

        public Friend(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public boolean impendingBow(Friend bower) {
            Boolean myLock = false;
            Boolean yourLock = false;
            try {
                myLock = lock.tryLock();
                yourLock = bower.lock.tryLock();
            } finally {
                if (! (myLock && yourLock)) {
                    if (myLock) {
                        lock.unlock();
                    }
                    if (yourLock) {
                        bower.lock.unlock();
                    }
                }
            }
            return myLock && yourLock;
        }

        public void bow(Friend bower) {
            if (impendingBow(bower)) {
                try {
                    System.out.format("%s: %s has"
                        + " bowed to me!%n",
                        this.name, bower.getName());
                    bower.bowBack(this);
                } finally {
                    lock.unlock();
                    bower.lock.unlock();
                }
            } else {
                System.out.format("%s: %s started"
                    + " to bow to me, but saw that"
                    + " I was already bowing to"
                    + " him.%n",
                    this.name, bower.getName());
            }
        }

        public void bowBack(Friend bower) {
            System.out.format("%s: %s has" +
                " bowed back to me!%n",
                this.name, bower.getName());
        }
    }
    static class BowLoop implements Runnable {
        private Friend bower;
        private Friend bowee;

        public BowLoop(Friend bower, Friend bowee) {
            this.bower = bower;
            this.bowee = bowee;
        }

        public void run() {
            Random random = new Random();
            for (;;) {
                try {
                    Thread.sleep(random.nextInt(10));
                } catch (InterruptedException e) {}
                bowee.bow(bower);
            }
        }
    }

    public static void main(String[] args) {
        final Friend alphonse =
            new Friend("Alphonse");
        final Friend gaston =
            new Friend("Gaston");
        new Thread(new BowLoop(alphonse, gaston)).start();
        new Thread(new BowLoop(gaston, alphonse)).start();
    }
}

執行器(Executors)

在之前所有的例子中,Thread物件表示的執行緒和Runnable物件表示的執行緒所執行的任務之間是緊耦合的。這對於小型應用程式來說沒問題,但對於大規模併發應用來說,合理的做法是將執行緒的建立與管理和程式的其他部分分離開。封裝這些功能的物件就是執行器,接下來的部分將講詳細描述執行器。

 Executor介面

譯者:Greenster java.util.concurrent中包括三個Executor介面:

  • Executor,一個執行新任務的簡單介面。
  • ExecutorService,擴充套件了Executor介面。添加了一些用來管理執行器生命週期和任務生命週期的方法。
  • ScheduledExecutorService,擴充套件了ExecutorService。支援Future和定期執行任務。

通常來說,指向Executor物件的變數應被宣告為以上三種介面之一,而不是具體的實現類

Executor介面

Executor介面只有一個execute方法,用來替代通常建立(啟動)執行緒的方法。例如:r是一個Runnable物件,e是一個Executor物件。可以使用

  e.execute(r);

來代替

  (new Thread(r)).start();

但execute方法沒有定義具體的實現方式。對於不同的Executor實現,execute方法可能是建立一個新執行緒並立即啟動,但更有可能是使用已有的工作執行緒執行r,或者將r放入到佇列中等待可用的工作執行緒。(我們將線上程池一節中描述工作執行緒。)

ExecutorService介面

新加了更加通用的submit方法。submit方法除了和execute方法一樣可以接受Runnable物件作為引數,還可以接受Callable物件作為引數。使用Callable物件可以能使任務返還執行的結果。通過submit方法返回的Future物件可以讀取Callable任務的執行結果,或是管理Callable任務和Runnable任務的狀態。 ExecutorService也提供了批量執行Callable任務的方法。最後,ExecutorService還提供了一些關閉執行器的方法。如果需要支援即時關閉,執行器所執行的任務需要正確處理中斷。

ScheduledExecutorService介面

ScheduledExecutorService擴充套件ExecutorService介面並添加了schedule方法。呼叫schedule方法可以在指定的延時後執行一個Runnable或者Callable任務。ScheduledExecutorService介面還定義了按照指定時間間隔定期執行任務的scheduleAtFixedRate方法和scheduleWithFixedDelay方法。

執行緒池

在java.util.concurrent包中多數的執行器實現都使用了由工作執行緒組成的執行緒池,工作執行緒獨立於所它所執行的Runnable任務和Callable任務,並且常用來執行多個任務。 使用工作執行緒可以使建立執行緒的開銷最小化。在大規模併發應用中,建立大量的Thread物件會佔用佔用大量系統記憶體,分配和回收這些物件會產生很大的開銷。 一種最常見的執行緒池是固定大小的執行緒池。這種執行緒池始終有一定數量的執行緒在執行,如果一個執行緒由於某種原因終止運行了,執行緒池會自動建立一個新的執行緒來代替它。需要執行的任務通過一個內部佇列提交給執行緒,當沒有更多的工作執行緒可以用來執行任務時,佇列儲存額外的任務。 使用固定大小的執行緒池一個很重要的好處是可以實現優雅退化。例如一個Web伺服器,每一個HTTP請求都是由一個單獨的執行緒來處理的,如果為每一個HTTP都建立一個新執行緒,那麼當系統的開銷超出其能力時,會突然地對所有請求都停止響應。如果限制Web伺服器可以建立的執行緒數量,那麼它就不必立即處理所有收到的請求,而是在有能力處理請求時才處理。 建立一個使用執行緒池的執行器最簡單的方法是呼叫java.util.concurrent.ExecutorsnewFixedThreadPool方法。Executors類還提供了下列一下方法:

  • newCachedThreadPool方法建立了一個可擴充套件的執行緒池。適合用來啟動很多短任務的應用程式。
  • 還有一些建立ScheduledExecutorService執行器的方法。

Fork/Joint

譯者:Zach

fork/join框架是ExecutorService介面的一種具體實現,目的是為了幫助你更好地利用多處理器帶來的好處。它是為那些能夠被遞迴地拆解成子任務的工作型別量身設計的。其目的在於能夠使用所有可用的運算能力來提升你的應用的效能。 類似於ExecutorService介面的其他實現,fork/join框架會將任務分發給執行緒池中的工作執行緒。fork/join框架的獨特之處在與它使用工作竊取(work-stealing)演算法。完成自己的工作而處於空閒的工作執行緒能夠從其他仍然處於忙碌(busy)狀態的工作執行緒處竊取等待執行的任務。 fork/join框架的核心是ForkJoinPool類,它是對AbstractExecutorService類的擴充套件。ForkJoinPool實現了工作偷取演算法,並可以執行ForkJoinTask任務。

基本使用方法

使用fork/join框架的第一步是編寫執行一部分工作的程式碼。你的程式碼結構看起來應該與下面所示的虛擬碼類似:

if (當前這個任務工作量足夠小)
    直接完成這個任務
else
    將這個任務或這部分工作分解成兩個部分
    分別觸發(invoke)這兩個子任務的執行,並等待結果

你需要將這段程式碼包裹在一個ForkJoinTask的子類中。不過,通常情況下會使用一種更為具體的的型別,或者是RecursiveTask(會返回一個結果),或者是RecursiveAction。 當你的ForkJoinTask子類準備好了,建立一個代表所有需要完成工作的物件,然後將其作為引數傳遞給一個ForkJoinPool例項的invoke()方法即可。

要清晰,先模糊

想要了解fork/join框架的基本工作原理,接下來的這個例子會有所幫助。假設你想要模糊一張圖片。原始的source圖片由一個整數的陣列表示,每個整數表示一個畫素點的顏色數值。與source圖片相同,模糊之後的destination圖片也由一個整數陣列表示。 對圖片的模糊操作是通過對source陣列中的每一個畫素點進行處理完成的。處理的過程是這樣的:將每個畫素點的色值取出,與周圍畫素的色值(紅、黃、藍三個組成部分)放在一起取平均值,得到的結果被放入destination陣列。因為一張圖片會由一個很大的陣列來表示,這個流程會花費一段較長的時間。如果使用fork/join框架來實現這個模糊演算法,你就能夠藉助多處理器系統的並行處理能力。下面是上述演算法結合fork/join框架的一種簡單實現:

public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;

// Processing window size; should be odd.
private int mBlurWidth = 15;

public ForkBlur(int[] src, int start, int length, int[] dst) {
    mSource = src;
    mStart = start;
    mLength = length;
    mDestination = dst;
}

protected void computeDirectly() {
    int sidePixels = (mBlurWidth - 1) / 2;
    for (int index = mStart; index < mStart + mLength; index++) {
        // Calculate average.
        float rt = 0, gt = 0, bt = 0;
        for (int mi = -sidePixels; mi <= sidePixels; mi++) {
            int mindex = Math.min(Math.max(mi + index, 0),
                                mSource.length - 1);
            int pixel = mSource[mindex];
            rt += (float)((pixel & 0x00ff0000) >> 16)
                  / mBlurWidth;
            gt += (float)((pixel & 0x0000ff00) >>  8)
                  / mBlurWidth;
            bt += (float)((pixel & 0x000000ff) >>  0)
                  / mBlurWidth;
        }

        // Reassemble destination pixel.
        int dpixel = (0xff000000     ) |
               (((int)rt) << 16) |
               (((int)gt) <<  8) |
               (((int)bt) <<  0);
        mDestination[index] = dpixel;
    }
}

接下來你需要實現父類中的compute()方法,它會直接執行模糊處理,或者將當前的工作拆分成兩個更小的任務。陣列的長度可以作為一個簡單的閥值來判斷任務是應該直接完成還是應該被拆分。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }

    int split = mLength / 2;

    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

如果前面這個方法是在一個RecursiveAction的子類中,那麼設定任務在ForkJoinPool中執行就再直觀不過了。通常會包含以下一些步驟:

  1. 建立一個表示所有需要完成工作的任務。
    // source image pixels are in src
    // destination image pixels are in dst
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. 建立將要用來執行任務的ForkJoinPool
    ForkJoinPool pool = new ForkJoinPool();
  3. 執行任務。
    pool.invoke(fb);

想要瀏覽完成的原始碼,請檢視ForkBlur,其中還包含一些建立destination圖片檔案的額外程式碼。

標準實現

除了能夠使用fork/join框架來實現能夠在多處理系統中被並行執行的定製化演算法(如前文中的ForkBlur.java例子),在Java SE中一些比較常用的功能點也已經使用fork/join框架來實現了。在Java SE 8中,java.util.Arrays類的一系列parallelSort()方法就使用了fork/join來實現。這些方法與sort()系列方法很類似,但是通過使用fork/join框架,藉助了併發來完成相關工作。在多處理器系統中,對大陣列的並行排序會比序列排序更快。這些方法究竟是如何運用fork/join框架並不在本教程的討論範圍內。想要了解更多的資訊,請參見Java API文件。 其他採用了fork/join框架的方法還包括java.util.streams包中的一些方法,此包是作為Java SE 8發行版中Project Lambda的一部分。想要了解更多資訊,請參見Lambda Expressions一節。

併發集合

譯者:李任

java.util.concurrent包囊括了Java集合框架的一些附加類。它們也最容易按照集合類所提供的介面來進行分類:

  • BlockingQueue定義了一個先進先出的資料結構,當你嘗試往滿佇列中新增元素,或者從空佇列中獲取元素時,將會阻塞或者超時。
  • ConcurrentMapjava.util.Map的子介面,定義了一些有用的原子操作。移除或者替換鍵值對的操作只有當key存在時才能進行,而新增操作只有當key不存在時。使這些操作原子化,可以避免同步。ConcurrentMap的標準實現是ConcurrentHashMap,它是HashMap的併發模式。

所有這些集合,通過 在集合裡新增物件和訪問或移除物件的操作之間,定義一個happens-before的關係,來幫助程式設計師避免記憶體一致性錯誤

原子變數

譯者:李任

java.util.concurrent.atomic包定義了對單一變數進行原子操作的類。所有的類都提供了get和set方法,可以使用它們像讀寫volatile變數一樣讀寫原子類。就是說,同一變數上的一個set操作對於任意後續的get操作存在happens-before關係。原子的compareAndSet方法也有記憶體一致性特點,就像應用到整型原子變數中的簡單原子演算法。 為了看看這個包如何使用,讓我們返回到最初用於演示執行緒干擾的Counter類:

class Counter {
    private int c = 0;
    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}
class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
c++;
}
public synchronized void decrement() {
c--;
}
public synchronized int value() {
return c;
}
}

對於這個簡單的類,同步是一種可接受的解決方案。但是對於更復雜的類,我們可能想要避免不必要同步所帶來的活躍度影響。將int替換為AtomicInteger允許我們在不進行同步的情況下阻止執行緒干擾,如AtomicCounter

import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger c = new AtomicInteger(0);
public void increment() {
c.incrementAndGet();
}

public void decrement() {
c.decrementAndGet();
}

public int value() {
return c.get();
}

併發隨機數

譯者:李任

在JDK7中,java.util.concurrent包含了一個相當便利的類,ThreadLocalRandom,當應用程式期望在多個執行緒或ForkJoinTasks中使用隨機數時。

對於併發訪問,使用TheadLocalRandom代替Math.random()可以減少競爭,從而獲得更好的效能。

你只需呼叫ThreadLocalRandom.current(), 然後呼叫它的其中一個方法去獲取一個隨機數即可。下面是一個例子:

   int r = ThreadLocalRandom.current().nextInt(4,77);