1. 程式人生 > >tomcat的NIO執行緒模型原始碼分析

tomcat的NIO執行緒模型原始碼分析

1 tomcat8的併發引數控制

這種問題其實到官方文件上檢視一番就可以知道,tomcat很早的版本還是使用的BIO,之後就支援NIO了,具體版本我也不記得了,有興趣的自己可以去查下。本篇的tomcat版本是tomcat8.5。可以到這裡看下tomcat8.5的配置引數

我們先來簡單回顧下目前一般的NIO伺服器端的大致實現,借鑑infoq上的一篇文章Netty系列之Netty執行緒模型中的一張圖

一般NIO執行緒模型

  • 一個或多個Acceptor執行緒,每個執行緒都有自己的Selector,Acceptor只負責accept新的連線,一旦連線建立之後就將連線註冊到其他Worker執行緒中

  • 多個Worker執行緒,有時候也叫IO執行緒,就是專門負責IO讀寫的。一種實現方式就是像Netty一樣,每個Worker執行緒都有自己的Selector,可以負責多個連線的IO讀寫事件,每個連線歸屬於某個執行緒。另一種方式實現方式就是有專門的執行緒負責IO事件監聽,這些執行緒有自己的Selector,一旦監聽到有IO讀寫事件,並不是像第一種實現方式那樣(自己去執行IO操作),而是將IO操作封裝成一個Runnable交給Worker執行緒池來執行,這種情況每個連線可能會被多個執行緒同時操作,相比第一種併發性提高了,但是也可能引來多執行緒問題,在處理上要更加謹慎些。tomcat的NIO模型就是第二種。

所以一般引數就是Acceptor執行緒個數,Worker執行緒個數。來具體看下引數

1.1 acceptCount

文件描述為:

The maximum queue length for incoming connection requests when all possible request processing threads are in use. Any requests received when the queue is full will be refused. The default value is 100.

這個引數就立馬牽涉出一塊大內容:TCP三次握手的詳細過程,這個之後再詳細探討。這裡可以簡單理解為:連線在被ServerSocketChannel accept之前就暫存在這個佇列中,acceptCount就是這個佇列的最大長度。ServerSocketChannel accept就是從這個佇列中不斷取出已經建立連線的的請求。所以當ServerSocketChannel accept取出不及時就有可能造成該佇列積壓,一旦滿了連線就被拒絕了

1.2 acceptorThreadCount

文件如下描述

The number of threads to be used to accept connections. Increase this value on a multi CPU machine, although you would never really need more than 2. Also, with a lot of non keep alive connections, you might want to increase this value as well. Default value is 1.

Acceptor執行緒只負責從上述佇列中取出已經建立連線的請求。在啟動的時候使用一個ServerSocketChannel監聽一個連線埠如8080,可以有多個Acceptor執行緒併發不斷呼叫上述ServerSocketChannel的accept方法來獲取新的連線。引數acceptorThreadCount其實使用的Acceptor執行緒的個數。

1.3 maxConnections

文件描述如下

The maximum number of connections that the server will accept and process at any given time. When this number has been reached, the server will accept, but not process, one further connection. This additional connection be blocked until the number of connections being processed falls below maxConnections at which point the server will start accepting and processing new connections again. Note that once the limit has been reached, the operating system may still accept connections based on the acceptCount setting. The default value varies by connector type. For NIO and NIO2 the default is 10000. For APR/native, the default is 8192.

Note that for APR/native on Windows, the configured value will be reduced to the highest multiple of 1024 that is less than or equal to maxConnections. This is done for performance reasons. If set to a value of -1, the maxConnections feature is disabled and connections are not counted.

這裡就是tomcat對於連線數的一個控制,即最大連線數限制。一旦發現當前連線數已經超過了一定的數量(NIO預設是10000),上述的Acceptor執行緒就被阻塞了,即不再執行ServerSocketChannel的accept方法從佇列中獲取已經建立的連線。但是它並不阻止新的連線的建立,新的連線的建立過程不是Acceptor控制的,Acceptor僅僅是從佇列中獲取新建立的連線。所以當連線數已經超過maxConnections後,仍然是可以建立新的連線的,存放在上述acceptCount大小的佇列中,這個佇列裡面的連線沒有被Acceptor獲取,就處於連線建立了但是不被處理的狀態。當連線數低於maxConnections之後,Acceptor執行緒就不再阻塞,繼續呼叫ServerSocketChannel的accept方法從acceptCount大小的佇列中繼續獲取新的連線,之後就開始處理這些新的連線的IO事件了

1.4 maxThreads

文件描述如下

The maximum number of request processing threads to be created by this Connector, which therefore determines the maximum number of simultaneous requests that can be handled. If not specified, this attribute is set to 200. If an executor is associated with this connector, this attribute is ignored as the connector will execute tasks using the executor rather than an internal thread pool.

這個簡單理解就算是上述worker的執行緒數,下面會詳細的說明。他們專門用於處理IO事件,預設是200。

2 tomcat的NioEndpoint

上面引數僅僅是簡單瞭解了下引數配置,下面我們就來詳細研究下tomcat的NIO伺服器具體情況,這就要詳細瞭解下tomcat的NioEndpoint實現了

輸入圖片說明

這張圖勾畫出了NioEndpoint的大致執行流程圖,worker執行緒並沒有體現出來,它是作為一個執行緒池不斷的執行IO讀寫事件即SocketProcessor(一個Runnable),即這裡的Poller僅僅監聽Socket的IO事件,然後封裝成一個個的SocketProcessor交給worker執行緒池來處理。下面我們來詳細的介紹下NioEndpoint中的Acceptor、Poller、SocketProcessor

2.1 Acceptor

2.1.1 初始化過程

獲取指定的Acceptor數量的執行緒

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

2.1.2 Acceptor的run方法

protected classAcceptorextendsAbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    //we didn't get a socket
                    countDownConnection();
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // setSocketOptions() will add channel to the poller
                // if successful
                if (running && !paused) {
                    if (!setSocketOptions(socket)) {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } else {
                    countDownConnection();
                    closeSocket(socket);
                }
            } catch (SocketTimeoutException sx) {
                // Ignore: Normal condition
            } catch (IOException x) {
                if (running) {
                    log.error(sm.getString("endpoint.accept.fail"), x);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

可以看到就是一個while迴圈,迴圈裡面不斷的accept新的連線。

2.1.3 countUpOrAwaitConnection

先來看下在accept新的連線之前,首選進行連線數的自增,即countUpOrAwaitConnection

protected voidcountUpOrAwaitConnection()throws InterruptedException {
    if (maxConnections==-1) return;
    LimitLatch latch = connectionLimitLatch;
    if (latch!=null) latch.countUpOrAwait();
}

當我們設定maxConnections=-1的時候就表示不用限制最大連線數。預設是限制10000,如果不限制則一旦出現大的衝擊,則tomcat很有可能直接掛掉,導致服務停止。

這裡的需求就是當前連線數一旦超過最大連線數maxConnections,就直接阻塞了,一旦當前連線數小於最大連線數maxConnections,就不再阻塞,我們來看下這個功能的具體實現latch.countUpOrAwait()

具體看這個需求無非就是一個共享鎖,來看具體實現:

LimitLatch實現

目前實現裡算是使用了2個鎖,LimitLatch本身的AQS實現再加上AtomicLong的AQS實現。也可以不使用AtomicLong來實現。

共享鎖的tryAcquireShared實現中,如果不依託AtomicLong,則需要進行for迴圈加CAS的自增,自增之後沒有超過limit這裡即maxConnections,則直接返回1表示獲取到了共享鎖,如果一旦超過limit則首先進行for迴圈加CAS的自減,然後返回-1表示獲取鎖失敗,便進入加入同步佇列進入阻塞狀態。

共享鎖的tryReleaseShared實現中,該方法可能會被併發執行,所以釋放共享鎖的時候也是需要for迴圈加CAS的自減

上述的for迴圈加CAS的自增、for迴圈加CAS的自減的實現全部被替換成了AtomicLong的incrementAndGet和decrementAndGet而已。

上文我們關注的latch.countUpOrAwait()方法其實就是在獲取一個共享鎖,如下:

/**
 * Acquires a shared latch if one is available or waits for one if no shared
 * latch is current available.
 * @throws InterruptedException If the current thread is interrupted
 */
public void countUpOrAwait() throws InterruptedException {
    if (log.isDebugEnabled()) {
        log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
    }
    sync.acquireSharedInterruptibly(1);
}

2.1.4 連線的處理

從上面可以看到在真正獲取一個連線之前,首先是把連線計數先自增了。一旦TCP三次握手成功連線建立,就能從ServerSocketChannel的accept方法中獲取到新的連線了。一旦獲取連線或者處理過程發生異常則需要將當前連線數自減的,否則會造成連線數虛高,即當前連線數並沒有那麼多,但是當前連線數卻很大,一旦超過最大連線數,就導致其他請求全部阻塞,沒有辦法被ServerSocketChannel的accept處理。該bug在Tomcat7.0.26版本中出現了,詳細見這裡的一篇文章Tomcat7.0.26的連線數控制bug的問題排查

然後我們來看下,一個SocketChannel連線被accept獲取之後如何來處理的呢?

protected booleansetSocketOptions(SocketChannel socket){
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(t);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

處理過程如下:

  • 設定非阻塞,以及其他的一些引數如SoTimeout、ReceiveBufferSize、SendBufferSize

  • 然後將SocketChannel封裝成一個NioChannel,封裝過程使用了快取,即避免了重複建立NioChannel物件,直接利用原有的NioChannel,並將NioChannel中的資料全部清空。也正是這個快取也造成了一次bug,詳見斷網故障時Mtop觸發tomcat高併發場景下的BUG排查和修復(已被apache採納)

  • 選擇一個Poller進行註冊

下面就來詳細介紹下Poller

2.2 Poller

2.2.1 初始化過程

// Start poller threads
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
    pollers[i] = new Poller();
    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
    pollerThread.setPriority(threadPriority);
    pollerThread.setDaemon(true);
    pollerThread.start();
}

前面沒有說到Poller的數量控制,來看下

/** * Poller thread count. */
private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());
publicvoidsetPollerThreadCount(int pollerThreadCount){ this.pollerThreadCount = pollerThreadCount; }
publicintgetPollerThreadCount(){ return pollerThreadCount; }

如果不設定的話最大就是2

2.2.2 Poller註冊SocketChannel

來詳細看下getPoller0().register(channel):

public Poller getPoller0(){
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

就是輪訓一個Poller來進行SocketChannel的註冊

/** * Registers a newly created socket with the poller. * * @param socket The newly created socket */
publicvoidregister(final NioChannel socket){
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    ka.setReadTimeout(getSoTimeout());
    ka.setWriteTimeout(getSoTimeout());
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    addEvent(r);
}

privatevoidaddEvent(PollerEvent event){
    events.offer(event);
    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();

這裡又是進行一些引數包裝,將socket和Poller的關係繫結,再次從快取中取出或者重新構建一個PollerEvent,然後將該event放到Poller的事件佇列中等待被非同步處理

2.2.3 Poller的run方法

在Poller的run方法中不斷處理上述事件佇列中的事件,直接執行PollerEvent的run方法,將SocketChannel註冊到自己的Selector上。

public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    while ( (pe = events.poll()) != null ) {
        result = true;
        try {
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error("",x);
        }
    }

    return result;
}

並將Selector監聽到的IO讀寫事件封裝成SocketProcessor,交給執行緒池執行

SocketProcessor sc = processorCache.pop();
if ( sc == null ) sc = new SocketProcessor(attachment, status);
else sc.reset(attachment, status);
Executor executor = getExecutor();
if (dispatch && executor != null) {
    executor.execute(sc);
} else {
    sc.run();
}

我們來看看這個執行緒池的初始化:

public void createExecutor() {
    internalExecutor = true;
    TaskQueue taskqueue = new TaskQueue();
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}

就是建立了一個ThreadPoolExecutor,那我們就重點關注下核心執行緒數、最大執行緒數、任務佇列等資訊

private int minSpareThreads = 10;
public intgetMinSpareThreads(){
    return Math.min(minSpareThreads,getMaxThreads());
}

核心執行緒數最大是10個,再來看下最大執行緒數

private int maxThreads = 200;

預設就是上面的配置引數maxThreads為200。還有就是TaskQueue,這裡的TaskQueue是LinkedBlockingQueue<Runnable>的子類,最大容量就是Integer.MAX_VALUE,根據之前ThreadPoolExecutor的原始碼分析,核心執行緒數滿了之後,會先將任務放到佇列中,佇列滿了才會創建出新的非核心執行緒,如果佇列是一個大容量的話,也就是不會到建立新的非核心執行緒那一步了。

但是這裡的TaskQueue修改了底層offer的實現

public booleanoffer(Runnable o){
  //we can't do any checks
    if (parent==null) returnsuper.offer(o);
    //we are maxed out on threads, simply queue the object
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) returnsuper.offer(o);
    //we have idle threads, just add it to the queue
    if (parent.getSubmittedCount()<(parent.getPoolSize())) returnsuper.offer(o);
    //if we have less threads than maximum force creation of a new thread
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    //if we reached here, we need to add it to the queue
    returnsuper.offer(o);
}

這裡當執行緒數小於最大執行緒數的時候就直接返回false即入佇列失敗,則迫使ThreadPoolExecutor創建出新的非核心執行緒。

TaskQueue這一塊沒太看懂它的意圖是什麼,有待繼續研究。

3 結束語

本篇文章描述了tomcat8.5中的NIO執行緒模型,以及其中涉及到的相關引數的設定。