異步化,高並發大殺器
1.同步和異步,阻塞和非阻塞
同步和異步,阻塞和非阻塞, 這個幾個詞已經是老生常談,當時常常還是有很多同學分不清楚,以為同步肯定就是阻塞,異步肯定就是非阻塞,其他他們不是一回事。
同步和異步關註的是結果消息的通信機制
同步:同步的意思就是調用方需要主動等待結果的返回
異步:異步的意思就是不需要主動等待結果的返回,而是通過其他手段比如,狀態通知,回調函數等。
阻塞和非阻塞主要關註的是等待結果返回調用方的狀態
阻塞:是指結果返回之前,當前線程被掛起,不做任何事
非阻塞:是指結果在返回之前,線程可以做一些其他事,不會被掛起。
可以看見同步和異步,阻塞和非阻塞主要關註的點不同,有人會問同步還能非阻塞,異步還能阻塞?當然是可以的,下面為了更好的說明他們的組合之間的意思,用幾個簡單的例子說明: 1.同步阻塞:同步阻塞基本也是編程中最常見的模型,打個比方你去商店買衣服,你去了之後發現衣服賣完了,那你就在店裏面一直等,期間不做任何事(包括看手機),等著商家進貨,直到有貨為止,這個效率很低。
2.同步非阻塞:同步非阻塞在編程中可以抽象為一個輪詢模式,你去了商店之後,發現衣服賣完了,這個時候不需要傻傻的等著,你可以去其他地方比如奶茶店,買杯水,但是你還是需要時不時的去商店問老板新衣服到了嗎。
3.異步阻塞:異步阻塞這個編程裏面用的較少,有點類似你寫了個線程池,submit然後馬上future.get(),這樣線程其實還是掛起的。有點像你去商店買衣服,這個時候發現衣服沒有了,這個時候你就給老板留給電話,說衣服到了就給我打電話,然後你就守著這個電話,一直等著他響什麽事也不做。這樣感覺的確有點傻,所以這個模式用得比較少。
4.異步非阻塞:異步非阻塞這也是現在高並發編程的一個核心,也是今天主要講的一個核心。好比你去商店買衣服,衣服沒了,你只需要給老板說這是我的電話,衣服到了就打。然後你就隨心所欲的去玩,也不用操心衣服什麽時候到,衣服一到,電話一響就可以去買衣服了。
2.同步阻塞 PK 異步非阻塞
上面已經看到了同步阻塞的效率是多麽的低,如果使用同步阻塞的方式去買衣服,你有可能一天只能買一件衣服,其他什麽事都不能幹,如果用異步非阻塞的方式去買,買衣服只是你一天中進行的一個小事。
我們把這個映射到我們代碼中,當我們的線程發生一次rpc調用或者http調用,又或者其他的一些耗時的IO調用,發起之後,如果是同步阻塞,我們的這個線程就會被阻塞掛起,直到結果返回,試想一下如果IO調用很頻繁那我們的CPU使用率其實是很低很低。正所謂是物盡其用,既然CPU的使用率被IO調用搞得很低,那我們就可以使用異步非阻塞,當發生IO調用時我並不馬上關心結果,我只需要把回調函數寫入這次IO調用,我這個時候線程可以繼續處理新的請求,當IO調用結束結束時,會調用回調函數。而我們的線程始終處於忙碌之中,這樣就能做更多的有意義的事了。
這裏首先要說明的是,異步化不是萬能,異步化並不能縮短你整個鏈路調用時間長的問題,但是他能極大的提升你的最大qps。一般我們的業務中有兩處比較耗時:
cpu: cpu耗時指的是我們的一般的業務處理邏輯,比如一些數據的運算,對象的序列化。這些異步化是不能解決的,得需要靠一些算法的優化,或者一些高性能框架。
iowait: io耗時就像我們上面說的,一般發生在網絡調用,文件傳輸中等等,這個時候線程一般會掛起阻塞。而我們的異步化通常用於解決這部分的問題。
3.哪些可以異步化?
上面說了異步化是用於解決IO阻塞的問題,而我們一般項目中可以使用異步化如下:
servlet異步化,springmvc異步化
rpc調用如(dubbo,thrift),http調用異步化
數據庫調用,緩存調用異步化
下面我會從上面幾個方面進行異步化的介紹.
4.servlet異步化
對於Java開發程序員來說servlet並不陌生吧,在項目中不論你使用struts2,還是使用的springmvc,本質上都是封裝的servlet。但是我們的一般的開發,其實都是使用的同步阻塞模式如下:
上面的模式優點在於編碼簡單,適合在項目啟動初期,訪問量較少,或者是CPU運算較多的項目
缺點在於,業務邏輯線程和servlet容器線程是同一個,一般的業務邏輯總得發生點IO,比如查詢數據庫,比如產生RPC調用,這個時候就會發生阻塞,而我們的servlet容器線程肯定是有限的,當servlet容器線程都被阻塞的時候我們的服務這個時候就會發生拒絕訪問,線程不然我當然們可以通過增加機器的一系列手段來解決這個問題,但是俗話說得好靠人不如靠自己,靠別人替我分擔請求,還不如我自己搞定。所以在servlet3.0之後支持了異步化,我們采用異步化之後就會變成如下:
在這裏我們采用新的線程處理業務邏輯,IO調用的阻塞就不會影響我們的serlvet了,實現異步serlvet的代碼也比較簡單,如下:
@WebServlet
(
name
"WorkServlet"
,
urlPatterns
"/work"
,
asyncSupported
true
)
public
class
WorkServlet
extends
HttpServlet
{
private
static
final
long
serialVersionUID
1L
;
@Override
protected
void
doGet
(
HttpServletRequest
req
,
HttpServletResponse
resp
)
throws
ServletException
,
IOException
{
this
.
doPost
(
req
,
resp
);
}
@Override
protected
void
doPost
(
HttpServletRequest
req
,
HttpServletResponse
resp
)
throws
ServletException
,
IOException
{
//設置ContentType,關閉緩存
resp
.
setContentType
(
"text/plain;charset=UTF-8"
);
resp
.
setHeader
(
"Cache-Control"
,
"private"
);
resp
.
setHeader
(
"Pragma"
,
"no-cache"
);
final
PrintWriter
writer
resp
.
getWriter
();
writer
.
println
(
"老師檢查作業了"
);
writer
.
flush
();
List
<
String
zuoyes
new
ArrayList
<
String
();
for
(
int
i
0;
i
<
10
;
i
++)
{
zuoyes
.
add
(
"zuoye"
+
i
);;
}
//開啟異步請求
final
AsyncContext
ac
req
.
startAsync
();
doZuoye
(
ac
,
zuoyes
);
writer
.
println
(
"老師布置作業"
);
writer
.
flush
();
}
private
void
doZuoye
(
final
AsyncContext
ac
,
final
List
<
String
zuoyes
)
{
ac
.
setTimeout
(
1
60
60
*
1000L
);
ac
.
start
(
new
Runnable
()
{
@Override
public
void
run
()
{
//通過response獲得字符輸出流
try
{
PrintWriter
writer
ac
.
getResponse
().
getWriter
();
for
(
String
zuoye
:
zuoyes
)
{
writer
.
println
(
"\""
+
zuoye
+
"\"請求處理中"
);
Thread
.
sleep
(
1
*
1000L
);
writer
.
flush
();
}
ac
.
complete
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
});
}
}
實現serlvet的關鍵在於http采取了長連接,也就是當請求打過來的時候就算有返回也不會關閉,因為可能還會有數據,直到返回關閉指令。 AsyncContext ac=req.startAsync(); 用於獲取異步上下文,後續我們通過這個異步上下文進行回調返回數據,有點像我們買衣服的時候,給老板一個電話,而這個上下文也是一個電話,當有衣服到的時候,也就是當有數據準備好的時候就可以打電話發送數據了。 ac.complete(); 用來進行長鏈接的關閉。
4.1springmvc異步化 現在其實很少人來進行serlvet編程,都是直接采用現成的一些框架,比如struts2,springmvc。下面介紹下使用springmvc如何進行異步化:
首先確認你的項目中的Servlet是3.0以上的!!,其次springMVC4.0+
<dependency>
<groupId>
javax.servlet
</groupId>
<artifactId>
javax.servlet-api
</artifactId>
<version>
3.1.0
</version>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
org.springframework
</groupId>
<artifactId>
spring-webmvc
</artifactId>
<version>
4.2.3.RELEASE
</version>
</dependency>
web.xml頭部聲明,必須要3.0,filter和serverlet設置為異步
<?
xml version
"1.0"
encoding
"UTF-8"
?>
<web-app
version
"3.0"
xmlns
"http://java.sun.com/xml/ns/javaee"
xmlns:xsi
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation
"http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
<filter>
<filter-name>
testFilter
</filter-name>
<filter-class>
com.TestFilter
</filter-class>
<async-supported>
true
</async-supported>
</filter>
<servlet>
<servlet-name>
mvc-dispatcher
</servlet-name>
<servlet-class>
org.springframework.web.servlet.DispatcherServlet
</servlet-class>
.........
<async-supported>
true
</async-supported>
使用springmvc封裝了servlet的AsyncContext,使用起來比較簡單。以前我們同步的模式的Controller是返回額ModelAndView,而異步模式直接生成一個defrredResult(支持我們超時擴展)即可保存上下文,下面給出如何和我們HttpClient搭配的簡單demo
@RequestMapping
/servlet>
使用springmvc封裝了servlet的AsyncContext,使用起來比較簡單。以前我們同步的模式的Controller是返回額ModelAndView,而異步模式直接生成一個defrredResult(支持我們超時擴展)即可保存上下文,下面給出如何和我們HttpClient搭配的簡單demo
@RequestMapping
value
"/asynctask"
,
method
RequestMethod
.
GET
)
public
DeferredResult
<
String
asyncTask
()
throws
IOReactorException
{
IOReactorConfig
ioReactorConfig
IOReactorConfig
.
custom
().
setIoThreadCount
(
1
).
build
();
ConnectingIOReactor
ioReactor
new
DefaultConnectingIOReactor
(
ioReactorConfig
);
PoolingNHttpClientConnectionManager
conManager
new
PoolingNHttpClientConnectionManager
(
ioReactor
);
conManager
.
setMaxTotal
(
100
);
conManager
.
setDefaultMaxPerRoute
(
100
);
CloseableHttpAsyncClient
httpclient
HttpAsyncClients
.
custom
().
setConnectionManager
(
conManager
).
build
();
// Start the client
httpclient
.
start
();
//設置超時時間200ms
final
DeferredResult
<
String
deferredResult
new
DeferredResult
<
String
(
200L
);
deferredResult
.
onTimeout
(
new
Runnable
()
{
@Override
public
void
run
()
{
System
.
out
.
println
(
"異步調用執行超時!thread id is : "
+
Thread
.
currentThread
().
getId
());
deferredResult
.
setResult
(
"超時了"
);
}
});
System
.
out
.
println
(
"/asynctask 調用!thread id is : "
+
Thread
.
currentThread
().
getId
());
final
HttpGet
request2
new
HttpGet
(
"http://www.apache.org/"
);
httpclient
.
execute
(
request2
,
new
FutureCallback
<
HttpResponse
()
{
public
void
completed
(
final
HttpResponse
response2
)
{
System
.
out
.
println
(
request2
.
getRequestLine
()
+
"->"
+
response2
.
getStatusLine
());
deferredResult
.
setResult
(
request2
.
getRequestLine
()
+
"->"
+
response2
.
getStatusLine
());
}
public
void
failed
(
final
Exception
ex
)
{
System
.
out
.
println
(
request2
.
getRequestLine
()
+
"->"
+
ex
);
}
public
void
cancelled
()
{
System
.
out
.
println
(
request2
.
getRequestLine
()
+
" cancelled"
);
}
});
return
deferredResult
;
}
註意: 在serlvet異步化中有個問題是filter的後置結果處理,沒法使用,對於我們一些打點,結果統計直接使用serlvet異步是沒法用的。在springmvc中就很好的解決了這個問題,springmvc采用了一個比較取巧的方式通過請求轉發,能讓請求再次過濾器。但是又引入了新的一個問題那就是過濾器會處理兩次,這裏可以通過SpringMVC源碼中自身判斷的方法,我們可以在filter中使用下面這句話來進行判斷是不是屬於springmvc轉發過來的請求,從而不處理filter的前置事件,只處理後置事件:
Object
asyncManagerAttr
servletRequest
.
getAttribute
(
WEB_ASYNC_MANAGER_ATTRIBUTE
);
return
asyncManagerAttr
instanceof
WebAsyncManager
;
5.全鏈路異步化
上面我們介紹了serlvet的異步化,相信細心的同學都看出來似乎並沒有解決根本的問題,我的IO阻塞依然存在,只是換了個位置而已,當IO調用頻繁同樣會讓業務線程池快速變滿,雖然serlvet容器線程不被阻塞,但是這個業務依然會變得不可用。
那麽怎麽才能解決上面的問題呢?答案就是全鏈路異步化,全鏈路異步追求的是沒有阻塞,打滿你的CPU,把機器的性能壓榨到極致模型圖如下:
具體的NIO client到底做了什麽事呢,具體如下面模型:
上面就是我們全鏈路異步的圖了(部分線程池可以優化)。全鏈路的核心在於只要我們遇到IO調用的時候,我們就可以使用NIO,從而避免阻塞,也就解決了之前說的業務線程池被打滿得到尷尬場景。
5.1遠程調用異步化
我們一般遠程調用使用rpc或者http。對於rpc來說一般thrift,http,motan等支持都異步調用,其內部原理也都是采用事件驅動的NIO模型,對於http來說一般的apachehttpclient和okhttp也都提供了異步調用。 下面簡單介紹下Http異步化調用是怎麽做的: 首先來看一個例子:
public
class
HTTPAsyncClientDemo
{
public
static
void
main
(
String
[]
args
)
throws
ExecutionException
,
InterruptedException
,
IOReactorException
{
//具體參數含義下文會講
//apache提供了ioReactor的參數配置,這裏我們配置IO 線程為1
IOReactorConfig
ioReactorConfig
IOReactorConfig
.
custom
().
setIoThreadCount
(
1
).
build
();
//根據這個配置創建一個ioReactor
ConnectingIOReactor
ioReactor
new
DefaultConnectingIOReactor
(
ioReactorConfig
);
//asyncHttpClient使用PoolingNHttpClientConnectionManager管理我們客戶端連接
PoolingNHttpClientConnectionManager
conManager
new
PoolingNHttpClientConnectionManager
(
ioReactor
);
//設置總共的連接的最大數量
conManager
.
setMaxTotal
(
100
);
//設置每個路由的連接的最大數量
conManager
.
setDefaultMaxPerRoute
(
100
);
//創建一個Client
CloseableHttpAsyncClient
httpclient
HttpAsyncClients
.
custom
().
setConnectionManager
(
conManager
).
build
();
// Start the client
httpclient
.
start
();
// Execute request
final
HttpGet
request1
new
HttpGet
(
"http://www.apache.org/"
);
Future
<
HttpResponse
future
httpclient
.
execute
(
request1
,
null
);
// and wait until a response is received
HttpResponse
response1
future
.
get
();
System
.
out
.
println
(
request1
.
getRequestLine
()
+
"->"
+
response1
.
getStatusLine
());
// One most likely would want to use a callback for operation result
final
HttpGet
request2
new
HttpGet
(
"http://www.apache.org/"
);
httpclient
.
execute
(
request2
,
new
FutureCallback
<
HttpResponse
()
{
//Complete成功後會回調這個方法
public
void
completed
(
final
HttpResponse
response2
)
{
System
.
out
.
println
(
request2
.
getRequestLine
()
+
"->"
+
response2
.
getStatusLine
());
}
public
void
failed
(
final
Exception
ex
)
{
System
.
out
.
println
(
request2
.
getRequestLine
()
+
"->"
+
ex
);
}
public
void
cancelled
()
{
System
.
out
.
println
(
request2
.
getRequestLine
()
+
" cancelled"
);
}
});
}
}
下面給出httpAsync的整個類圖:
對於我們的HTTPAysncClient 其實最後使用的是InternalHttpAsyncClient,在InternalHttpAsyncClient中有個ConnectionManager,這個就是我們管理連接的管理器,而在httpAsync中只有一個實現那就是PoolingNHttpClientConnectionManager,這個連接管理器中有兩個我們比較關心的一個是Reactor,一個是Cpool。
Reactor :所有的Reactor這裏都是實現了IOReactor接口。在PoolingNHttpClientConnectionManager中會有擁有一個Reactor,那就是DefaultConnectingIOReactor,這個DefaultConnectingIOReactor,負責處理Acceptor。在DefaultConnectingIOReactor有個excutor方法,生成IOReactor也就是我們圖中的BaseIOReactor,進行IO的操作。這個模型就是我們上面的1.2.2的模型
CPool :在PoolingNHttpClientConnectionManager中有個CPool,主要是負責控制我們連接,我們上面所說的maxTotal和defaultMaxPerRoute,都是由其進行控制,如果每個路由的滿了他會斷開最老的一個鏈接,如果總共的total滿了他會放入leased隊列,釋放空間的時候就會將其重新連接。
5.2數據庫調用異步化
對於數據庫調用一般的框架並沒有提供異步化的方法,這裏推薦自己封裝或者使用網上開源的,這裏我們公司有個開源的 https://github.com/ainilife/zebra-dao/blob/master/README_ZH.md 能很好的支持異步化
6.最後
異步化並不是高並發的銀彈,但是有了異步化的確能提高你機器的qps,吞吐量等等。上述講的一些模型如果能合理的做一些優化,然後進行應用,相信能對你的服務有很大的幫助的。
異步化,高並發大殺器