1. 程式人生 > >Tomcat原始碼閱讀之閉鎖的實現與連線數量的控制

Tomcat原始碼閱讀之閉鎖的實現與連線數量的控制

嗯,今天其實在看HtttpProcessor的實現,但是突然想到了以前在看poller的時候看到了有閉鎖,用於控制當前connector的連線數量,嗯,那就順便把這部分來看了。。。

在Tomcat中,通過繼承AbstractQueuedSynchronizer來實現了自己的同步工具,進而來實現了一個用於控制連線數量的閉鎖。。LimitLatch。。

這裡就需對AbstractQueuedSynchronizer有一些初步的瞭解。。。

首先它concurrent類庫中提供的一個用於構建自己的同步工具的一個工具類。。可以通過繼承他來快速的完成一個同步類的實現

(1)acquireSharedInterruptibly()方法,用於以共享的方式來獲取鎖,如果暫時無法獲取,將會將執行緒掛起到佇列,進行阻塞,對於這個方法是否最終能獲取鎖,是通過tryAcquireShared()方法的返回來定義的,這個方法需要自己實現。。。如果能獲取鎖,那麼返回1,否則返回-1.。。

(2)releaseShared()方法。以共享的方法釋放一個鎖,這樣前面提到的掛起的執行緒將會喚醒,進而重新嘗試獲取鎖。。。

好啦,接下來就來看看LimitLatch的定義吧,直接上程式碼好了,。,。程式碼還是很簡單的。。

//其實是通過AbstractQueuedSynchronizer來構建的
public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);

    //構建Sync型別,實現基本的同步,以及阻塞。。
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }

        @Override
        //用於增加計數,如果計數增加之後小於最大的,那麼返回1,不會阻塞,否則將會返回-1阻塞
        protected int tryAcquireShared(int ignored) {  //呼叫acquiredShared方法的時候會呼叫這個方法來返回狀態,如果返回1,那麼表示獲取成功,返回-1表示獲取失敗,將會阻塞
            long newCount = count.incrementAndGet();  //先增加計數
            if (!released && newCount > limit) {  //如果當前已經超過了最大的限制
                // Limit exceeded
                count.decrementAndGet();  //減少計數
                return -1;  //返回-1,將阻塞當前執行緒
            } else {
                return 1;
            }
        }

        @Override
        //用於減少計數
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;  //同步物件
    private final AtomicLong count;  //計數器
    private volatile long limit;  //最大的數量
    private volatile boolean released = false;   //是否全部釋放

    /**
     * Instantiates a LimitLatch object with an initial limit.
     * @param limit - maximum number of concurrent acquisitions of this latch
     */
    public LimitLatch(long limit) {
        this.limit = limit;  //最大限制
        this.count = new AtomicLong(0);
        this.sync = new Sync();  //sync 物件
    }

    /**
     * Returns the current count for the latch
     * @return the current count for latch
     */
    public long getCount() {
        return count.get();
    }

    /**
     * Obtain the current limit.
     */
    public long getLimit() {
        return limit;
    }


    /**
     * Sets a new limit. If the limit is decreased there may be a period where
     * more shares of the latch are acquired than the limit. In this case no
     * more shares of the latch will be issued until sufficient shares have been
     * returned to reduce the number of acquired shares of the latch to below
     * the new limit. If the limit is increased, threads currently in the queue
     * may not be issued one of the newly available shares until the next
     * request is made for a latch.
     *
     * @param limit The new limit
     */
    public void setLimit(long limit) {
        this.limit = limit;
    }


    /**
     * Acquires a shared latch if one is available or waits for one if no shared
     * latch is current available.
     */
    //增加計數,如果太大,那麼等等待
    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Releases a shared latch, making it available for another thread to use.
     * @return the previous counter value
     */
    //減少計數
    public long countDown() {
        sync.releaseShared(0);  //釋放
        long result = getCount();
        if (log.isDebugEnabled()) {
            log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
    }
        return result;
    }

    /**
     * Releases all waiting threads and causes the {@link #limit} to be ignored
     * until {@link #reset()} is called.
     */
    //通過將released設定為true,將會釋放所有的執行緒,知道reset了
    public boolean releaseAll() {
        released = true;
        return sync.releaseShared(0);
    }

    /**
     * Resets the latch and initializes the shared acquisition counter to zero.
     * @see #releaseAll()
     */
    //重製
    public void reset() {
        this.count.set(0);
        released = false;
    }

    /**
     * Returns <code>true</code> if there is at least one thread waiting to
     * acquire the shared lock, otherwise returns <code>false</code>.
     */
    //當前是否有執行緒等待
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Provide access to the list of threads waiting to acquire this limited
     * shared latch.
     */
    //獲取所有等待的執行緒
    public Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
}

程式碼應該還是很簡單的吧,而且註釋也算是說的比較清楚。。。其實是構建了一個繼承自AbstractQueuedSynchronizer的Sync物件,通過它來進行真正的同步功能。。。然後通過一個原子的整數計數器,和一個最大值,來判斷當前是否可以獲取鎖

好啦,這裡來看看Tomcat是如何通過LimitLatch來控制連線數量的吧,先來看看NioEndpoint的啟動方法:

    //啟動當前的endpoint
    public void startInternal() throws Exception {

        if (!running) {
            running = true;  //設定表示為,表示已經看是運行了
            paused = false;  //沒有暫停

            // Create worker collection
            if ( getExecutor() == null ) {  //如果沒有executor,那麼建立
                createExecutor();   //建立executor
            }

            initializeConnectionLatch();   //初始化閉鎖,用於控制連線的數量

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];   //根據設定的poller數量來建立poller物件的陣列
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();  // 建立poller物件
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);  // 建立相應的poller執行緒
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();  //啟動poller
            }

            startAcceptorThreads();  //啟動acceptor
        }
    }

這裡呼叫了initializeConnectionLatch方法來初始化閉鎖,來看看吧:

    //初始化閉鎖,用於控制連線的數量
    protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;  //這個是無限的連結數量
        if (connectionLimitLatch==null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());  //根據最大的連結數量來建立
        }
        return connectionLimitLatch;
    }

我們知道在Connector的配置中可以設定最大的連結數量,其實這裡也就是通過這個數量來構建LimitLatch物件的。。。

嗯,Tomcat是從哪裡獲取連線呢,這個就要從Accecptor看了。。。

 public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {  //如果暫停了
                    state = AcceptorState.PAUSED;  //更改當前acceptor的狀態
                    try {
                        Thread.sleep(50);  
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {  //如果沒有執行,那麼這裡直接跳過
                    break;
                }
                state = AcceptorState.RUNNING;  //設定當前acceptor的狀態是running

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();  //增減閉鎖的計數,如果connection數量已經達到了最大,那麼暫停一下,這裡用到的是connectionLimitLatch鎖,可以理解為一個閉鎖吧

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSock.accept();  //呼叫serversocket的accept方法
                    } catch (IOException ioe) {
                        //we didn't get a socket
                        countDownConnection();  //出了異常,並沒有獲取連結,那麼這裡減少閉鎖的計數
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // setSocketOptions() will add channel to the poller
                    // if successful
                    if (running && !paused) {
                        if (!setSocketOptions(socket)) {  //這裡主要是將socket加入到poller物件上面去,而且還要設定引數
                            countDownConnection();  //加入poller物件失敗了的話,那麼將閉鎖的計數減低
                            closeSocket(socket);  //關閉剛剛 建立的這個socket
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } catch (SocketTimeoutException sx) {
                    // Ignore: Normal condition
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        log.error("", oom);
                    }catch ( Throwable oomt ) {
                        try {
                            try {
                                System.err.println(oomParachuteMsg);
                                oomt.printStackTrace();
                            }catch (Throwable letsHopeWeDontGetHere){
                                ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                            }
                        }catch (Throwable letsHopeWeDontGetHere){
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;  //設定acceptor的狀態為ended
        }

這裡讀一下Accecptor,的run方法可以知道,每次在呼叫serverSocketChannel的accept方法之前都會呼叫countUpOrAwaitConnection方法來增加閉鎖的計數,如果有問題,那就會呼叫countDownConnection方法來降低閉鎖的計數。。。

其實這裡通過這兩個方法就知道他們是幹嘛的了,先來看看countUpOrAwaitConnection吧:

    //這裡用於增加閉鎖的計數
    protected void countUpOrAwaitConnection() throws InterruptedException {
        if (maxConnections==-1) return;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) latch.countUpOrAwait();  //增加閉鎖的counter
    }

沒啥意思吧,就是呼叫剛剛建立的閉鎖的countUpOrAwait方法,接下來來看看countDownConnection方法吧:

    //用於減少閉鎖的計數
    protected long countDownConnection() {
        if (maxConnections==-1) return -1;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) {
            long result = latch.countDown();
            if (result<0) {
                getLog().warn("Incorrect connection count, multiple socket.close called on the same socket." );
            }
            return result;
        } else return -1;
    }

這個也沒啥意思吧。。。就是呼叫閉鎖的countDown方法。。。

嗯,到這裡整個Tomcat如何控制連線的數量就算是比較清楚了吧。。。

最後,我們知道是通過呼叫endpoint的cancelledKey方法來關閉一個連線的,來看看它的實現吧:

     //取消一個註冊
        public void cancelledKey(SelectionKey key, SocketStatus status) {
            try {
                if ( key == null ) return;//nothing to do
                KeyAttachment ka = (KeyAttachment) key.attachment();
                if (ka != null && ka.isComet() && status != null) {
                    ka.setComet(false);//to avoid a loop
                    if (status == SocketStatus.TIMEOUT ) {
                        if (processSocket(ka.getChannel(), status, true)) {
                            return; // don't close on comet timeout
                        }
                    } else {
                        // Don't dispatch if the lines below are canceling the key
                        processSocket(ka.getChannel(), status, false);
                    }
                }
                key.attach(null);  //將附件設定為null
                if (ka!=null) handler.release(ka);  //可以取消這個attachment了
                else handler.release((SocketChannel)key.channel());
                if (key.isValid()) key.cancel();  //取消key
                if (key.channel().isOpen()) {  //如果channel還是開啟的,那麼需要關閉channel
                    try {
                        key.channel().close();
                    } catch (Exception e) {
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString(
                                    "endpoint.debug.channelCloseFail"), e);
                        }
                    }
                }
                try {
                    if (ka!=null) {
                        ka.getSocket().close(true); //關閉sockt
                    }
                } catch (Exception e){
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString(
                                "endpoint.debug.socketCloseFail"), e);
                    }
                }
                try {
                    if (ka != null && ka.getSendfileData() != null
                            && ka.getSendfileData().fchannel != null
                            && ka.getSendfileData().fchannel.isOpen()) {
                        ka.getSendfileData().fchannel.close();
                    }
                } catch (Exception ignore) {
                }
                if (ka!=null) {
                    ka.reset();
                    countDownConnection();  //降低用於維護連線數量的閉鎖
                }
            } catch (Throwable e) {
                ExceptionUtils.handleThrowable(e);
                if (log.isDebugEnabled()) log.error("",e);
            }
        }

這裡可以看到呼叫了countDownConnection方法來降低閉鎖的計數。。

最後總結:Tomcat通過在acceptor中對閉鎖的獲取來控制總連線的數量,如果連線數量達到了最大的限制,那麼將會被阻塞。。直到有連線關閉為止。。。這樣acceptor的執行緒就又被喚醒了。。。

相關推薦

Tomcat原始碼閱讀閉鎖實現連線數量控制

嗯,今天其實在看HtttpProcessor的實現,但是突然想到了以前在看poller的時候看到了有閉鎖,用於控制當前connector的連線數量,嗯,那就順便把這部分來看了。。。 在Tomcat中,通過繼承AbstractQueuedSynchronizer來實現了自己的

String原始碼閱讀contains實現原理

本文將對String部分原始碼進行閱讀分析的記錄。 contains 對String中的contains方法進行分析,瞭解其採用的是什麼演算法進行匹配。 //用於判斷源字串是否包含目標字元序列 CharSequence s public bo

Tomcat原始碼閱讀初始化聯結器元件

Server元件初始化之後,接著就該初始化Service元件。 public void initialize() throws LifecycleException { initialized = true

jdk原始碼閱讀StringBufferStringBuilder

StringBuffer與StringBuilder非常類似,兩者的很多實現方式上都是一樣的,先看兩者的繼承層次 public final class StringBuilder extends AbstractStringBuilder implements java.

jdk原始碼閱讀ObjectCloneable

Object 物件 (1)clone方法,是否可以克隆 protected native Object clone() throws CloneNotSupportedException; 這個方法是個native方法,是呼叫系統底層c/c++程式碼來實現拷貝複製的,這個方法丟擲異常,Obj

Tomcat原始碼分析:ServletOutputStream的實現

貌似很久都沒有寫部落格了,tomcat8的程式碼已經看了很多,主體部分的程式碼也都看得差不多了,發現在tomcat8中已經完全支援非阻塞的方式接收以及傳送資料了。。。。但是比較遺憾的是,以前遺留下來的太多的老程式碼都不支援這種新的方式來發送資料。。。木有辦法。。。 這裡來看

Promise原始碼閱讀建構函式+then過程

前言 Promise是非同步程式設計的一種方案,ES6規範中將其寫入規範標準中,統一了用法。 考慮到瀏覽器的相容性,Vue專案中使用promise,就具體閱讀promise原始碼,看看內部的具體實現。 具體分析 通過具體例項來閱讀promise原始碼的實現,例項如下: new

Dubbo原始碼解析服務釋出註冊

準備 dubbo版本:2.5.4 Spring自定義擴充套件 dubbo 是基於 spring 配置來實現服務釋出,並基於 spring 的擴充套件機制定義了一套自定義標籤,要實現自定義擴充套件, spring 中提供了 NamespaceHandler 、BeanDefinit

Netty 原始碼閱讀初始環境搭建

推薦 netty 系列原始碼解析合集 http://www.iocoder.cn/Netty/Netty-collection/?aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3R6c18xMDQxMjE4MTI5L2FydGljbGUvZGV0YWlscy83OD

jdk原始碼閱讀——arraylist

首先看一下他的建構函式: public ArrayList() { this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA; } 其實arraylist還有其他的建構函式,可以指定陣列的長度,這裡先從最基本的入

(第一天)每日原始碼除錯旅--實現CQRS模式的AXON框架

demo檔案結構: github地址:https://github.com/zxc1210449483/axondemo 開始除錯:先在ProductController的commandGateway.sendAndWait(command)前打上端點,然後用postman傳送PO

netty原始碼閱讀效能優化工具類Recycle異執行緒獲取物件

在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入: boolean scavenge() { // con

我的原始碼閱讀路:redux原始碼剖析

前言 用過react的小夥伴對redux其實並不陌生,基本大多數的React應用用到它。一般大家用redux的時候基本都不會單獨去使用它,而是配合react-redux一起去使用。剛學習redux的時候很容易弄混淆redux和react-redux,以為他倆是同一個

netty原始碼閱讀解碼值基於固定長度解碼器分析

固定長度解碼器FixedLengthFrameDecoder比較簡單,我們看下它類的註釋: /** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes.

netty原始碼閱讀解碼基於長度域解碼器引數分析

這篇文章我們放鬆一點,只分析基於長度域解碼器的幾個引數, lengthFieldOffset :長度域的偏移量,也就是長度域要從什麼地方開始 lengthFieldLength:長度域的長度,也就是長度域佔多少個位元組 lengthAdjustment:長度域的值的調整

netty原始碼閱讀解碼基於長度域解碼器分析

基於長度域解碼器LengthFieldBasedFrameDecoder我們主要分析以下三點: 1、計算需要抽取的資料包的長度 2、跳過位元組邏輯處理 3、丟棄模式下的處理 首先原始碼還是LengthFieldBasedFrameDecoder的decode方法:

netty原始碼閱讀編碼MessageToByteEncoder

MessageToByteEncoder的write過程,我們分析以下幾步: 1、匹配物件 2、分配記憶體 3、編碼實現 4、釋放物件 5、傳播資料 6、釋放記憶體 原始碼在這裡: @Override public void write(Cha

netty原始碼閱讀效能優化工具類FastThreadLocal的使用

先說明FastThreadLocal使用的效果。 1、比jdk原生的ThreadLocal的快 2、不同執行緒之間能保證執行緒安全 這是我們的使用者程式碼: public class FastThreadLocalTest { private static F

netty原始碼閱讀效能優化工具類FastThreadLocal的建立

建立的話我們直接從FastThreadLocal的構造方法進入: public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); } 可見他是現

JAVA原始碼閱讀java.util—List

List List被宣告為一個介面,程式碼量很少,只聲明瞭方法。 public interface List<E> extends Collection<E> { int size(); boolean isEmpty(); boo