1. 程式人生 > >disruptor架構三 使用場景 使用WorkHandler和BatchEventProcessor輔助創建消費者

disruptor架構三 使用場景 使用WorkHandler和BatchEventProcessor輔助創建消費者

提高效率 count() dga tin random 可用 numbers nth producer

在helloWorld的實例中,我們創建Disruptor實例,然後調用getRingBuffer方法去獲取RingBuffer,其實在很多時候,我們可以直接使用RingBuffer,以及其他的API操作。我們一起熟悉下示例:

使用EventProcessor消息處理器。

BatchEventProcessor 多線程並發執行,不同線程執行不同是不同的event

EventProcessor有3個實現類

BatchEventProcessor 多線程並發執行,不同線程執行不同是不同的event

使用BatchEventProcessor 消費者需要實現EventHandler接口

我們來看下面的代碼:

需要處理的實體類

package bhz.generate1;

import java.util.concurrent.atomic.AtomicInteger;

public class Trade {  
    
    private String id;//ID  
    private String name;
    private double price;//金額  
    private AtomicInteger count = new AtomicInteger(0);
    
    public String getId() {
        return id;
    }
    
public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; }
public AtomicInteger getCount() { return count; } public void setCount(AtomicInteger count) { this.count = count; } }

消費者類:

package bhz.generate1;

import java.util.UUID;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
      
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        //榪欓噷鍋氬叿浣撶殑娑堣垂閫昏緫  
        event.setId(UUID.randomUUID().toString());//綆?崟鐢熸垚涓婭D  
        System.out.println(event.getId());  
    }  
}  

消費者除了實現EventHandler接口之外,還實現了WorkHandler接口,為啥了,因為後面我們要使用了WokerPool來發送該實體類,所以這裏就讓該實體類實現兩個接口

我們來看看main方法

package bhz.generate1;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;

public class Main1 {  
   
    public static void main(String[] args) throws Exception {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        /* 
         * createSingleProducer創建一個單生產者的RingBuffer, 
         * 第一個參數叫EventFactory,從名字上理解就是"事件工廠",其實它的職責就是產生數據填充RingBuffer的區塊。 
         * 第二個參數是RingBuffer的大小,它必須是2的指數倍 目的是為了將求模運算轉為&運算提高效率 
         * 第三個參數是RingBuffer的生產都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 
         */  
        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, BUFFER_SIZE, new YieldingWaitStrategy());  
        
        //創建線程池  
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        
        //創建SequenceBarrier  ,用於平衡生產者和消費者速率,用障礙來處理
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        //創建消息處理器  
        BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
                ringBuffer, sequenceBarrier, new TradeHandler());  
          
        //這一步的目的就是把消費者的位置信息引用註入到生產者    如果只有一個消費者的情況可以省略 ,將生產者和消費者關聯起來
        ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //把消息處理器提交到線程池  
        executors.submit(transProcessor);  
        
        //如果存在多個消費者 那重復執行上面3行代碼 把TradeHandler換成其它消費者類  
          
        Future<?> future= executors.submit(new Callable<Void>() {  
            @Override  
            public Void call() throws Exception {  
                long seq;  
                for(int i=0;i<10;i++){  
                    seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區塊  
                    ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 數據 
                    ringBuffer.publish(seq);//發布這個區塊的數據使handler(consumer)可見  
                }  
                return null;  
            }  
        }); 
        
        future.get();//等待生產者結束  
        Thread.sleep(1000);//等上1秒,等消費都處理完成  
        transProcessor.halt();//通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!)  
        executors.shutdown();//終止線程  
    }  
}  

//創建消息處理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
ringBuffer, sequenceBarrier, new TradeHandler());

它主要有三個成員RingBuffer、SequenceBarrier和EventHandler

上面對應對應的是一個生產者,一個消費者的情況

我們來看看程序運行的效果

1a7226d0-e212-4183-b109-cab5e5c41545
3e1da0fa-686d-4361-bea2-600c2c5d26b9
bf31874a-3405-4008-80e7-03caf9f16ae4
080a05ef-0052-4271-a2ee-ee50038a5a77
71e1a5a8-24ba-4175-b53a-f8b71e99464a
99670de9-6aa5-48fa-8fa2-a490250e25ba
7a44b351-0caa-4ac3-b344-97cf72c9dd5f
10a7fe52-eef1-453c-80a2-126fd8bac948
c78f2ed5-3c3e-4481-9062-dd96ff7ba051
49f51ad6-2ee5-4c36-a0d0-96bc0e17fba9

如果是一個生產者,對應多個消費者,那麽

//創建消息處理器
BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
ringBuffer, sequenceBarrier, new TradeHandler());

//這一步的目的就是把消費者的位置信息引用註入到生產者 如果只有一個消費者的情況可以省略 ,將生產者和消費者關聯起來
ringBuffer.addGatingSequences(transProcessor.getSequence());

//把消息處理器提交到線程池
executors.submit(transProcessor);

//如果存在多個消費者 那重復執行上面3行代碼 把TradeHandler換成其它消費者類

所以:BatchEventProcessor 多線程並發執行,不同線程執行不同是不同的event

2、使用WorkerPool消息處理器。

消費者需要實現:WorkHandler接口

我們來看看主程序的代碼:

package bhz.generate1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;

public class Main2 {  
    public static void main(String[] args) throws InterruptedException {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        
        EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  
            public Trade newInstance() {  
                return new Trade();  
            }  
        };  
        
        RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
          
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
          
        WorkHandler<Trade> handler = new TradeHandler();  

        WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  
          
        workerPool.start(executor);  
          
        //下面這個生產8個數據
        for(int i=0;i<8;i++){  
            long seq=ringBuffer.next();  
            ringBuffer.get(seq).setPrice(Math.random()*9999);  
            ringBuffer.publish(seq);  
        }           
        Thread.sleep(1000);  
        workerPool.halt();  
        executor.shutdown();  
    }  
}  

程序運行的效果:

4bbffa55-b19f-44a4-bfa7-100affc63323
121a0ee8-7e8e-4637-b659-ca78ae9aaa20
0fc1cdb8-8186-44fc-a3a5-4bf5fea66086
afb70a80-e1ce-46f9-bfc1-4e0d81be96b4
0e0b3690-830b-4d38-b78b-e0930b499515
f5b4e23f-10c8-45ea-b064-32ae40f54912
4a172494-480a-4509-99d0-d416b5e2c5c9
902c0669-6196-423e-9924-31cb9633bbb5

disruptor架構三 使用場景 使用WorkHandler和BatchEventProcessor輔助創建消費者