1. 程式人生 > >使用Disruptor完成多執行緒下併發、等待、先後等操作

使用Disruptor完成多執行緒下併發、等待、先後等操作

Java完成多執行緒間的等待功能:

場景1:一個執行緒等待其他多個執行緒都完成後,再進行下一步操作(如裁判員計分功能,需要等待所有運動員都跑完後,才去統計分數。裁判員和每個運動員都是一個執行緒)。

場景2:多個執行緒都等待至某個狀態後,再同時執行(模擬併發操作,啟動100個執行緒 ,先啟動完的需要等待其他未啟動的,然後100個全部啟動完畢後,再一起做某個操作)。

以上兩個場景都較為常見,Java已經為上面的場景1和2分別提供了CountDownLatch和CyclicBarrier兩個實現類來完成,參考另一篇文章:https://blog.csdn.net/tianyaleixiaowu/article/details/75234600

而對於更復雜的場景,如


譬如希望1執行完後才執行2,3執行完後才執行4,1和3並行執行,2和4都執行完後才執行last。

還有其他的更奇怪的執行順序等等。當然這些也可以通過組合多個CountDownLatch或者CyclicBarrier、甚至使用wait、Lock等組合來實現。不可避免的是,都需要使用大量的鎖,直接導致效能的急劇下降和多執行緒死鎖等問題發生。那麼有沒有高效能的無鎖的方式來完成這種複雜的需求實現呢?

那就是Disruptor!

Disruptor可以非常簡單的完成這種複雜的多執行緒併發、等待、先後執行等。

至於Disruptor是什麼就不說了,直接來看使用:

直接新增依賴包,別的什麼都不需要。

<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.1</version>
        </dependency>

我只帖關鍵程式碼,別的上傳到壓縮包了。https://download.csdn.net/download/tianyaleixiaowu/10322342

package b;

import a.FirstEventHandler;
import a.LongEvent;
import a.LongEventFactory;
import a.LongEventProducer;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * @author wuweifeng wrote on 2018/3/29.
 */
public class Simple {
    public static void main(String[] args) {
        ThreadFactory producerFactory = Executors.defaultThreadFactory();
        LongEventFactory eventFactory = new LongEventFactory();
        int bufferSize = 8;

        Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, producerFactory,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        FirstEventHandler firstEventHandler = new FirstEventHandler();
        SecondEventHandler secondHandler = new SecondEventHandler();
        ThirdEventHandler thirdEventHandler = new ThirdEventHandler();
        FourthEventHandler fourthEventHandler = new FourthEventHandler();
        LastEventHandler lastEventHandler = new LastEventHandler();

        //1,2,last順序執行
        //disruptor.handleEventsWith(new LongEventHandler()).handleEventsWith(new SecondEventHandler())
        //        .handleEventsWith(new LastEventHandler());

        //也是1,2,last順序執行
        //disruptor.handleEventsWith(firstEventHandler);
        //disruptor.after(firstEventHandler).handleEventsWith(secondHandler).then(lastEventHandler);

        //1,2併發執行,之後才是last
        //disruptor.handleEventsWith(firstEventHandler, secondHandler);
        //disruptor.after(firstEventHandler, secondHandler).handleEventsWith(lastEventHandler);

        //1後2,3後4,1和3併發,2和4都結束後last
        disruptor.handleEventsWith(firstEventHandler, thirdEventHandler);
        disruptor.after(firstEventHandler).handleEventsWith(secondHandler);
        disruptor.after(thirdEventHandler).handleEventsWith(fourthEventHandler);
        disruptor.after(secondHandler, fourthEventHandler).handleEventsWith(lastEventHandler);


        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 10L; i++) {
            bb.putLong(0, i);
            longEventProducer.onData(bb);
        }

        disruptor.shutdown();
    }
}

主要就是這個類。我註釋掉的部分分別為順序執行、和12併發然後執行last。

上面那個圖對應的程式碼主要就是after的使用。

執行結果 :


可以看到,由於buffer為8,所以在一個週期內,最大value=7.順序就是1-3,2-4,1、3是併發的。對於同一個Event,2和4執行完後才執行last。多執行幾次看看,就能看明白。

這裡用的producer只打印了10個,可以調大,結果就會隨機性更好。