1. 程式人生 > >《Java高併發程式設計》學習 --5.4 高效能的生產者-消費者:無鎖的實現

《Java高併發程式設計》學習 --5.4 高效能的生產者-消費者:無鎖的實現

BlockingQueue實現生產者-消費者是一個不錯的選擇,它很自然地實現了作為生產者和消費者的記憶體緩衝區。但是,BlockingQueue並不是一個高效能的實現,它完全使用鎖和阻塞等待來實現執行緒間的同步。在高併發場合,它的效能並不是特別優越。 就像我們之前提過的ConcurrentLinkedQueue是一個高效能的佇列,但是BlockingQueue只是為了方便資料共享。而ConcurrentLinkedQueue的祕訣就是大量使用了無鎖的CAS操作。同理,如果我們使用了CAS來實現生產者-消費者模式,也同樣可以獲得可觀的效能提升。 1)無鎖的快取框架:Disruptor Disruptor框架是由於LMAX公司開發的一款高效的無鎖記憶體佇列,它使用無鎖的方式實現了一個環形佇列,非常適合生產者-消費者模式。在Disruptor中,使用了環形佇列來代替普通的線性佇列,這個環形佇列內部實現為一個普通的陣列。對於一般的佇列,勢必要提供佇列頭部head和尾部tail兩個指標,用於出隊和入隊,這樣無疑就增加了執行緒協作的複雜度。但如果佇列的環形的,則只需要提供一個當前佇列的位置cursor,利用這個cursor既可以出隊也可以入隊。由於是環形佇列的緣故,佇列的總大小必須事先指定,不能動態擴充套件。為了能夠快速從一個序列sequence對應陣列的實際位置(每次有元素入隊,序列就加1),Disruptor要求我們必須將陣列的大小設定為2的整數次方。這樣通過sequence&(queueSize-1)就能立即定位到實際的元素位置index。這個要比取餘(%)操作快得多。 如圖所示,顯示了RingBuffer的結構,生產者向緩衝區中寫入資料,而消費者從中讀取資料,生產者寫入資料使用CAS操作,消費者讀取資料時,為了防止多個消費者處理同一個資料,也使用CAS操作進行保護。 這種固定大小的環形佇列的另一個好處就是可以做到完全記憶體複用。在系統執行過程中,不會有新的空間需要分配或者老的空間需要回收。因此,可以大大減少系統分配空間以及回收空間的額外開銷。
2)用Disruptor實現生產者-消費者案例 首先,我們需要一個代表資料的PCData:
public class PCData {
	private long value;
	public void set(long value) {
		this.value = value;
	}
	public long get() {
		return value;
	}
}
消費者實現為WorkHandler介面,它來著Disruptor框架:
public class Consumer implements WorkHandler<PCData> {
	@Override
	public void onEvent(PCData event) throws Exception {
		System.out.println(Thread.currentThread().getId() + ":Event: --" +
				event.get() * event.get() + "--");
	}
}
消費者的作用是讀取資料進行處理。這裡,資料的讀取已經由Disruptor進行封裝,onEvent()方法為框架的回撥方法。因此,這個只需要簡單地進行資料處理即可。 還需要一個產生PCData的工廠類。它會在Disruptor系統初始化時,構造所有的緩衝區中的物件例項:
public class PCDataFactory implements EventFactory<PCData>{
	@Override
	public PCData newInstance() {
		return new PCData();
	}
}
接下來,看一下生產者:
public class Producer {
    private final RingBuffer<PCData> ringBuffer;
    public Producer(RingBuffer<PCData> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }
    public void pushData(ByteBuffer byteBuffer){
        long sequence = ringBuffer.next();
        try {
            PCData event = ringBuffer.get(sequence);
            event.set(byteBuffer.getLong(0));
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}
生產者需要一個RingBuffer的引用,也就是環形緩衝區。它有一個重要的方法pushData()將產生的資料推入緩衝區。方法pushData()接收一個ByteBuffer物件。在ByteBuffer中可以用來包裝任何資料型別。pushData()的功能就是將傳入的ByteBuffer中的資料提取出來,並裝載到環形緩衝區中。 上述第12行程式碼,通過next()方法得到下一個可用的序列號。通過序列號,取得下一個空閒可用的PCData,並且將PCData的資料設為期望值,這個值最終會傳遞給消費者。最後,在第21行,進行資料釋出。只有釋出後的資料才會真正被消費者看見。 至此,我們的生產者、消費者和資料都已經準備就緒。只差一個統籌規劃的主函式將所有內容整合起來:
public static void main(String[] args) throws InterruptedException {
        Executor executor = Executors.newCachedThreadPool();
        //PCDataFactory factory = new PCDataFactory();
        EventFactory<PCData> factory = new EventFactory<PCData>() {
			@Override
			public PCData newInstance() {
				return new PCData();
			}
		};
        //設定緩衝區大小,一定要是2的整數次冪
        int bufferSize = 1024;
        WaitStrategy startegy =  new BlockingWaitStrategy();
        //建立disruptor,它封裝了整個Disruptor的使用,提供了一些便捷的API.

        Disruptor<PCData> disruptor = new Disruptor<PCData>(factory, bufferSize, executor, ProducerType.MULTI, startegy);
        
        //設定消費者,系統會將每一個消費者例項對映到一個系統中,也就是提供4個消費者執行緒.
        disruptor.handleEventsWithWorkerPool(new Consumer(),
                new Consumer(),
                new Consumer(),
                new Consumer());
        //啟動並初始化disruptor系統.
        disruptor.start();
        RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
        //建立生產者
        Producer productor = new Producer(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        //生產者不斷向緩衝區中存入資料.
        for (long l=0;true;l++){
            byteBuffer.putLong(0,l);
            productor.pushData(byteBuffer);
            Thread.sleep(new Random().nextInt(500));
            System.out.println("add data "+l);
        }
    }
3)提高消費者的響應時間:選擇合適的策略 Disruptor為我們提供了幾個策略,這些策略由WaitStrategy介面進行封裝。 1. BlockingWaitStrategy:預設策略。和BlockingQueue是非常類似的,他們都使用了Lock(鎖)和Condition(條件)進行資料監控和執行緒喚醒。因為涉及到執行緒的切換,BlockingWaitStrategy策略是最省CPU的,但在高併發下效能表現是最差的一種等待策略。 2. SleepingWaitStrategy:這個策略也是對CPU非常保守的。它會在迴圈中不斷等待資料。它會先進行自旋等待,如果不成功,則使用Thread.yield()讓出CPU,並最終使用LockSupport.parkNanos(1)進行執行緒休眠,以確保不佔用太多的CPU資料。因此,這個策略對於資料處理可能產生比較高的平均延時。適用於對延時要求不是特別高的場合,好處是他對生產者執行緒的影響最小。典型的場景是非同步日誌。 3. YieldWaitStrategy:用於低延時場合。消費者執行緒會不斷迴圈監控緩衝區變化,在迴圈內部,它會使用Thread.yield()讓出CPU給別的執行緒執行時間。如果需要高效能系統,並且對延遲有較高要求,則可以考慮這種策略。這種策略相當於消費者執行緒變成了一個內部執行Thread.yield()的死迴圈, 因此最好有多於消費者執行緒的邏輯CPU(“雙核四執行緒”中的四執行緒),否則整個應用會受到影響。 4. BusySpinWaitStrategy:瘋狂等待策略。它就是一個死迴圈,消費者執行緒會盡最大努力監控緩衝區的變化。它會吃掉CPU所有資源。所以只在非常苛刻的場合使用它。因為這個策略等同於開一個死迴圈監控。因此,物理CPU數量必須大於消費者執行緒數。因為如果是邏輯核,那麼另外一個邏輯核必然會受到這種超密集計算的影響而不能正常工作。 4)CPU Cache的優化:解決偽共存問題 我們知道,為了提高CPU的速度,CPU有一個快取記憶體Cache。在快取記憶體中,讀寫資料的最小單位是快取行(Cache Line),它是主記憶體(memory)複製到 快取(Cache)的最小單位,一般為32~128byte(位元組)。 假如兩個變數存放在同一個快取行中,在多執行緒訪問中,可能互相影響彼此的效能。如圖,執行在CPU1上的執行緒更新了X,那麼CPU2傷的快取行就會失效,同一行的Y即使沒有修改也會變成無效,導致Cache無法命中。接著,如果在CPU2上的執行緒更新了Y,則導致CPU1上的快取行又失效(此時,同一行的X)。這無疑是一個潛在的效能殺手,如果CPU經常不能命中快取,那麼系統的吞吐量會急劇下降。 為了使這種情況不發生,一種可行的做法就是在X變數前後空間都佔據一定的位置(暫叫padding,用來填充Cache Line)。這樣,當記憶體被讀入快取中時,這個快取行中,只有X一個變數實際是有效的,因此就不會發生多個執行緒同時修改快取行中不同變數而導致變數全體失效的情況。
public class FalseSharing implements Runnable {
	public final static int NUM_THREADS = 4;
	public final static long ITERATIONS = 500L * 1000L * 1000L;
	private final int arrayIndex;
	private static VolatileLong[] longs = new VolatileLong[NUM_THREADS];
	
	static {
		for(int i=0; i<longs.length; i++) {
			longs[i] = new VolatileLong();
		}
	}
	
	public FalseSharing(final int arrayIndex) {
		this.arrayIndex = arrayIndex;
	}
	
	public static void main(String[] args) throws Exception {
		final long start = System.currentTimeMillis();
		runTest();
		System.out.println("duration = " + (System.currentTimeMillis() - start));
	}
	
	private static void runTest() throws InterruptedException {
		Thread[] threads = new Thread[NUM_THREADS];
		
		for(int i=0; i<threads.length; i++) {
			threads[i] = new Thread(new FalseSharing(i));
		}
		
		for(Thread t : threads) {
			t.start();
		}
		
		for(Thread t : threads) {
			t.join();
		}
	}
	
	@Override
	public void run() {
		long i = ITERATIONS + 1;
		while(0 != --i) {
			longs[arrayIndex].value = i;
		}
	}
	
	public final static class VolatileLong {
		public volatile long value = 0L;
		public long p1, p2, p3, p4, p5, p6, p7;
	}
}
在VolatileLong中,準備了7個long型變數用來填充快取。實際上,只有VolatileLong.value是會被使用的。而那些p1、p2等僅僅用於將陣列中第一個VolatileLong.value是會被使用的。而那些p1、p2等僅僅用於將陣列第一個VolatileLong.value和第二個VolatileLong.value分開,防止它們進入同一個快取行。 Disruptor框架充分考慮了這個問題,它的核心元件Sequence會被非常頻繁的訪問(每次入隊,它都會被加1),其基本結構如下:
class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
    protected volatile long value;
}
class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding  {
//省略具體實現
}
雖然在Sequence中,主要使用的只有value。但是,通過LhsPadding和RhsPadding,在這個value的前後安置了一些佔位空間,使得value可以無衝突的存在於快取中。 此外,對於Disruptor的環形緩衝區RingBuffer,它內部的陣列是通過以下語句構造的: this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; 實際產生的陣列大小是緩衝區實際大小再加上兩倍的BUFFER_PAD。這就相當於在這個陣列的頭部和尾部兩段各增加了BUFFER_PAD個填充,使得整個陣列被載入Cache時不會受到其他變數的影響而失效。

注:本篇部落格內容摘自《Java高併發程式設計》