Disruptor 實踐:整合到現有的爬蟲框架

秋天的顏色.jpg
一. Disruptor
Disruptor 是一個高效能的非同步處理框架。
Disruptor 是 LMAX 線上交易平臺的關鍵組成部分,LMAX平臺使用該框架對訂單處理速度能達到600萬TPS,除金融領域之外,其他一般的應用中都可以用到Disruptor,它可以帶來顯著的效能提升。其實 Disruptor 與其說是一個框架,不如說是一種設計思路,這個設計思路對於存在“併發、緩衝區、生產者—消費者模型、事務處理”這些元素的程式來說,Disruptor提出了一種大幅提升效能(TPS)的方案。
二. 實踐
ofollow,noindex">NetDiscovery 是基於 Vert.x、RxJava 2 等框架實現的爬蟲框架。
NetDiscovery 預設的訊息佇列採用 JDK 的 ConcurrentLinkedQueue,由於爬蟲框架各個元件都可以被替換,所以下面基於 Disruptor 實現爬蟲的 Queue。
2.1 事件的封裝
將爬蟲的 request 封裝成一個 RequestEvent,該事件會在 Disruptor 中傳輸。
import com.cv4j.netdiscovery.core.domain.Request; import lombok.Data; /** * Created by tony on 2018/9/1. */ @Data public class RequestEvent { private Request request; public String toString() { return request.toString(); } }
2.2 釋出事件
下面編寫事件的釋出,從 RingBuffer 中獲取下一個可寫入事件的序號,將爬蟲要請求的 request 設定到 RequestEvent 事件中,最後將事件提交到 RingBuffer。
import com.cv4j.netdiscovery.core.domain.Request; import com.lmax.disruptor.RingBuffer; import java.util.concurrent.atomic.AtomicInteger; /** * Created by tony on 2018/9/2. */ public class Producer { private final RingBuffer<RequestEvent> ringBuffer; private AtomicInteger count = new AtomicInteger(0); // 計數器 public Producer(RingBuffer<RequestEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void pushData(Request request){ long sequence = ringBuffer.next(); try{ RequestEvent event = ringBuffer.get(sequence); event.setRequest(request); }finally { ringBuffer.publish(sequence); count.incrementAndGet(); } } /** * 傳送到佇列中到Request的數量 * @return */ public int getCount() { return count.get(); } }
2.3 消費事件
RequestEvent 設定了 request 之後,消費者需要處理具體的事件。下面的 Consumer 僅僅是記錄消費者的執行緒名稱以及 request。真正的“消費”還是需要從 DisruptorQueue 的 poll() 中獲取 request ,然後在 Spider 中進行“消費”。
import com.lmax.disruptor.WorkHandler; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicInteger; /** * Created by tony on 2018/9/2. */ @Slf4j public class Consumer implements WorkHandler<RequestEvent> { @Override public void onEvent(RequestEvent requestEvent) throws Exception { log.info("consumer:" + Thread.currentThread().getName() + " requestEvent: value=" + requestEvent.toString()); } }
2.4 DisruptorQueue 的實現
Disruptor 支援單生產者單消費者、多生產者、多消費者、分組等方式。
在 NetDiscovery 中採用多生產者多消費者。
在 RingBuffer 建立時,ProducerType 使用 MULTI 型別表示多生產者。建立 RingBuffer 採用了 YieldingWaitStrategy 。YieldingWaitStrategy 是一種WaitStrategy,不同的 WaitStrategy 會有不同的效能。
YieldingWaitStrategy 效能是最好的,適合用於低延遲的系統。在要求極高效能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超執行緒的特性。
ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<RequestEvent>() { @Override public RequestEvent newInstance() { return new RequestEvent(); } }, ringBufferSize , new YieldingWaitStrategy());
EventProcessor 用於處理 Disruptor 中的事件。
EventProcessor 的實現類包括:BatchEventProcessor 用於單執行緒批量處理事件,WorkProcessor 用於多執行緒處理事件。
WorkerPool 管理著一組 WorkProcessor。建立完 ringBuffer 之後,建立 workerPool:
SequenceBarrier barriers = ringBuffer.newBarrier(); for (int i = 0; i < consumers.length; i++) { consumers[i] = new Consumer(); } workerPool = new WorkerPool<RequestEvent>(ringBuffer, barriers, new EventExceptionHandler(), consumers);
啟動 workerPool:
ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(Executors.newFixedThreadPool(threadNum));
最後是 DisruptorQueue 完整的程式碼:
import com.cv4j.netdiscovery.core.domain.Request; import com.cv4j.netdiscovery.core.queue.AbstractQueue; import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.ProducerType; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * Created by tony on 2018/9/1. */ @Slf4j public class DisruptorQueue extends AbstractQueue { private RingBuffer<RequestEvent> ringBuffer; private Consumer[] consumers = null; private Producer producer = null; private WorkerPool<RequestEvent> workerPool = null; private int ringBufferSize = 1024*1024; // RingBuffer 大小,必須是 2 的 N 次方 private AtomicInteger consumerCount = new AtomicInteger(0); private static final int CONSUME_NUM = 2; private static final int THREAD_NUM = 4; public DisruptorQueue() { this(CONSUME_NUM,THREAD_NUM); } public DisruptorQueue(int consumerNum,int threadNum) { consumers = new Consumer[consumerNum]; //建立ringBuffer ringBuffer = RingBuffer.create(ProducerType.MULTI, new EventFactory<RequestEvent>() { @Override public RequestEvent newInstance() { return new RequestEvent(); } }, ringBufferSize , new YieldingWaitStrategy()); SequenceBarrier barriers = ringBuffer.newBarrier(); for (int i = 0; i < consumers.length; i++) { consumers[i] = new Consumer(); } workerPool = new WorkerPool<RequestEvent>(ringBuffer, barriers, new EventExceptionHandler(), consumers); ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(Executors.newFixedThreadPool(threadNum)); producer = new Producer(ringBuffer); } @Override protected void pushWhenNoDuplicate(Request request) { producer.pushData(request); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public Request poll(String spiderName) { Request request = ringBuffer.get(ringBuffer.getCursor() - producer.getCount() +1).getRequest(); ringBuffer.next(); consumerCount.incrementAndGet(); return request; } @Override public int getLeftRequests(String spiderName) { return producer.getCount()-consumerCount.get(); } public int getTotalRequests(String spiderName) { return super.getTotalRequests(spiderName); } static class EventExceptionHandler implements ExceptionHandler { public void handleEventException(Throwable ex, long sequence, Object event) { log.debug("handleEventException:" + ex); } public void handleOnStartException(Throwable ex) { log.debug("handleOnStartException:" + ex); } public void handleOnShutdownException(Throwable ex) { log.debug("handleOnShutdownException:" + ex); } } }
其中,pushWhenNoDuplicate() 是將 request 傳送到 ringBuffer 中。poll() 是從 ringBuffer 中取出對應的 request ,用於爬蟲進行網路請求、解析請求等處理。
總結:
爬蟲框架 github 地址: https://github.com/fengzhizi715/NetDiscovery
上述程式碼是比較經典的 Disruptor 多生產者多消費者的程式碼,亦可作為樣板程式碼使用。
最後,在爬蟲框架是面向介面程式設計的,所以替換其中的任意元件都比較方便。