1. 程式人生 > >netty原始碼解解析(4.0)-3 Channel的抽象實現

netty原始碼解解析(4.0)-3 Channel的抽象實現

AbstractChannel和AbstractUnsafe抽象類 io.netty.channel.AbstractChannel 從本章開始,會有大量的篇幅涉及到程式碼分析。為了能夠清晰簡潔的地說明程式碼的結構和功能,我會用程式碼註釋+獨立段落的方式加以呈現。 所以,為你能更好地理解程式碼,請不要忽略程式碼中黑體字註釋。   AbstractChannel和AbstractUnsafe之間的關係 AbstractChannel實現了Channel介面,AbstractUnsafe實現了Unsafe。這兩個類是抽象類,他們實現了Channel和Unsafe的絕大部分介面。在AbstractChannel的實現中,每個方法都會直接或間接呼叫Unsafe對應的同名方法。所有的inbound和outbound方法都是通過pipeline間接呼叫,其他的輔助方法直接使用unsafe例項呼叫。pipline和unsafe例項在AbstractChannel的構造方法建立: protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); // AbstractChannel沒有實現這個方法
pipeline = newChannelPipeline(); // newChannelPipline的實現 return new DefaultChannelPipeline(this); } 直接呼叫的例子: @Override public SocketAddress localAddres) { SocketAddress localAddress = this.localAddress; if (localAddress == null) { try { // 這裡直接呼叫了Unsafe的localAddress()方法
this.localAddress = localAddress = unsafe().localAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return localAddress; }   間接呼叫的例子 @Override public ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress); //通過pipline間接呼叫Unsafe的bind方法 } 關於pipline是怎樣呼叫Unsafe方法的,會在後面的Pipline相關章節詳細分析,這裡只需記住。pipeline所有方法呼叫最終都會(如果沒有改變ChannelContextHandler的預設實現)通過使用newUnsafe建立的Unsafe例項呼叫Unsafe的同名方法(如果有的話)。 netty給出這一對Abstract實現有兩個目的:
  • 進一步明確介面的語意。
  • 簡化Channel介面的實現。
下面來具體看一下AbstractUnsafe的主要方法實現。     AbstractUnsafe的重要實現   register實現 @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { // 檢查是否已經註冊, 避免重複只需註冊動作。 promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) {// 檢查eventloop是否滿足Channel的要求,由子類實現 promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; }   // 設定Channel的EventLoop例項 AbstractChannel.this.eventLoop = eventLoop;   if (eventLoop.inEventLoop()) { // 檢查是否在當前執行緒中,如果是,直接呼叫 register0(promise); } else { // 如果不是,把register0包裝到runnable中放到eventloop中呼叫。 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } 這個方法的實現為我們展示了netty使用I/O執行緒的一般套路 if(eventLoop.inEventLoop()){ doSomething(); }else{ eventLoop.execute(new Runnable(){ @Override public void run() { doSomething(); } }); } 對於某個需要放到I/O線性中執行的方法,先檢查當前執行緒是不是I/O執行緒,是就直接執行,不是就把它包裝到Ruannable中放到eventLoop中執行。 register的功能總結一句話就是呼叫register0, 下面看看register0的實現。 private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop // 確保promise沒有被取消同時Channel沒有被關閉才能執行後面的動作 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); // 執行真正的register操作,留改子類實現 neverRegistered = false; registered = true; // 設定Channel已經處於registed狀態   // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // 觸發handlerAdded事件 pipeline.invokeHandlerAddedIfNeeded();   safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 觸發channelRegistered事件 // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) {// 確保Channel只有在第一次register 的時候被觸發   pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 // 對於設定了autoRead的Channel執行beginRead(); beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } register語義:
  1. 把channel和eventLoop繫結,eventLoop執行緒就是I/O執行緒。
  2. 確保真正的register操作在I/O執行緒中執行。
  3. 確保每個channel的register操作只執行一次。
  4. 真正的register操作執行成功後, 觸發channelRegistered事件,如果channel此時仍處於active狀態,觸發channelActive事件,並確保這些事件只觸發一次。
  5. 真正的register操作執行成功後, 如果channel此時仍處於active狀態,並且channel的配置支援autoRead, 則執行beginRead操作,讓eventLoop可以自動觸發channel的read事件。
    bind實現 @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop();   if (!promise.setUncancellable() || !ensureOpen(promise)) { return; }   // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } // 先儲存是否active的狀態 boolean wasActive = isActive(); try { doBind(localAddress); / /呼叫doBind, 需要子類實現這個方法完成真正的bind操作 } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; }   if (!wasActive && isActive()) { // 如果執行完doBind後從非active狀態變成active裝,則觸發channelActive事件 invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); }   safeSetSuccess(promise); } bind語義:
  • 呼叫抽象方法doBind, 它需要子類實現。
  • 如果channel的狀態從非active變成active狀態,則觸發channelActive事件
  disconnect實現 disconnect和bind的實現型別,不同的是他呼叫的是doDisconnect方法,這個方法同樣是抽象方法需要子類實現。當channel的狀態從非active變成active狀態時,呼叫pipeline.fireChannelInactive()觸發channelInactive事件。   close實現 @Override public final void close(final ChannelPromise promise) { assertEventLoop();   close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false); }   private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { if (!promise.setUncancellable()) { return; }   if (closeInitiated) { // 這段程式碼的作用就是防止多次執行close操作 if (closeFuture.isDone()) { // Closed already. safeSetSuccess(promise); } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise. // This means close() was called before so we just register a listener and return closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {   promise.setSuccess();   }   });   }   return;   }   closeInitiated = true;   final boolean wasActive = isActive(); final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; // 把outboundBuffer置空,在這之後無法進行write或flush操作 this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. Executor closeExecutor = prepareToClose(); // 這個方法預設實現是return null. 如果有些可以在子類中覆蓋這個方法新增關閉前的準備代 // 下面的if..else執行的是相同的操作,不同的是如果closeExecutor可以用,就在這個executor中執行,否則在當前執行緒總執行 if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the close. doClose0(promise); // 執行close操作 } finally { // Call invokeLater so closeAndDeregister is executed in the EventLoop again! // close完成之後的操作, 在eventLoop中執行 invokeLater(new Runnable() { @Override public void run() { if (outboundBuffer != null) { // Fail all the queued messages // 對outboundBuffer中的資料進行錯誤處理 outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } // 執行deregister操作, 如果channel由active變成非active狀態就觸發channelInactive事件 fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { try { // Close the channel and fail the queued messages in all cases. doClose0(promise); } finally { if (outboundBuffer != null) { // Fail all the queued messages. outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } } if (inFlush0) { // 如果正在執行flush操作,把deregister操作放在eventLoop中執行 invokeLater(new Runnable() { @Override public void run() {   fireChannelInactiveAndDeregister(wasActive); } }); } else {   fireChannelInactiveAndDeregister(wasActive); } } }   private void doClose0(ChannelPromise promise) { try { doClose(); // 呼叫doClose執行真正的close操作,它是一個抽象方法,需要在子類中實現。 closeFuture.setClosed(); safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } } close實現的程式碼雖然比較多,但做的事情比較簡單:首先執行close操作,然後實現deregister操作,觸發channelInactive事件。 在close的實現中,先呼叫assertEventLoop方法確保當前方法是在eventLoop中執行,然後多次使用invokeLater方法吧一系列操作放在放在Runnable中執行,這樣做的目的是事為了保證接下來的操作一定在當前操作完成之後才會執行,這一點是有eventLoop來保證的,eventLoop執行Runnable的順序和呼叫execute的順序一致,相關實現會在後面eventLoop章節具體討論。   deregister實現 @Override public final void deregister(final ChannelPromise promise) { assertEventLoop();   deregister(promise, false); }   private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!promise.setUncancellable()) {   return; }   if (!registered) { // 避免多次執行deregister操作   safeSetSuccess(promise);   return; }   // As a user may call deregister() from within any method while doing processing in the ChannelPipeline, // we need to ensure we do the actual deregister operation later. This is needed as for example, // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay, // the deregister operation this could lead to have a handler invoked by different EventLoop and so // threads. // // See: // https://github.com/netty/netty/issues/4435 invokeLater(new Runnable() {   @Override   public void run() {   try {     doDeregister(); // 執行真正的deregister操作,這方法預設沒做任何事情,子類可以根據需要覆蓋實現   } catch (Throwable t) {     logger.warn("Unexpected exception occurred while deregistering a channel.", t);   } finally {     if (fireChannelInactive) {       pipeline.fireChannelInactive(); // 觸發channelInactive事件     } // Some transports like local and AIO does not allow the deregistration of // an open channel. Their doDeregister() calls close(). Consequently, // close() calls deregister() again - no need to fire channelUnregistered, so check // if it was registered.     if (registered) {       registered = false;       pipeline.fireChannelUnregistered(); // 觸發channelUnregistered事件     }     safeSetSuccess(promise);   }   } }); } 語義:
  • 呼叫doDeregister執行真正的deregister操作
  • 根據引數可能需要觸發channelInactive事件
  • 觸發channelUnregistered事件
  write實現 @Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop();   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // 如果outboundBuffer是null, 意味著這個channel已經被close掉了,需要使用promise返回錯誤,然後釋放掉msg // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; }   int size; try {   msg = filterOutboundMessage(msg); // 過濾msg, 預設實現中沒有做任何操作,把msg原樣返回, 資料可以根據需要覆蓋實現。   size = pipeline.estimatorHandle().size(msg); // 計算msg序列化之後的長度   if (size < 0) {     size = 0;   } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; }   outboundBuffer.addMessage(msg, size, promise); // 把msg放入outboundBuffer中 } write的操作比較簡單,他只是把訊息放到outboundBuffer中,並沒有做實際的寫操作。   flush實現 @Override public final void flush() { assertEventLoop(); // 確保在eventLoop中執行   ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) {   return; } outboundBuffer.addFlush(); // 如果outboundBuffer不是null才可以進入真正的write階段 flush0(); }   protected void flush0() { if (inFlush0) { // 確保不被多個執行緒同時執行 // Avoid re-entrance return; }   final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { // 確保outboundBuffer有資料是才執行下面的步驟   return; }   inFlush0 = true;   // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { // 如果channel不是active狀態,返回錯誤 try {   if (isOpen()) {     outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);   } else {     // Do not trigger channelWritabilityChanged because the channel is closed already.     outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);   } } finally {   inFlush0 = false; } return; }   try {   doWrite(outboundBuffer); // 執行真正的寫操作,這是一個抽象方法,需要子類實現。 } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ // 如是I/O異常,並且channel配置允許自動關閉,則關閉channel close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try {   shutdownOutput(voidPromise(), t); // 關閉output通道,不允許執行write操作。 } catch (Throwable t2) {   close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } } finally {   inFlush0 = false; } } 語義:
  • 呼叫doWrite方法執行真正的寫操作
  • 如果寫操作失敗,呼叫close或者shutdownOutput進行善後。
    至此,已經分析完了AbstractChannel和AbstractUnsafe的所有重要的實現,回頭總結一下,這個類主要做了這麼幾件事: 1. 明確了AbstractChannel和AbstractUnsafe方法之間的呼叫關係,或通過unsafe例項直接呼叫,或通過pipleline間接呼叫。 2. 規定了Unsafe方法的執行執行緒,有些必須在eventLoop中執行,這樣的方法第一行就呼叫assertEventLoop來確保當前方法是在eventLoop線性中,有些不需要一定在eventLoop中執行的則沒有這個呼叫 3. 確保多執行緒多執行緒環境下的執行順序,這一點通過把一系列操作包裝成Runnable放入eventLoop中來保證,invokeLater方法就是一個典型的例子。 4. 定義了事件的觸發條件,在前面的程式碼分析中,頻繁地出現pipeline.fireXXX()的呼叫,這些呼叫就是在觸發特定的事件,大部分情況下使用者不要自己去觸發事件。 5. 優化多執行緒環境下的資料同步效能,使用volatile減少synchronized和Lock的使用, 典型的用法如下所示: private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { ...... return; } .... doWrite(outboundBuffer);   AbstractUnsafe的擴充套件點 前面說過,AbstractUnsafe做了很多事,但把臨門一腳的工作交給子類完成,這樣讓子類的實現變得簡單很多。AbstractUsafe把這些工作定義成形如doXXX的抽象方法或是沒有幹任何事的空方法。下面是這些方法的列表:
方法 說明
protected abstract SocketAddress localAddress0() 被localAddress呼叫,執行真正的獲取本地地址的操作。
protected abstract SocketAddress remoteAddress0() 被remoteAddress呼叫,是真正的獲取遠端地址的操作。
protected abstract boolean isCompatible(EventLoop loop) 檢查eventLoop是是否和這個Channel相容。
protected void doRegister() 呼叫鏈register->register0->doRegister, 真正的註冊操作。
protected abstract void doBind(SocketAddress localAddress) 被bind呼叫,執行真正繫結本地地址的操作。
protected abstract void doDisconnect() 被disconnect呼叫,執行真正的斷開連線操作。
protected abstract void doClose() 被close掉,執行真正的關閉channel操作。
protected void doShutdownOutput() 被shutdownOutput呼叫,用來關閉output通道,使Channel不能write。它的的預設實現是呼叫doClose
protected void doDeregister() 被deregister呼叫,是真正的登出操作,雖然不是抽象方法,然而只有一個{}, 還是要等你來搞定。
protected abstract void doBeginRead() 呼叫鏈register->register0->beginRead->doBeginRead, 實現讓eventLoop可以自動觸發read事件。
protected abstract void doWrite(ChannelOutboundBuffer in) 呼叫鏈flush->flush0->doWrite, 執行真正的寫操作。
protected Object filterOutboundMessage(Object msg) 被write呼叫,在訊息被放到outboundBuffer之前對訊息進行處理,預設啥事都沒幹,就是把你傳進去的msg還給你。