1. 程式人生 > >tomcat原始碼淺析(五)之請求的完整過程(http1.1)

tomcat原始碼淺析(五)之請求的完整過程(http1.1)

    http1.1的Connector的protocolHandler為org.apache.coyote.http11.Http11Protocol。Http11Protocol的endpoint為JIoEndpoint。JIoEndpoint的handler為Http11Protocol.Http11ConnectionHandler。

    以url為http://127.0.0.1:8080/為例,執行緒棧的資訊如下:

    1.JIoEndpoint在start時會先建立Executor執行緒池用來執行SocketProcessor處理接收的請求,然後會建立一個接收請求執行緒組。

JIoEndpoint.startInternal程式碼
  1. public void startInternal() throws Exception {  // Starts the endpoint: ensures an executor exists, then starts the acceptor threads and the async-timeout thread.
  2.   
  3.         if (!running) {  // Does nothing when the endpoint is already running.
  4.             running = true;  
  5.             paused = false;  
  6.   
  7.             // Create worker collection  
  8.             if (getExecutor() == null) {  // An executor is built here only when getExecutor() returns none.
  9.                 createExecutor();  
  10.             }  
  11.   
  12.             initializeConnectionLatch();  
  13.   
  14.             startAcceptorThreads();  
  15.   
  16.             // Start async timeout thread  
  17.             Thread timeoutThread = new Thread(new AsyncTimeout(), getName() + "-AsyncTimeout");  // Thread name is "<endpoint name>-AsyncTimeout".
  18.             timeoutThread.setPriority(threadPriority);  
  19.             timeoutThread.setDaemon(true);  // Daemon thread, so it does not keep the JVM alive on its own.
  20.             timeoutThread.start();  
  21.         }  
  22.     }  

  

AbstractEndpoint.createExecutor程式碼
  1. public void createExecutor() {  // Builds the worker thread pool and stores it in the executor field.
  2.        internalExecutor = true;  // Records that the executor was created by this method.
  3.        TaskQueue taskqueue = new TaskQueue();  
  4.        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());  // Worker thread names are prefixed with "<endpoint name>-exec-".
  5.        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);  // NOTE(review): arguments presumably follow the java.util.concurrent order (core size, max size, 60 s keep-alive, queue, factory) - confirm against the ThreadPoolExecutor in use.
  6.        taskqueue.setParent( (ThreadPoolExecutor) executor);  // Gives the queue a reference back to the pool it feeds.
  7.    }  

 

AbstractEndpoint.startAcceptorThreads程式碼
  1. protected final void startAcceptorThreads() {  // Creates getAcceptorThreadCount() acceptors and starts one thread for each of them.
  2.        int count = getAcceptorThreadCount();  
  3.        acceptors = new Acceptor[count];  
  4.   
  5.        for (int i = 0; i < count; i++) {  
  6.            acceptors[i] = createAcceptor();  
  7.            String threadName = getName() + "-Acceptor-" + i;  // Thread name is "<endpoint name>-Acceptor-<index>".
  8.            acceptors[i].setThreadName(threadName);  
  9.            Thread t = new Thread(acceptors[i], threadName);  // The acceptor itself is the thread's Runnable.
  10.            t.setPriority(getAcceptorThreadPriority());  
  11.            t.setDaemon(getDaemon());  
  12.            t.start();  
  13.        }  
  14.    }  

     2.Acceptor接收到請求的socket後,呼叫processSocket將其包裝成SocketWrapper,並交由執行緒池執行SocketProcessor。

JIoEndpoint.Acceptor.run程式碼
  1. public void run() {  // Accept loop: while the endpoint is running, accepts sockets and hands each one to processSocket().
  2.   
  3.             int errorDelay = 0;  // Delay value passed to and returned by handleExceptionWithDelay() after a failed accept.
  4.   
  5.             // Loop until we receive a shutdown command  
  6.             while (running) {  
  7.   
  8.                 // Loop if endpoint is paused  
  9.                 while (paused && running) {  
  10.                     state = AcceptorState.PAUSED;  
  11.                     try {  
  12.                         Thread.sleep(50);  // Re-check the paused/running flags every 50 ms.
  13.                     } catch (InterruptedException e) {  
  14.                         // Ignore  
  15.                     }  
  16.                 }  
  17.   
  18.                 if (!running) {  // The endpoint may have been stopped while this thread was waiting in the pause loop.
  19.                     break;  
  20.                 }  
  21.                 state = AcceptorState.RUNNING;  
  22.   
  23.                 try {  
  24.                     //if we have reached max connections, wait  
  25.                     countUpOrAwaitConnection();  
  26.   
  27.                     Socket socket = null;  
  28.                     try {  
  29.                         // Accept the next incoming connection from the server  
  30.                         // socket  
  31.                         socket = serverSocketFactory.acceptSocket(serverSocket);  
  32.                     } catch (IOException ioe) {  
  33.                         countDownConnection();  // No socket was accepted, so undo the count taken by countUpOrAwaitConnection().
  34.                         // Introduce delay if necessary  
  35.                         errorDelay = handleExceptionWithDelay(errorDelay);  
  36.                         // re-throw  
  37.                         throw ioe;  // Caught and logged by the outer IOException handler below.
  38.                     }  
  39.                     // Successful accept, reset the error delay  
  40.                     errorDelay = 0;  
  41.   
  42.                     // Configure the socket  
  43.                     if (running && !paused && setSocketOptions(socket)) {  // Hand off only if still running, not paused, and the socket options were applied.
  44.                         // Hand this socket off to an appropriate processor  
  45.                         if (!processSocket(socket)) {  // false means the socket was not submitted for processing.
  46.                             countDownConnection();  
  47.                             // Close socket right away  
  48.                             closeSocket(socket);  
  49.                         }  
  50.                     } else {  
  51.                         countDownConnection();  
  52.                         // Close socket right away  
  53.                         closeSocket(socket);  
  54.                     }  
  55.                 } catch (IOException x) {  
  56.                     if (running) {  // Accept failures are logged only while the endpoint is running.
  57.                         log.error(sm.getString("endpoint.accept.fail"), x);  
  58.                     }  
  59.                 } catch (NullPointerException npe) {  
  60.                     if (running) {  
  61.                         log.error(sm.getString("endpoint.accept.fail"), npe);  
  62.                     }  
  63.                 } catch (Throwable t) {  
  64.                     ExceptionUtils.handleThrowable(t);  
  65.                     log.error(sm.getString("endpoint.accept.fail"), t);  // Unlike the two handlers above, this one logs regardless of the running flag.
  66.                 }  
  67.             }  
  68.             state = AcceptorState.ENDED;  // Reached once the running flag has been cleared.
  69.         }  

 

JIoEndpoint.processSocket程式碼
  1. protected boolean processSocket(Socket socket) {  // Wraps the socket and submits a SocketProcessor to the executor; returns true only if the task was submitted.
  2.         // Process the request from this socket  
  3.         try {  
  4.             SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);  
  5.             wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());  // Keep-alive budget starts at getMaxKeepAliveRequests().
  6.             wrapper.setSecure(isSSLEnabled());  
  7.             // During shutdown, executor may be null - avoid NPE  
  8.             if (!running) {  
  9.                 return false;  
  10.             }  
  11.             getExecutor().execute(new SocketProcessor(wrapper));  // The SocketProcessor runs on an executor thread, not on the calling thread.
  12.         } catch (RejectedExecutionException x) {  
  13.             log.warn("Socket processing request was rejected for:" + socket, x);  
  14.             return false;  
  15.         } catch (Throwable t) {  
  16.             ExceptionUtils.handleThrowable(t);  
  17.             // This means we got an OOM or similar creating a thread, or that  
  18.             // the pool and its queue are full  
  19.             log.error(sm.getString("endpoint.process.fail"), t);  
  20.             return false;  
  21.         }  
  22.         return true;  
  23.     }  

    3.SocketProcessor會呼叫handler的process。

SocketProcessor.run程式碼
  1. public void run() {  // Runs the handshake, passes the wrapped socket to handler.process(), then closes, re-submits or parks the socket according to the returned SocketState.
  2.             boolean launch = false;  // Set to true when the socket must be re-submitted to the executor for another read.
  3.             synchronized (socket) {  // The whole processing step holds the monitor of the socket wrapper.
  4.                 try {  
  5.                     SocketState state = SocketState.OPEN;  
  6.   
  7.                     try {  
  8.                         // SSL handshake  
  9.                         serverSocketFactory.handshake(socket.getSocket());  
  10.                     } catch (Throwable t) {  
  11.                         ExceptionUtils.handleThrowable(t);  
  12.                         if (log.isDebugEnabled()) {  
  13.                             log.debug(sm.getString("endpoint.err.handshake"), t);  
  14.                         }  
  15.                         // Tell to close the socket  
  16.                         state = SocketState.CLOSED;  // A failed handshake skips handler.process() and goes straight to the close branch.
  17.                     }  
  18.   
  19.                     if ((state != SocketState.CLOSED)) {  
  20.                         if (status == null) {  // No explicit status was given: process as OPEN_READ.
  21.                             state = handler.process(socket, SocketStatus.OPEN_READ);  
  22.                         } else {  
  23.                             state = handler.process(socket, status);  
  24.                         }  
  25.                     }  
  26.                     if (state == SocketState.CLOSED) {  
  27.                         // Close socket  
  28.                         if (log.isTraceEnabled()) {  
  29.                             log.trace("Closing socket:" + socket);  
  30.                         }  
  31.                         countDownConnection();  
  32.                         try {  
  33.                             socket.getSocket().close();  
  34.                         } catch (IOException e) {  
  35.                             // Ignore  
  36.                         }  
  37.                     } else if (state == SocketState.OPEN || state == SocketState.UPGRADING  
  38.                             || state == SocketState.UPGRADING_TOMCAT || state == SocketState.UPGRADED) {  
  39.                         socket.setKeptAlive(true);  
  40.                         socket.access();  
  41.                         launch = true;  // The re-submission itself happens in the finally block below.
  42.                     } else if (state == SocketState.LONG) {  
  43.                         socket.access();  
  44.                         waitingRequests.add(socket);  // The socket is parked in waitingRequests instead of being re-submitted.
  45.                     }  
  46.                 } finally {  
  47.                     if (launch) {  
  48.                         try {  
  49.                             getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));  // A new SocketProcessor is queued for the same socket wrapper.
  50.                         } catch (RejectedExecutionException x) {  
  51.                             log.warn("Socket reprocessing request was rejected for:" + socket, x);  
  52.                             try {  
  53.                                 //unable to handle connection at this time  
  54.                                 handler.process(socket, SocketStatus.DISCONNECT);  
  55.                             } finally {  
  56.                                 countDownConnection();  // Runs even if the DISCONNECT call throws.
  57.                             }  
  58.   
  59.                         } catch (NullPointerException npe) {  
  60.                             if (running) {  // The NullPointerException is logged only while the endpoint is running.
  61.                                 log.error(sm.getString("endpoint.launch.fail"), npe);  
  62.                             }  
  63.                         }  
  64.                     }  
  65.                 }  
  66.             }  
  67.             socket = null;  // Clears this processor's reference to the socket wrapper.
  68.             // Finish up this request  
  69.         }  
  70.     }  

 4.Http11Protocol.Http11ConnectionHandler又會呼叫Processor的process方法

AbstractProtocol.process程式碼
  1. public SocketState process(SocketWrapper<S> wrapper,  
  2.                 SocketStatus status) {  
  3.             if (wrapper == null) {  
  4.                 // Nothing to do. Socket has been closed.  
  5.                 return SocketState.CLOSED;  
  6.             }  
  7.   
  8.             S socket = wrapper.getSocket();  
  9.             if (socket == null) {  
  10.                 // Nothing to do. Socket has been closed.  
  11.                 return SocketState.CLOSED;  
  12.             }  
  13.   
  14.             Processor<S> processor = connections.get(socket);  
  15.             if (status == SocketStatus.DISCONNECT && processor == null) {  
  16.                 // Nothing to do. Endpoint requested a close and there is no  
  17.                 // longer a processor associated with this socket.  
  18.                 return SocketState.CLOSED;  
  19.             }  
  20.   
  21.             wrapper.setAsync(false);  
  22.             ContainerThreadMarker.markAsContainerThread();  
  23.   
  24.             try {  
  25.                 if (processor == null) {  
  26.                     processor = recycledProcessors.poll();  
  27.                 }  
  28.                 if (processor == null) {  
  29.                     processor = createProcessor();  
  30.                 }  
  31.   
  32.                 initSsl(wrapper, processor);  
  33.   
  34.                 SocketState state = SocketState.CLOSED;  
  35.                 do {  
  36.                     if (status == SocketStatus.DISCONNECT &&  
  37.                             !processor.isComet()) {  
  38.                         // Do nothing here, just wait for it to get recycled  
  39.                         // Don't do this for Comet we need to generate an end  
  40.                         // event (see BZ 54022)  
  41.                     } else if (processor.isAsync() || state == SocketState.ASYNC_END) {  
  42.                         state = processor.asyncDispatch(status);  
  43.                         if (state == SocketState.OPEN) {  
  44.                             // release() won't get called so in case this request  
  45.                             // takes a long time to process, remove the socket from  
  46.                             // the waiting requests now else the async timeout will  
  47.                             // fire  
  48.                             getProtocol().endpoint.removeWaitingRequest(wrapper);  
  49.                             // There may be pipe-lined data to read. If the data  
  50.                             // isn't processed now, execution will exit this  
  51.                             // loop and call release() which will recycle the  
  52.                             // processor (and input buffer) deleting any  
  53.                             // pipe-lined data. To avoid this, process it now.  
  54.                             state = processor.process(wrapper);  
  55.                         }  
  56.                     } else if (processor.isComet()) {  
  57.                         state = processor.event(status);  
  58.                     } else if (processor.getUpgradeInbound() != null) {  
  59.                         state = processor.upgradeDispatch();  
  60.                     } else if (processor.isUpgrade()) {  
  61.                         state = processor.upgradeDispatch(status);  
  62.                     } else {  
  63.                         state = processor.process(wrapper);  
  64.                     }  
  65.   
  66.                     if (state != SocketState.CLOSED && processor.isAsync()) {  
  67.                         state = processor.asyncPostProcess();  
  68.                     }  
  69.   
  70.                     if (state == SocketState.UPGRADING) {  
  71.                         // Get the HTTP upgrade handler  
  72.                         HttpUpgradeHandler httpUpgradeHandler =  
  73.                                 processor.getHttpUpgradeHandler();  
  74.                         // Release the Http11 processor to be re-used  
  75.                         release(wrapper, processor, false, false);  
  76.                         // Create the upgrade processor  
  77.                         processor = createUpgradeProcessor(  
  78.                                 wrapper, httpUpgradeHandler);  
  79.                         // Mark the connection as upgraded  
  80.                         wrapper.setUpgraded(true);  
  81.                         // Associate with the processor with the connection  
  82.                         connections.put(socket, processor);  
  83.                         // Initialise the upgrade handler (which may trigger  
  84.                         // some IO using the new protocol which is why the lines  
  85.                         // above are necessary)  
  86.                         // This cast should be safe. If it fails the error  
  87.                         // handling for the surrounding try/catch will deal with  
  88.                         // it.  
  89.                         httpUpgradeHandler.init((WebConnection) processor);  
  90.                     } else if (state == SocketState.UPGRADING_TOMCAT) {  
  91.                         // Get the UpgradeInbound handler  
  92.                         org.apache.coyote.http11.upgrade.UpgradeInbound inbound =  
  93.                                 processor.getUpgradeInbound();  
  94.                         // Release the Http11 processor to be re-used  
  95.                         release(wrapper, processor, false, false);  
  96.                         // Create the light-weight upgrade processor  
  97.                         processor = createUpgradeProcessor(wrapper, inbound);  
  98.                         inbound.onUpgradeComplete();  
  99.                     }  
  100.                     if (getLog().isDebugEnabled()) {  
  101.                         getLog().debug("Socket: [" + wrapper +  
  102.                                 "], Status in: [" + status +  
  103.                                 "], State out: [" + state + "]");  
  104.                     }  
  105.                 } while (state == SocketState.ASYNC_END ||  
  106.                         state == SocketState.UPGRADING ||  
  107.                         state == SocketState.UPGRADING_TOMCAT);  
  108.   
  109.                 if (state == SocketState.LONG) {  
  110.                     // In the middle of processing a request/response. Keep the  
  111.                     // socket associated with the processor. Exact requirements  
  112.                     // depend on type of long poll  
  113.                     connections.put(socket, processor);  
  114.                     longPoll(wrapper, processor);  
  115.                 } else if (state == SocketState.OPEN) {  
  116.                     // In keep-alive but between requests. OK to recycle  
  117.                     // processor. Continue to poll for the next request.  
  118.                     connections.remove(socket);  
  119.                     release(wrapper, processor, false, true);  
  120.                 } else if (state == SocketState.SENDFILE) {  
  121.                     // Sendfile in progress. If it fails, the socket will be  
  122.                     // closed. If it works, the socket will be re-added to the  
  123.                     // poller  
  124.                     connections.remove(socket);  
  125.                     release(wrapper, processor, false, false);  
  126.                 } else if (state == SocketState.UPGRADED) {  
  127.                     // Need to keep the connection associated with the processor  
  128.                     connections.put(socket, processor);  
  129.                     // Don't add sockets back to the poller if this was a  
  130.                     // non-blocking write otherwise the poller may trigger  
  131.                     // multiple read events which may lead to thread starvation  
  132.   &nb