Dubbo學習筆記8:Dubbo的執行緒模型與執行緒池策略
阿新 • • 發佈:2018-12-13
Dubbo預設的底層網路通訊使用的是Netty,服務提供方NettyServer使用兩級執行緒池,其中 EventLoopGroup(boss) 主要用來接受客戶端的連結請求,並把接受的請求分發給 EventLoopGroup(worker) 來處理,boss和worker執行緒組我們稱之為IO執行緒。
如果服務提供方的邏輯能迅速完成,並且不會發起新的IO請求,那麼直接在IO執行緒上處理會更快,因為這減少了執行緒池排程。
但如果處理邏輯很慢,或者需要發起新的IO請求,比如需要查詢資料庫,則IO執行緒必須派發請求到新的執行緒池進行處理,否則IO執行緒會阻塞,將導致不能接收其它請求。
Dubbo提供的執行緒模型
根據請求的訊息類被IO執行緒處理還是被業務執行緒池處理,Dubbo提供了下面幾種執行緒模型:
- all : (AllDispatcher類)所有訊息都派發到業務執行緒池,這些訊息包括請求/響應/連線事件/斷開事件/心跳等,這些執行緒模型如下圖:
- direct : (DirectDispacher類)所有訊息都不派發到業務執行緒池,全部在IO執行緒上直接執行,模型如下圖:
- message : (MessageOnlyDispatcher類)只有請求響應訊息派發到業務執行緒池,其他連線斷開事件/心跳等訊息,直接在IO執行緒上執行,模型圖如下:
- execution:(ExecutionDispatcher類)只把請求類訊息派發到業務執行緒池處理,但是響應和其它連線斷開事件,心跳等訊息直接在IO執行緒上執行,模型如下圖:
- connection:(ConnectionOrderedDispatcher類)在IO執行緒上,將連線斷開事件放入佇列,有序逐個執行,其它訊息派發到業務執行緒池處理,模型如下圖:
其中AllDispatcher對應的handler程式碼如下:
public class AllChannelHandler extends WrappedChannelHandler{ public AllChannelHandler(ChannelHandler handler , URL url){ super(handler,url); } // 連結事件,交給業務執行緒池處理 public void connected(Channel channel) throws RemotingExcecption{ ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CONNECTED)); }catch(Throwable t){ throw new ExecutionException("connect event" , channel , getClass() + "error when process connected event.",t); } } // 連結斷開事件,交給業務執行緒池處理 public void disconnected(Channel channel) throws RemotingException{ ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.DISCONNECTED)); }catch(Throwable t){ throw new ExecutionException("disconnect event",channel,getClass()+" error when process disconnected event.",t); } } // 請求響應事件,交給業務執行緒池處理 public void received(Channel channel , Object message) throws RemotingException{ ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.RECEIVED,message)); }catch(Throwable t){ // TODO 臨時解決執行緒池滿後異常資訊無法傳送到對端的問題。待重構 // fix 執行緒池滿了拒絕呼叫不返回,導致消費者一直等待超時 if(message instanceof Request && t instanceof RejectedExecutionException ){ ... } throw new ExecutionException(message , channel ,getClass() + " error when process received event.",t); } } // 異常處理事件,交給業務執行緒池處理 public void caught(Channel channel , Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ cexecutor.execute(new ChannelEventRunnable(channel,handler,ChannelState.CAUGHT,exception)); }catch(Throwable t){ throw new ExecutionException("caught event",channel,getClass() + " error when process caught event ."); } } ... }
可知所有事件都直接交給業務執行緒池進行處理了。
Dubbo提供了常用的執行緒池模型,這些模型可以滿足我們絕大多數的需求,但是您可以根據自己的需要進行擴充套件定製。在服務提供者啟動執行緒時,我們會看到什麼時候載入的執行緒模型的實現。
Dubbo提供的執行緒池策略
擴充套件介面 ThreadPool 的SPI實現有如下幾種:
- fixed:固定大小執行緒池,啟動時建立執行緒,不關閉,一直持有(預設)。
- cached:快取執行緒池,空閒一分鐘自動刪除,需要時重建。
- limited:可伸縮執行緒池,但池中的執行緒數只會增長不會收縮。只增長不收縮的目的是為了避免收縮時突然帶來大流量引起效能問題。
其中fixed策略對應擴充套件實現類是FixedThreadPool,程式碼如下:
public class FixedThreadPool implements ThreadPool{ public Executor getExecutor(URL url){ String name = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY,Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY,Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads , threads , 0 , TimeUnit.MILLISECONDS , queues==0 ? new SynchronousQueue<Runnable>() : (queue < 0 ? new LinkedBlockingQueue<Runnable>(queues)) , new NamedThreadFactory(name,true) , new AbortPolicyWithReport(name,url)); } }
可知使用ThreadPoolExecutor建立了核心執行緒數=最大執行緒池數=threads的執行緒池。
Dubbo執行緒池擴充套件,這些擴充套件可以滿足我們絕大多數的需求,但是您可以根據自己的需要進行擴充套件定製。在服務提供者啟動流程時,我們會看到什麼時候載入的執行緒池擴充套件實現。