1. 程式人生 > >高併發資料結構Disruptor解析(6)

高併發資料結構Disruptor解析(6)

SequenceBarrier

SequenceBarrier是消費者與Ringbuffer之間建立消費關係的橋樑,同時也是消費者與消費者之間消費依賴的抽象。
這裡寫圖片描述

SequenceBarrier只有一個實現類,就是ProcessingSequenceBarrier。ProcessingSequenceBarrier由生產者Sequencer,消費定位cursorSequence,等待策略waitStrategy還有一組依賴sequence:dependentSequence組成:

public ProcessingSequenceBarrier(
        final Sequencer sequencer,
        final
WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { this.sequencer = sequencer; this.waitStrategy = waitStrategy; this.cursorSequence = cursorSequence; if (0 == dependentSequences.length) { dependentSequence = cursorSequence; } else
{ dependentSequence = new FixedSequenceGroup(dependentSequences); } }

首先,為了實現消費依賴,SequenceBarrier肯定有一個獲取可以消費的sequence方法,就是

long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;

實現為:

@Override
    public long waitFor(final long sequence)
        throws
AlertException, InterruptedException, TimeoutException { //檢查是否alerted checkAlert(); //通過等待策略獲取下一個可消費的sequence,這個sequence通過之前的講解可以知道,需要大於cursorSequence和dependentSequence,我們可以通過dependentSequence實現先後消費 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); //等待可能被中斷,所以檢查下availableSequence是否小於sequence if (availableSequence < sequence) { return availableSequence; } //如果不小於,返回所有sequence(可能多生產者)和availableSequence中最大的 return sequencer.getHighestPublishedSequence(sequence, availableSequence); }

其他方法實現很簡單,功能上分別有:
1. 獲取當前cursorSequence(並沒有什麼用,就是為了監控)
2. 負責中斷和恢復的alert標記

    @Override
    public long getCursor()
    {
        return dependentSequence.get();
    }

    @Override
    public boolean isAlerted()
    {
        return alerted;
    }

    @Override
    public void alert()
    {
        alerted = true;
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void clearAlert()
    {
        alerted = false;
    }

    @Override
    public void checkAlert() throws AlertException
    {
        if (alerted)
        {
            throw AlertException.INSTANCE;
        }
    }

構造SequenceBarrier在框架中只有一個入口,就是AbstractSequencer的:

public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }

SequenceProcessor

通過SequenceBarrier,我們可以實現消費之間的依賴關係,但是,消費方式(比如廣播,群組消費等等),需要通過SequenceProcessor的實現類實現:
這裡寫圖片描述
通過類依賴關係我們發現,EventProcessor都是拓展了Runnable介面,也就是我們可以把它們當做執行緒處理。

1. BatchEventProcessor:

它的構造方法:

/**
     * 構造一個消費者之間非互斥消費的消費者
     *
     * @param dataProvider    對應的RingBuffer
     * @param sequenceBarrier 依賴關係,通過構造不同的sequenceBarrier用互相的dependentsequence,我們可以構造出先後消費關係
     * @param eventHandler    使用者實現的處理消費的event的業務消費者.
     */
    public BatchEventProcessor(
        final DataProvider<T> dataProvider,
        final SequenceBarrier sequenceBarrier,
        final EventHandler<? super T> eventHandler)
    {
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;

        if (eventHandler instanceof SequenceReportingEventHandler)
        {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }

        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }

執行緒為一個死迴圈:

 @Override
    public void run()
    {
        //檢查狀態
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        //清理
        sequenceBarrier.clearAlert();
        //如果使用者實現的EventHandler繼承了LifecycleAware,則執行其onStart方法
        notifyStart();

        T event = null;
        //sequence初始值為-1,設計上當前值是已經消費過的
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    //獲取當前可以消費的最大sequence
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    while (nextSequence <= availableSequence)
                    {
                        //獲取並處理
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    //設定當前sequence,注意,出現異常需要特殊處理,防止重複消費
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    //wait超時異常
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    //中斷異常
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    //如果出現異常則設定為nextSequence
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            //如果使用者實現的EventHandler繼承了LifecycleAware,則執行其onShutdown方法
            notifyShutdown();
            running.set(false);
        }
    }

可以看出:
1. BatchEventProcessor可以處理超時,可以處理中斷,可以通過使用者實現的異常處理類處理異常,同時,發生異常之後再次啟動,不會漏消費,也不會重複消費。
2. 不同的BatchEventProcessor之間通過SequenceBarrier進行依賴消費。原理如下圖所示:
這裡寫圖片描述
假設我們有三個消費者BatchEventProcessor1,BatchEventProcessor2,BatchEventProcessor3. 1需要先於2和3消費,那麼構建BatchEventProcessor和SequenceBarrier時,我們需要讓BatchEventProcessor2和BatchEventProcessor3的SequenceBarrier的dependentSequence中加入SequenceBarrier1的sequence。
其實這裡2和3共用一個SequenceBarrier就行。

2. WorkProcessor

另一種消費者是WorkProcessor。利用它,可以實現互斥消費,同樣的利用SequenceBarrier可以實現消費順序

public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                if (processedSequence)
                {
                    processedSequence = false;
                    //獲取下一個可以消費的Sequence
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    //多個WorkProcessor之間,如果共享一個workSequence,那麼,可以實現互斥消費,因為只有一個執行緒可以CAS更新成功
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

3. WorkerPool

多個WorkerProcessor可以組成一個WorkerPool:

public WorkerPool(
        final RingBuffer<T> ringBuffer,
        final SequenceBarrier sequenceBarrier,
        final ExceptionHandler<? super T> exceptionHandler,
        final WorkHandler<? super T>... workHandlers)
    {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        workProcessors = new WorkProcessor[numWorkers];

        for (int i = 0; i < numWorkers; i++)
        {
            workProcessors[i] = new WorkProcessor<T>(
                ringBuffer,
                sequenceBarrier,
                workHandlers[i],
                exceptionHandler,
                workSequence);
        }
    }

裡面的 workHandlers[i]共享同一個workSequence,所以,同一個WorkerPool內,是互斥消費。

相關推薦

併發資料結構Disruptor解析6

SequenceBarrier SequenceBarrier是消費者與Ringbuffer之間建立消費關係的橋樑,同時也是消費者與消費者之間消費依賴的抽象。 SequenceBarrier只有一個實現類,就是ProcessingSequenceBarr

併發資料結構Disruptor解析2

Sequence(續) 之前說了Sequence通過給他的核心值value新增前置無用的padding long還有後置無用的padding long來避免對於value操作的false sharing的發生。那麼對於這個value的操作是怎麼操作的呢? 這

野生前端的資料結構基礎練習6——集合

網上的相關教程非常多,基礎知識自行搜尋即可。 習題主要選自Orelly出版的《資料結構與演算法javascript描述》一書。 參考程式碼可見:https://github.com/dashnowords/blogs/tree/master/Structure/Set [TOC] 集

python資料結構與演算法6

Python中的順序表Python中的list和tuple兩種型別採⽤了順序表的實現技術,具有前⾯討論的順 序表的所有性質。tuple是不可變型別,即不變的順序表,因此不⽀持改變其內部狀態的任何操 作,⽽其他⽅⾯,則與list的性質類似。list的基本實現技術Python標準型別list就是⼀種元素個數可變的

小白的資料結構程式碼實戰6----共享棧

共享棧=棧1+棧2 棧1的棧底為共享棧的首,棧2的棧底為共享棧的尾 共享棧滿:S->top1+1==S->top2 //Author:張佳琪 #include <stdio.h> #include <stdlib.h> #define

野生前端的資料結構基礎練習5——雜湊

網上的相關教程非常多,基礎知識自行搜尋即可。 習題主要選自Orelly出版的《資料結構與演算法javascript描述》一書。 參考程式碼可見:https://github.com/dashnowords/blogs/tree/master/Structure/Hash 雜湊的基本知識

聊聊併發系統之降級特技

在開發高併發系統時有三把利器用來保護系統:快取、降級和限流。之前已經有一些文章介紹過快取和限流了。本文將詳細聊聊降級。當訪問量劇增、服務出現問題(如響應時間慢或不響應)或非核心服務影響到核心流程的效能時,仍然需要保證服務還是可用的,即使是有損服務。系統可以根據一些關鍵資料進行自動降級,也

資料結構與演算法--遞迴

遞迴條件: 1.遞迴條件:每次調自己,然後記錄當時的狀態 2.基準條件:執行到什麼時候結束遞迴,不然遞迴就會無休止的呼叫自己, 遞迴的資料結構:棧(先進先出)和彈夾原理一樣,每一次呼叫自己都記錄了當時的一種狀態,然後把這種狀態的結果返回。 棧相對應的資料結構:佇列(先進後出

資料結構-----------線性表下篇之雙向連結串列

//----------雙向連結串列的儲存結構------------ typedef struct DuLNode { ElemType date; struct DoLNode *prior; struct DoLNode *next; } DoLNode,*DoLinkList;

資料結構---------------線性表下篇之單鏈表

單鏈表 特點:儲存空間不連續 結點(資料元素組成):資料域(儲存資料)和指標域(指標)A1 若用p來指向  則資料域為p->date   指標域為p->next 鏈式儲存結構: 單鏈表、迴圈連結串列、雙向連結串列根據連

資料結構與演算法2—— 棧java

1 棧的實現 1.1 簡單陣列實現棧 package mystack; public class ArrayStack { private int top; //當前棧頂元素的下標 private int[] array; public ArraySt

資料結構上機題週三

週三,19號的上機題。 題目,如圖: 不多廢話,直接原始碼: #include<iostream> #include<stdlib.h> int a[100]; using namespace std; class node { public: node

hashmap資料結構詳解之HashMap、HashTable、ConcurrentHashMap 的區別

【hashmap 與 hashtable】   hashmap資料結構詳解(一)之基礎知識奠基 hashmap資料結構詳解(二)之走進JDK原始碼 hashmap資料結構詳解(三)之hashcode例項及大小是2的冪次方解釋 hashmap資料結構詳解(四)之has

野生前端的資料結構基礎練習7——二叉樹

網上的相關教程非常多,基礎知識自行搜尋即可。 習題主要選自Orelly出版的《資料結構與演算法javascript描述》一書。 參考程式碼可見:https://github.com/dashnowords/blogs/tree/master/Structure/btree 一.二叉樹的

Java資料結構和演算法:簡介

  本系列部落格我們將學習資料結構和演算法,為什麼要學習資料結構和演算法,這裡我舉個簡單的例子。   程式設計好比是一輛汽車,而資料結構和演算法是汽車內部的變速箱。一個開車的人不懂變速箱的原理也是能開車的,同理一個不懂資料結構和演算法的人也能程式設計。但是如果一個開車的人懂變速箱的原理,比如降低速

3D引擎資料結構與glTF1:簡介

不是有句老話講“程式 = 演算法 + 資料結構”嘛,對於3D引擎來說也是這樣。學習和掌握3D引擎中的核心資料有哪些,它們直接的關係是怎樣等等問題,對於理解3D引擎的架構和圖形渲染關係都有著非常大的幫助。然而,現在的商業3D引擎非常複雜,想要通過學習其原始碼嘛非常困難,那麼你就這樣放棄了嗎

3D引擎資料結構與glTF2: Scene Graph

圖形學中的 Scene Graph Scene Graph 中文常翻譯為“場景圖”,是一種常用的場景物件組織方式。我們把場景中的物件,按照一定的規則(通常是空間關係)組織成一棵樹,樹上的每個節點代表場景中的一個物件。每個節點都可以有零到多個子節點,但只有一個父節點。 每個節點都包含一

Java併發程式設計之synchronized關鍵字

上一篇文章講了synchronized的部分關鍵要點,詳見:Java高併發程式設計之synchronized關鍵字(一) 本篇文章接著講synchronized的其他關鍵點。 在使用synchronized關鍵字的時候,不要以字串常量作為鎖定物件。看下面的例子: public class

Java併發程式設計之synchronized關鍵字

首先看一段簡單的程式碼: public class T001 { private int count = 0; private Object o = new Object(); public void m() { //任何執行緒要執行下面這段程式碼

JavaEE-SSM:009 Mybatis的配置檔案解析6

檔案型別轉換器(不常用)   假設資料庫有blob格式的欄位儲存需求: 對應著POJO的byte陣列: ResultMap中有對應的typeHandler配置:   當然,我們可以在POJO中使用InputStream替代byte陣列,但