tomcat原始碼淺析(五)之請求的完整過程(http1.1)
阿新 • • 發佈:2019-01-05
http1.1的Connector的protocolHandler為org.apache.coyote.http11.Http11Protocol。Http11Protocol的endpoint為JIoEndpoint。JIoEndpoint的handler為Http11Protocol.Http11ConnectionHandler。
以url為http://127.0.0.1:8080/為例執行緒棧的資訊如下:
1.JIoEndpoint在start時會先建立Executor執行緒池用來執行SocketProcessor處理接收的請求,然後會建立一個接收請求執行緒組。
- public void startInternal() throws Exception { // Starts the endpoint; does nothing when running is already true
- if (!running) {
- running = true;
- paused = false;
- // Create the worker executor only when getExecutor() returns null
- if (getExecutor() == null) {
- createExecutor();
- }
- initializeConnectionLatch();
- startAcceptorThreads();
- // Start async timeout thread: a daemon thread named "<getName()>-AsyncTimeout" running an AsyncTimeout task
- Thread timeoutThread = new Thread(new AsyncTimeout(), getName() + "-AsyncTimeout");
- timeoutThread.setPriority(threadPriority);
- timeoutThread.setDaemon(true);
- timeoutThread.start();
- }
- }
AbstractEndpoint.createExecutor程式碼
- public void createExecutor() { // Builds a thread pool and stores it in the executor field
- internalExecutor = true; // NOTE(review): presumably marks the pool as created by the endpoint itself - confirm where this flag is read
- TaskQueue taskqueue = new TaskQueue();
- TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); // thread name prefix is "<getName()>-exec-"
- executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); // presumably 60 s is the idle keep-alive time - confirm against the ThreadPoolExecutor constructor
- taskqueue.setParent( (ThreadPoolExecutor) executor); // hands the queue a reference to the pool that owns it
- }
AbstractEndpoint.startAcceptorThreads程式碼
- protected final void startAcceptorThreads() { // Creates getAcceptorThreadCount() acceptors and starts one thread per acceptor
- int count = getAcceptorThreadCount();
- acceptors = new Acceptor[count]; // replaces the acceptors field with a fresh array
- for (int i = 0; i < count; i++) {
- acceptors[i] = createAcceptor();
- String threadName = getName() + "-Acceptor-" + i; // e.g. "<getName()>-Acceptor-0"
- acceptors[i].setThreadName(threadName); // the acceptor is told the same name its thread carries
- Thread t = new Thread(acceptors[i], threadName);
- t.setPriority(getAcceptorThreadPriority());
- t.setDaemon(getDaemon());
- t.start();
- }
- }
2.Acceptor接收到請求的socket後,呼叫processSocket將其包裝成SocketWrapper,並交由執行緒池執行SocketProcessor。
Jioendpoint.acceptor.run程式碼- public void run() { // Accept loop: repeats while running is true, then sets state to ENDED
- int errorDelay = 0;
- // Loop until we receive a shutdown command
- while (running) {
- // Loop if endpoint is paused, re-checking the flags after each 50 ms sleep
- while (paused && running) {
- state = AcceptorState.PAUSED;
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- // Ignore: the enclosing while re-tests paused/running on its next pass
- }
- }
- if (!running) {
- break;
- }
- state = AcceptorState.RUNNING;
- try {
- // If we have reached max connections, wait; every failure path below calls countDownConnection() after this
- countUpOrAwaitConnection();
- Socket socket = null;
- try {
- // Accept the next incoming connection from the server
- // socket
- socket = serverSocketFactory.acceptSocket(serverSocket);
- } catch (IOException ioe) {
- countDownConnection();
- // Introduce delay if necessary
- errorDelay = handleExceptionWithDelay(errorDelay);
- // re-throw so the outer catch (IOException) below handles the logging
- throw ioe;
- }
- // Successful accept, reset the error delay
- errorDelay = 0;
- // Configure the socket; hand it off only if still running, not paused, and setSocketOptions returned true
- if (running && !paused && setSocketOptions(socket)) {
- // Hand this socket off to an appropriate processor
- if (!processSocket(socket)) {
- countDownConnection();
- // Close socket right away
- closeSocket(socket);
- }
- } else {
- countDownConnection();
- // Close socket right away
- closeSocket(socket);
- }
- } catch (IOException x) {
- if (running) { // accept failures are only logged while the endpoint is running
- log.error(sm.getString("endpoint.accept.fail"), x);
- }
- } catch (NullPointerException npe) {
- if (running) {
- log.error(sm.getString("endpoint.accept.fail"), npe);
- }
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- log.error(sm.getString("endpoint.accept.fail"), t); // logged regardless of running, unlike the two catches above
- }
- }
- state = AcceptorState.ENDED;
- }
JIoEndpoint.processSocket程式碼
- protected boolean processSocket(Socket socket) { // Wraps the socket and submits a SocketProcessor to the executor; returns true only if the submit did not throw
- // Process the request from this socket
- try {
- SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
- wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
- wrapper.setSecure(isSSLEnabled());
- // During shutdown, executor may be null - avoid NPE
- if (!running) {
- return false; // nothing was submitted
- }
- getExecutor().execute(new SocketProcessor(wrapper));
- } catch (RejectedExecutionException x) {
- log.warn("Socket processing request was rejected for:" + socket, x);
- return false;
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- // This means we got an OOM or similar creating a thread, or that
- // the pool and its queue are full
- log.error(sm.getString("endpoint.process.fail"), t);
- return false;
- }
- return true;
- }
3.SocketProcessor會呼叫handler的process。
Socketprocessor.run程式碼- public void run() { // Handshake, delegate to handler.process, then act on the SocketState it returns
- boolean launch = false; // set to true when the socket must be resubmitted to the executor in the finally block
- synchronized (socket) { // socket is the wrapper object; socket.getSocket() yields the underlying connection
- try {
- SocketState state = SocketState.OPEN;
- try {
- // SSL handshake
- serverSocketFactory.handshake(socket.getSocket());
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("endpoint.err.handshake"), t);
- }
- // Tell to close the socket
- state = SocketState.CLOSED;
- }
- if ((state != SocketState.CLOSED)) {
- if (status == null) { // no explicit status: process as OPEN_READ
- state = handler.process(socket, SocketStatus.OPEN_READ);
- } else {
- state = handler.process(socket, status);
- }
- }
- if (state == SocketState.CLOSED) {
- // Close socket
- if (log.isTraceEnabled()) {
- log.trace("Closing socket:" + socket);
- }
- countDownConnection();
- try {
- socket.getSocket().close();
- } catch (IOException e) {
- // Ignore: a failure while closing is not reported
- }
- } else if (state == SocketState.OPEN || state == SocketState.UPGRADING
- || state == SocketState.UPGRADING_TOMCAT || state == SocketState.UPGRADED) {
- socket.setKeptAlive(true);
- socket.access();
- launch = true;
- } else if (state == SocketState.LONG) {
- socket.access();
- waitingRequests.add(socket); // added to waitingRequests instead of being resubmitted
- }
- } finally {
- if (launch) {
- try {
- getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); // new task for the same wrapper
- } catch (RejectedExecutionException x) {
- log.warn("Socket reprocessing request was rejected for:" + socket, x);
- try {
- // Unable to handle connection at this time: pass DISCONNECT to the handler
- handler.process(socket, SocketStatus.DISCONNECT);
- } finally {
- countDownConnection();
- }
- } catch (NullPointerException npe) {
- if (running) { // only logged while the endpoint is running
- log.error(sm.getString("endpoint.launch.fail"), npe);
- }
- }
- }
- }
- }
- socket = null;
- // Finish up this request
- }
- }
4.Http11Protocol.Http11ConnectionHandler又會呼叫Processor的process方法
AbstractProtocol.process程式碼- public SocketState process(SocketWrapper<S> wrapper,
- SocketStatus status) {
- if (wrapper == null) {
- // Nothing to do. Socket has been closed.
- return SocketState.CLOSED;
- }
- S socket = wrapper.getSocket();
- if (socket == null) {
- // Nothing to do. Socket has been closed.
- return SocketState.CLOSED;
- }
- Processor<S> processor = connections.get(socket);
- if (status == SocketStatus.DISCONNECT && processor == null) {
- // Nothing to do. Endpoint requested a close and there is no
- // longer a processor associated with this socket.
- return SocketState.CLOSED;
- }
- wrapper.setAsync(false);
- ContainerThreadMarker.markAsContainerThread();
- try {
- if (processor == null) {
- processor = recycledProcessors.poll();
- }
- if (processor == null) {
- processor = createProcessor();
- }
- initSsl(wrapper, processor);
- SocketState state = SocketState.CLOSED;
- do {
- if (status == SocketStatus.DISCONNECT &&
- !processor.isComet()) {
- // Do nothing here, just wait for it to get recycled
- // Don't do this for Comet we need to generate an end
- // event (see BZ 54022)
- } else if (processor.isAsync() || state == SocketState.ASYNC_END) {
- state = processor.asyncDispatch(status);
- if (state == SocketState.OPEN) {
- // release() won't get called so in case this request
- // takes a long time to process, remove the socket from
- // the waiting requests now else the async timeout will
- // fire
- getProtocol().endpoint.removeWaitingRequest(wrapper);
- // There may be pipe-lined data to read. If the data
- // isn't processed now, execution will exit this
- // loop and call release() which will recycle the
- // processor (and input buffer) deleting any
- // pipe-lined data. To avoid this, process it now.
- state = processor.process(wrapper);
- }
- } else if (processor.isComet()) {
- state = processor.event(status);
- } else if (processor.getUpgradeInbound() != null) {
- state = processor.upgradeDispatch();
- } else if (processor.isUpgrade()) {
- state = processor.upgradeDispatch(status);
- } else {
- state = processor.process(wrapper);
- }
- if (state != SocketState.CLOSED && processor.isAsync()) {
- state = processor.asyncPostProcess();
- }
- if (state == SocketState.UPGRADING) {
- // Get the HTTP upgrade handler
- HttpUpgradeHandler httpUpgradeHandler =
- processor.getHttpUpgradeHandler();
- // Release the Http11 processor to be re-used
- release(wrapper, processor, false, false);
- // Create the upgrade processor
- processor = createUpgradeProcessor(
- wrapper, httpUpgradeHandler);
- // Mark the connection as upgraded
- wrapper.setUpgraded(true);
- // Associate with the processor with the connection
- connections.put(socket, processor);
- // Initialise the upgrade handler (which may trigger
- // some IO using the new protocol which is why the lines
- // above are necessary)
- // This cast should be safe. If it fails the error
- // handling for the surrounding try/catch will deal with
- // it.
- httpUpgradeHandler.init((WebConnection) processor);
- } else if (state == SocketState.UPGRADING_TOMCAT) {
- // Get the UpgradeInbound handler
- org.apache.coyote.http11.upgrade.UpgradeInbound inbound =
- processor.getUpgradeInbound();
- // Release the Http11 processor to be re-used
- release(wrapper, processor, false, false);
- // Create the light-weight upgrade processor
- processor = createUpgradeProcessor(wrapper, inbound);
- inbound.onUpgradeComplete();
- }
- if (getLog().isDebugEnabled()) {
- getLog().debug("Socket: [" + wrapper +
- "], Status in: [" + status +
- "], State out: [" + state + "]");
- }
- } while (state == SocketState.ASYNC_END ||
- state == SocketState.UPGRADING ||
- state == SocketState.UPGRADING_TOMCAT);
- if (state == SocketState.LONG) {
- // In the middle of processing a request/response. Keep the
- // socket associated with the processor. Exact requirements
- // depend on type of long poll
- connections.put(socket, processor);
- longPoll(wrapper, processor);
- } else if (state == SocketState.OPEN) {
- // In keep-alive but between requests. OK to recycle
- // processor. Continue to poll for the next request.
- connections.remove(socket);
- release(wrapper, processor, false, true);
- } else if (state == SocketState.SENDFILE) {
- // Sendfile in progress. If it fails, the socket will be
- // closed. If it works, the socket will be re-added to the
- // poller
- connections.remove(socket);
- release(wrapper, processor, false, false);
- } else if (state == SocketState.UPGRADED) {
- // Need to keep the connection associated with the processor
- connections.put(socket, processor);
- // Don't add sockets back to the poller if this was a
- // non-blocking write otherwise the poller may trigger
- // multiple read events which may lead to thread starvation
- &nb