tomcat原始碼 Connector
Connector容器主要負責解析socket請求,在tomcat中的原始碼位於org.apache.catalina.connector和org.apache.coyote包路徑下;通過上兩節的分析,我們知道了Connector是Service的子容器,而Service又是Server的子容器。在server.xml檔案中配置,然後在Catalina類中通過Digester完成例項化。在server.xml中預設配置了兩種Connector的實現,分別用來處理Http請求和AJP請求。
Connector的實現一共有以下三種:
1、Http Connector:解析HTTP請求,又分為BIO Http Connector和NIO Http Connector,即阻塞IO Connector和非阻塞IO Connector。本文主要分析NIO Http Connector的實現過程。
2、AJP:基於AJP協議,用於Tomcat與HTTP伺服器通訊定製的協議,能提供較高的通訊速度和效率。如與Apache伺服器整合時,採用這個協議。
3、APR HTTP Connector:用C實現,通過JNI呼叫的。主要提升對靜態資源(如HTML、圖片、CSS、JS等)的訪問效能。
具體要使用哪種Connector可以在server.xml檔案中通過protocol屬性配置如下:
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
然後看一下Connector的構造器:
public Connector(String protocol) { setProtocol(protocol); // Instantiate protocol handler ProtocolHandler p = null; try { Class<?> clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); } catch (Exception e) { log.error(sm.getString( "coyoteConnector.protocolHandlerInstantiationFailed"), e); } finally { this.protocolHandler = p; } if (Globals.STRICT_SERVLET_COMPLIANCE) { uriCharset = StandardCharsets.ISO_8859_1; } else { uriCharset = StandardCharsets.UTF_8; } } public void setProtocol(String protocol) { boolean aprConnector = AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseAprConnector(); if ("HTTP/1.1".equals(protocol) || protocol == null) { if (aprConnector) { setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol"); } else { setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol"); } } else if ("AJP/1.3".equals(protocol)) { if (aprConnector) { setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol"); } else { setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol"); } } else { setProtocolHandlerClassName(protocol); } }
通過分析Connector構造器的原始碼可以知道,每一個Connector對應了一個protocolHandler,一個protocolHandler被設計用來監聽伺服器某個埠的網路請求,但並不負責處理請求(處理請求由Container元件完成)。下面就以Http11NioProtocol為例分析Http請求的解析過程。
在Connector的startInterval方法中啟動了protocolHandler,程式碼如下:
protected void startInternal() throws LifecycleException { // Validate settings before starting if (getPort() < 0) { throw new LifecycleException(sm.getString( "coyoteConnector.invalidPort", Integer.valueOf(getPort()))); } setState(LifecycleState.STARTING); try { protocolHandler.start(); } catch (Exception e) { throw new LifecycleException( sm.getString("coyoteConnector.protocolHandlerStartFailed"), e); } }
Http11NioProtocol建立一個org.apache.tomcat.util.net.NioEndpoint例項,然後將監聽埠並解析請求的工作全被委託給NioEndpoint實現。tomcat在使用Http11NioProtocol解析HTTP請求時一共設計了三種執行緒,分別為Acceptor,Poller和Worker。
1、Acceptor執行緒
Acceptor實現了Runnable介面,根據其命名就知道它是一個接收器,負責接收socket,其接收方法是serverSocket.accept()方式,獲得SocketChannel物件,然後封裝成tomcat自定義的org.apache.tomcat.util.net.NioChannel。雖然是Nio,但在接收socket時仍然使用傳統的方法,使用阻塞方式實現。Acceptor以執行緒池的方式被建立和管理,在NioEndpoint的startInternal()方法中完成Acceptor的啟動,原始碼如下:
public void startInternal() throws Exception { if (!running) { running = true; paused = false; processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create worker collection if ( getExecutor() == null ) { createExecutor(); } //設定最大連線數,預設值為maxConnections = 10000,通過同步器AQS實現。 initializeConnectionLatch(); //預設是2個,Math.min(2,Runtime.getRuntime().availableProcessors());和虛擬機器處理器個數比較 // Start poller threads pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } startAcceptorThreads(); } }
繼續追蹤startAcceptorThreads的原始碼
protected final void startAcceptorThreads() { //啟動Acceptor執行緒,預設是1個 int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
Acceptor執行緒的核心程式碼在它的run方法中:
protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket //接收socket請求 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } private void closeSocket(SocketChannel socket) { countDownConnection(); try { socket.socket().close(); } catch (IOException ioe){ if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.err.close"), ioe); } } try { socket.close(); } catch (IOException ioe) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.err.close"), ioe); } } } }
Acceptor完成了socket請求的接收,然後交給NioEndpoint 進行配置,繼續追蹤Endpoint的setSocketOptions方法。
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it //設定為非阻塞 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } //輪訓pollers陣列元素,呼叫Poller的register方法,完成channel的註冊。 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
分析setSocketOptions的原始碼可以知道,該方法的主要功能是利用傳入的SocketChannel引數生成SecureNioChannel或者NioChannel,然後註冊到Poller執行緒的selector中,可以進一步瞭解Java nio的相關知識,對這一塊內容有更深的理解。
2、Poller執行緒
Poller同樣實現了Runnable介面,是NioEndpoint類的內部類。在Endpoint的startInterval方法中建立、配置並啟動了Poller執行緒,見程式碼清單4。Poller主要職責是不斷輪詢其selector,檢查準備就緒的socket(有資料可讀或可寫),實現io的多路複用。其構造其中初始化了selector。
public Poller() throws IOException { this.selector = Selector.open(); }
在分析Acceptor的時候,提到了Acceptor接受到一個socket請求後,呼叫NioEndpoint的setSocketOptions方法(程式碼清單6),該方法生成了NioChannel後呼叫Poller的register方法生成PoolorEvent後加入到Eventqueue,register方法的原始碼如下:
public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. //生成PoolorEvent並加入到Eventqueue if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }
Poller的核心程式碼也在其run方法中:
public void run() { // Loop until destroy() is called // 呼叫了destroy()方法後終止此迴圈 while (true) { boolean hasEvents = false; try { if (!close) { hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select //非阻塞的 select keyCount = selector.selectNow(); } else { //阻塞selector,直到有準備就緒的socket keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { //該方法遍歷了eventqueue中的所有PollerEvent,然後依次呼叫PollerEvent的run,將socket註冊到selector中。 events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. //遍歷就緒的socket事件 while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); //呼叫processKey方法對有資料讀寫的socket進行處理,在分析Worker執行緒時會分析該方法 processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
run方法中呼叫了events方法:
public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; try { //將pollerEvent中的每個socketChannel註冊到selector中 pe.run(); pe.reset(); if (running && !paused) { //將註冊了的pollerEvent加到endPoint.eventCache eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }
繼續跟進PollerEvent的run方法:
public void run() { if (interestOps == OP_REGISTER) { try { //將SocketChannel註冊到selector中,註冊時間為SelectionKey.OP_READ讀事件 socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed. socket.socketWrapper.getEndpoint().countDownConnection(); ((NioSocketWrapper) socket.socketWrapper).closed = true; } else { final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); if (socketWrapper != null) { //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; socketWrapper.interestOps(ops); key.interestOps(ops); } else { socket.getPoller().cancelledKey(key); } } } catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key); } catch (Exception ignore) {} } } }
3、Worker執行緒
Worker執行緒即SocketProcessor是用來處理Socket請求的。SocketProcessor也同樣是Endpoint的內部類。在Poller的run方法中(程式碼清單8)監聽到準備就緒的socket時會呼叫processKey方法進行處理:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { //有讀寫事件就緒時 if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write // socket可讀時,先處理讀事件 if (sk.isReadable()) { //呼叫processSocket方法進一步處理 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } //寫事件 if (!closeSocket && sk.isWritable()) { //呼叫processSocket方法進一步處理 if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } }
繼續跟蹤processSocket方法:
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } // 嘗試迴圈利用之前回收的SocketProcessor物件,如果沒有可回收利用的則建立新的SocketProcessor物件 SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { // 迴圈利用回收的SocketProcessor物件 sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { //SocketProcessor實現了Runneble介面,可以直接傳入execute方法進行處理 executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; } //NioEndpoint中createSocketProcessor建立一個SocketProcessor。 protected SocketProcessorBase<NioChannel> createSocketProcessor( SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) { return new SocketProcessor(socketWrapper, event); }
總結:
Http11NioProtocol是基於Java Nio實現的,建立了Acceptor、Poller和Worker執行緒實現多路io的複用。三類執行緒之間的關係如下圖所示:
Acceptor和Poller之間是生產者消費者模式的關係,Acceptor不斷向EventQueue中新增PollerEvent,Pollor輪詢檢查EventQueue中就緒的PollerEvent,然後傳送給Work執行緒進行處理。