Tomcat原始碼閱讀之閉鎖的實現與連線數量的控制
嗯,今天其實在看HtttpProcessor的實現,但是突然想到了以前在看poller的時候看到了有閉鎖,用於控制當前connector的連線數量,嗯,那就順便把這部分來看了。。。
在Tomcat中,通過繼承AbstractQueuedSynchronizer來實現了自己的同步工具,進而來實現了一個用於控制連線數量的閉鎖。。LimitLatch。。
這裡就需對AbstractQueuedSynchronizer有一些初步的瞭解。。。
首先它concurrent類庫中提供的一個用於構建自己的同步工具的一個工具類。。可以通過繼承他來快速的完成一個同步類的實現
(1)acquireSharedInterruptibly()方法,用於以共享的方式來獲取鎖,如果暫時無法獲取,將會將執行緒掛起到佇列,進行阻塞,對於這個方法是否最終能獲取鎖,是通過tryAcquireShared()方法的返回來定義的,這個方法需要自己實現。。。如果能獲取鎖,那麼返回1,否則返回-1.。。
(2)releaseShared()方法。以共享的方法釋放一個鎖,這樣前面提到的掛起的執行緒將會喚醒,進而重新嘗試獲取鎖。。。
好啦,接下來就來看看LimitLatch的定義吧,直接上程式碼好了,。,。程式碼還是很簡單的。。
//其實是通過AbstractQueuedSynchronizer來構建的 public class LimitLatch { private static final Log log = LogFactory.getLog(LimitLatch.class); //構建Sync型別,實現基本的同步,以及阻塞。。 private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } @Override //用於增加計數,如果計數增加之後小於最大的,那麼返回1,不會阻塞,否則將會返回-1阻塞 protected int tryAcquireShared(int ignored) { //呼叫acquiredShared方法的時候會呼叫這個方法來返回狀態,如果返回1,那麼表示獲取成功,返回-1表示獲取失敗,將會阻塞 long newCount = count.incrementAndGet(); //先增加計數 if (!released && newCount > limit) { //如果當前已經超過了最大的限制 // Limit exceeded count.decrementAndGet(); //減少計數 return -1; //返回-1,將阻塞當前執行緒 } else { return 1; } } @Override //用於減少計數 protected boolean tryReleaseShared(int arg) { count.decrementAndGet(); return true; } } private final Sync sync; //同步物件 private final AtomicLong count; //計數器 private volatile long limit; //最大的數量 private volatile boolean released = false; //是否全部釋放 /** * Instantiates a LimitLatch object with an initial limit. * @param limit - maximum number of concurrent acquisitions of this latch */ public LimitLatch(long limit) { this.limit = limit; //最大限制 this.count = new AtomicLong(0); this.sync = new Sync(); //sync 物件 } /** * Returns the current count for the latch * @return the current count for latch */ public long getCount() { return count.get(); } /** * Obtain the current limit. */ public long getLimit() { return limit; } /** * Sets a new limit. If the limit is decreased there may be a period where * more shares of the latch are acquired than the limit. In this case no * more shares of the latch will be issued until sufficient shares have been * returned to reduce the number of acquired shares of the latch to below * the new limit. If the limit is increased, threads currently in the queue * may not be issued one of the newly available shares until the next * request is made for a latch. * * @param limit The new limit */ public void setLimit(long limit) { this.limit = limit; } /** * Acquires a shared latch if one is available or waits for one if no shared * latch is current available. */ //增加計數,如果太大,那麼等等待 public void countUpOrAwait() throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount()); } sync.acquireSharedInterruptibly(1); } /** * Releases a shared latch, making it available for another thread to use. * @return the previous counter value */ //減少計數 public long countDown() { sync.releaseShared(0); //釋放 long result = getCount(); if (log.isDebugEnabled()) { log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result); } return result; } /** * Releases all waiting threads and causes the {@link #limit} to be ignored * until {@link #reset()} is called. */ //通過將released設定為true,將會釋放所有的執行緒,知道reset了 public boolean releaseAll() { released = true; return sync.releaseShared(0); } /** * Resets the latch and initializes the shared acquisition counter to zero. * @see #releaseAll() */ //重製 public void reset() { this.count.set(0); released = false; } /** * Returns <code>true</code> if there is at least one thread waiting to * acquire the shared lock, otherwise returns <code>false</code>. */ //當前是否有執行緒等待 public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * Provide access to the list of threads waiting to acquire this limited * shared latch. */ //獲取所有等待的執行緒 public Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } }
程式碼應該還是很簡單的吧,而且註釋也算是說的比較清楚。。。其實是構建了一個繼承自AbstractQueuedSynchronizer的Sync物件,通過它來進行真正的同步功能。。。然後通過一個原子的整數計數器,和一個最大值,來判斷當前是否可以獲取鎖
好啦,這裡來看看Tomcat是如何通過LimitLatch來控制連線數量的吧,先來看看NioEndpoint的啟動方法:
//啟動當前的endpoint public void startInternal() throws Exception { if (!running) { running = true; //設定表示為,表示已經看是運行了 paused = false; //沒有暫停 // Create worker collection if ( getExecutor() == null ) { //如果沒有executor,那麼建立 createExecutor(); //建立executor } initializeConnectionLatch(); //初始化閉鎖,用於控制連線的數量 // Start poller threads pollers = new Poller[getPollerThreadCount()]; //根據設定的poller數量來建立poller物件的陣列 for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); // 建立poller物件 Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); // 建立相應的poller執行緒 pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); //啟動poller } startAcceptorThreads(); //啟動acceptor } }
這裡呼叫了initializeConnectionLatch方法來初始化閉鎖,來看看吧:
//初始化閉鎖,用於控制連線的數量
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null; //這個是無限的連結數量
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections()); //根據最大的連結數量來建立
}
return connectionLimitLatch;
}
我們知道在Connector的配置中可以設定最大的連結數量,其實這裡也就是通過這個數量來構建LimitLatch物件的。。。
嗯,Tomcat是從哪裡獲取連線呢,這個就要從Accecptor看了。。。
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) { //如果暫停了
state = AcceptorState.PAUSED; //更改當前acceptor的狀態
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) { //如果沒有執行,那麼這裡直接跳過
break;
}
state = AcceptorState.RUNNING; //設定當前acceptor的狀態是running
try {
//if we have reached max connections, wait
countUpOrAwaitConnection(); //增減閉鎖的計數,如果connection數量已經達到了最大,那麼暫停一下,這裡用到的是connectionLimitLatch鎖,可以理解為一個閉鎖吧
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept(); //呼叫serversocket的accept方法
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection(); //出了異常,並沒有獲取連結,那麼這裡減少閉鎖的計數
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;
// setSocketOptions() will add channel to the poller
// if successful
if (running && !paused) {
if (!setSocketOptions(socket)) { //這裡主要是將socket加入到poller物件上面去,而且還要設定引數
countDownConnection(); //加入poller物件失敗了的話,那麼將閉鎖的計數減低
closeSocket(socket); //關閉剛剛 建立的這個socket
}
} else {
countDownConnection();
closeSocket(socket);
}
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED; //設定acceptor的狀態為ended
}
這裡讀一下Accecptor,的run方法可以知道,每次在呼叫serverSocketChannel的accept方法之前都會呼叫countUpOrAwaitConnection方法來增加閉鎖的計數,如果有問題,那就會呼叫countDownConnection方法來降低閉鎖的計數。。。
其實這裡通過這兩個方法就知道他們是幹嘛的了,先來看看countUpOrAwaitConnection吧:
//這裡用於增加閉鎖的計數
protected void countUpOrAwaitConnection() throws InterruptedException {
if (maxConnections==-1) return;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.countUpOrAwait(); //增加閉鎖的counter
}
沒啥意思吧,就是呼叫剛剛建立的閉鎖的countUpOrAwait方法,接下來來看看countDownConnection方法吧:
//用於減少閉鎖的計數
protected long countDownConnection() {
if (maxConnections==-1) return -1;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
long result = latch.countDown();
if (result<0) {
getLog().warn("Incorrect connection count, multiple socket.close called on the same socket." );
}
return result;
} else return -1;
}
這個也沒啥意思吧。。。就是呼叫閉鎖的countDown方法。。。
嗯,到這裡整個Tomcat如何控制連線的數量就算是比較清楚了吧。。。
最後,我們知道是通過呼叫endpoint的cancelledKey方法來關閉一個連線的,來看看它的實現吧:
//取消一個註冊
public void cancelledKey(SelectionKey key, SocketStatus status) {
try {
if ( key == null ) return;//nothing to do
KeyAttachment ka = (KeyAttachment) key.attachment();
if (ka != null && ka.isComet() && status != null) {
ka.setComet(false);//to avoid a loop
if (status == SocketStatus.TIMEOUT ) {
if (processSocket(ka.getChannel(), status, true)) {
return; // don't close on comet timeout
}
} else {
// Don't dispatch if the lines below are canceling the key
processSocket(ka.getChannel(), status, false);
}
}
key.attach(null); //將附件設定為null
if (ka!=null) handler.release(ka); //可以取消這個attachment了
else handler.release((SocketChannel)key.channel());
if (key.isValid()) key.cancel(); //取消key
if (key.channel().isOpen()) { //如果channel還是開啟的,那麼需要關閉channel
try {
key.channel().close();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString(
"endpoint.debug.channelCloseFail"), e);
}
}
}
try {
if (ka!=null) {
ka.getSocket().close(true); //關閉sockt
}
} catch (Exception e){
if (log.isDebugEnabled()) {
log.debug(sm.getString(
"endpoint.debug.socketCloseFail"), e);
}
}
try {
if (ka != null && ka.getSendfileData() != null
&& ka.getSendfileData().fchannel != null
&& ka.getSendfileData().fchannel.isOpen()) {
ka.getSendfileData().fchannel.close();
}
} catch (Exception ignore) {
}
if (ka!=null) {
ka.reset();
countDownConnection(); //降低用於維護連線數量的閉鎖
}
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
if (log.isDebugEnabled()) log.error("",e);
}
}
這裡可以看到呼叫了countDownConnection方法來降低閉鎖的計數。。
最後總結:Tomcat通過在acceptor中對閉鎖的獲取來控制總連線的數量,如果連線數量達到了最大的限制,那麼將會被阻塞。。直到有連線關閉為止。。。這樣acceptor的執行緒就又被喚醒了。。。
相關推薦
Tomcat原始碼閱讀之閉鎖的實現與連線數量的控制
嗯,今天其實在看HtttpProcessor的實現,但是突然想到了以前在看poller的時候看到了有閉鎖,用於控制當前connector的連線數量,嗯,那就順便把這部分來看了。。。 在Tomcat中,通過繼承AbstractQueuedSynchronizer來實現了自己的
String原始碼閱讀之contains實現原理
本文將對String部分原始碼進行閱讀分析的記錄。 contains 對String中的contains方法進行分析,瞭解其採用的是什麼演算法進行匹配。 //用於判斷源字串是否包含目標字元序列 CharSequence s public bo
Tomcat原始碼閱讀之初始化聯結器元件
Server元件初始化之後,接著就該初始化Service元件。 public void initialize() throws LifecycleException { initialized = true
jdk原始碼閱讀之StringBuffer與StringBuilder
StringBuffer與StringBuilder非常類似,兩者的很多實現方式上都是一樣的,先看兩者的繼承層次 public final class StringBuilder extends AbstractStringBuilder implements java.
jdk原始碼閱讀之Object與Cloneable
Object 物件 (1)clone方法,是否可以克隆 protected native Object clone() throws CloneNotSupportedException; 這個方法是個native方法,是呼叫系統底層c/c++程式碼來實現拷貝複製的,這個方法丟擲異常,Obj
Tomcat原始碼分析之:ServletOutputStream的實現
貌似很久都沒有寫部落格了,tomcat8的程式碼已經看了很多,主體部分的程式碼也都看得差不多了,發現在tomcat8中已經完全支援非阻塞的方式接收以及傳送資料了。。。。但是比較遺憾的是,以前遺留下來的太多的老程式碼都不支援這種新的方式來發送資料。。。木有辦法。。。 這裡來看
Promise原始碼閱讀之建構函式+then過程
前言 Promise是非同步程式設計的一種方案,ES6規範中將其寫入規範標準中,統一了用法。 考慮到瀏覽器的相容性,Vue專案中使用promise,就具體閱讀promise原始碼,看看內部的具體實現。 具體分析 通過具體例項來閱讀promise原始碼的實現,例項如下: new
Dubbo原始碼解析之服務釋出與註冊
準備 dubbo版本:2.5.4 Spring自定義擴充套件 dubbo 是基於 spring 配置來實現服務釋出,並基於 spring 的擴充套件機制定義了一套自定義標籤,要實現自定義擴充套件, spring 中提供了 NamespaceHandler 、BeanDefinit
Netty 原始碼閱讀之初始環境搭建
推薦 netty 系列原始碼解析合集 http://www.iocoder.cn/Netty/Netty-collection/?aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3R6c18xMDQxMjE4MTI5L2FydGljbGUvZGV0YWlscy83OD
jdk原始碼閱讀之——arraylist
首先看一下他的建構函式: public ArrayList() { this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA; } 其實arraylist還有其他的建構函式,可以指定陣列的長度,這裡先從最基本的入
(第一天)每日原始碼除錯之旅--實現CQRS模式的AXON框架
demo檔案結構: github地址:https://github.com/zxc1210449483/axondemo 開始除錯:先在ProductController的commandGateway.sendAndWait(command)前打上端點,然後用postman傳送PO
netty原始碼閱讀之效能優化工具類之Recycle異執行緒獲取物件
在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入: boolean scavenge() { // con
我的原始碼閱讀之路:redux原始碼剖析
前言 用過react的小夥伴對redux其實並不陌生,基本大多數的React應用用到它。一般大家用redux的時候基本都不會單獨去使用它,而是配合react-redux一起去使用。剛學習redux的時候很容易弄混淆redux和react-redux,以為他倆是同一個
netty原始碼閱讀之解碼值基於固定長度解碼器分析
固定長度解碼器FixedLengthFrameDecoder比較簡單,我們看下它類的註釋: /** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes.
netty原始碼閱讀之解碼之基於長度域解碼器引數分析
這篇文章我們放鬆一點,只分析基於長度域解碼器的幾個引數, lengthFieldOffset :長度域的偏移量,也就是長度域要從什麼地方開始 lengthFieldLength:長度域的長度,也就是長度域佔多少個位元組 lengthAdjustment:長度域的值的調整
netty原始碼閱讀之解碼之基於長度域解碼器分析
基於長度域解碼器LengthFieldBasedFrameDecoder我們主要分析以下三點: 1、計算需要抽取的資料包的長度 2、跳過位元組邏輯處理 3、丟棄模式下的處理 首先原始碼還是LengthFieldBasedFrameDecoder的decode方法:
netty原始碼閱讀之編碼之MessageToByteEncoder
MessageToByteEncoder的write過程,我們分析以下幾步: 1、匹配物件 2、分配記憶體 3、編碼實現 4、釋放物件 5、傳播資料 6、釋放記憶體 原始碼在這裡: @Override public void write(Cha
netty原始碼閱讀之效能優化工具類之FastThreadLocal的使用
先說明FastThreadLocal使用的效果。 1、比jdk原生的ThreadLocal的快 2、不同執行緒之間能保證執行緒安全 這是我們的使用者程式碼: public class FastThreadLocalTest { private static F
netty原始碼閱讀之效能優化工具類之FastThreadLocal的建立
建立的話我們直接從FastThreadLocal的構造方法進入: public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); } 可見他是現
JAVA原始碼閱讀之java.util—List
List List被宣告為一個介面,程式碼量很少,只聲明瞭方法。 public interface List<E> extends Collection<E> { int size(); boolean isEmpty(); boo