1. 程式人生 > >異步化,高並發大殺器

異步化,高並發大殺器

ktr apach tco 序列 etc 當前 本質 就會 out

聊聊如何讓項目異步化的一些事。

1.同步和異步,阻塞和非阻塞

同步和異步,阻塞和非阻塞, 這個幾個詞已經是老生常談,當時常常還是有很多同學分不清楚,以為同步肯定就是阻塞,異步肯定就是非阻塞,其他他們不是一回事。

同步和異步關註的是結果消息的通信機制

同步:同步的意思就是調用方需要主動等待結果的返回
異步:異步的意思就是不需要主動等待結果的返回,而是通過其他手段比如,狀態通知,回調函數等。
阻塞和非阻塞主要關註的是等待結果返回調用方的狀態

阻塞:是指結果返回之前,當前線程被掛起,不做任何事
非阻塞:是指結果在返回之前,線程可以做一些其他事,不會被掛起。
可以看見同步和異步,阻塞和非阻塞主要關註的點不同,有人會問同步還能非阻塞,異步還能阻塞?當然是可以的,下面為了更好的說明他們的組合之間的意思,用幾個簡單的例子說明: 1.同步阻塞:同步阻塞基本也是編程中最常見的模型,打個比方你去商店買衣服,你去了之後發現衣服賣完了,那你就在店裏面一直等,期間不做任何事(包括看手機),等著商家進貨,直到有貨為止,這個效率很低。

2.同步非阻塞:同步非阻塞在編程中可以抽象為一個輪詢模式,你去了商店之後,發現衣服賣完了,這個時候不需要傻傻的等著,你可以去其他地方比如奶茶店,買杯水,但是你還是需要時不時的去商店問老板新衣服到了嗎。

3.異步阻塞:異步阻塞這個編程裏面用的較少,有點類似你寫了個線程池,submit然後馬上future.get(),這樣線程其實還是掛起的。有點像你去商店買衣服,這個時候發現衣服沒有了,這個時候你就給老板留給電話,說衣服到了就給我打電話,然後你就守著這個電話,一直等著他響什麽事也不做。這樣感覺的確有點傻,所以這個模式用得比較少。

4.異步非阻塞:異步非阻塞這也是現在高並發編程的一個核心,也是今天主要講的一個核心。好比你去商店買衣服,衣服沒了,你只需要給老板說這是我的電話,衣服到了就打。然後你就隨心所欲的去玩,也不用操心衣服什麽時候到,衣服一到,電話一響就可以去買衣服了。

2.同步阻塞 PK 異步非阻塞

上面已經看到了同步阻塞的效率是多麽的低,如果使用同步阻塞的方式去買衣服,你有可能一天只能買一件衣服,其他什麽事都不能幹,如果用異步非阻塞的方式去買,買衣服只是你一天中進行的一個小事。

我們把這個映射到我們代碼中,當我們的線程發生一次rpc調用或者http調用,又或者其他的一些耗時的IO調用,發起之後,如果是同步阻塞,我們的這個線程就會被阻塞掛起,直到結果返回,試想一下如果IO調用很頻繁那我們的CPU使用率其實是很低很低。正所謂是物盡其用,既然CPU的使用率被IO調用搞得很低,那我們就可以使用異步非阻塞,當發生IO調用時我並不馬上關心結果,我只需要把回調函數寫入這次IO調用,我這個時候線程可以繼續處理新的請求,當IO調用結束結束時,會調用回調函數。而我們的線程始終處於忙碌之中,這樣就能做更多的有意義的事了。

這裏首先要說明的是,異步化不是萬能,異步化並不能縮短你整個鏈路調用時間長的問題,但是他能極大的提升你的最大qps。一般我們的業務中有兩處比較耗時:

cpu: cpu耗時指的是我們的一般的業務處理邏輯,比如一些數據的運算,對象的序列化。這些異步化是不能解決的,得需要靠一些算法的優化,或者一些高性能框架。
iowait: io耗時就像我們上面說的,一般發生在網絡調用,文件傳輸中等等,這個時候線程一般會掛起阻塞。而我們的異步化通常用於解決這部分的問題。
3.哪些可以異步化?

上面說了異步化是用於解決IO阻塞的問題,而我們一般項目中可以使用異步化如下:

servlet異步化,springmvc異步化
rpc調用如(dubbo,thrift),http調用異步化
數據庫調用,緩存調用異步化
下面我會從上面幾個方面進行異步化的介紹.

4.servlet異步化

對於Java開發程序員來說servlet並不陌生吧,在項目中不論你使用struts2,還是使用的springmvc,本質上都是封裝的servlet。但是我們的一般的開發,其實都是使用的同步阻塞模式如下:

上面的模式優點在於編碼簡單,適合在項目啟動初期,訪問量較少,或者是CPU運算較多的項目

缺點在於,業務邏輯線程和servlet容器線程是同一個,一般的業務邏輯總得發生點IO,比如查詢數據庫,比如產生RPC調用,這個時候就會發生阻塞,而我們的servlet容器線程肯定是有限的,當servlet容器線程都被阻塞的時候我們的服務這個時候就會發生拒絕訪問,線程不然我當然們可以通過增加機器的一系列手段來解決這個問題,但是俗話說得好靠人不如靠自己,靠別人替我分擔請求,還不如我自己搞定。所以在servlet3.0之後支持了異步化,我們采用異步化之後就會變成如下:

在這裏我們采用新的線程處理業務邏輯,IO調用的阻塞就不會影響我們的serlvet了,實現異步serlvet的代碼也比較簡單,如下:

@WebServlet
(
name

"WorkServlet"
,
urlPatterns

"/work"
,
asyncSupported

true
)
public

class

WorkServlet

extends

HttpServlet
{

private

static

final

long
serialVersionUID

1L
;

@Override

protected

void
doGet
(
HttpServletRequest
req
,

HttpServletResponse
resp
)

throws

ServletException
,

IOException

{

this
.
doPost
(
req
,
resp
);

}

@Override

protected

void
doPost
(
HttpServletRequest
req
,

HttpServletResponse
resp
)

throws

ServletException
,

IOException

{

//設置ContentType,關閉緩存
resp
.
setContentType
(
"text/plain;charset=UTF-8"
);
resp
.
setHeader
(
"Cache-Control"
,
"private"
);
resp
.
setHeader
(
"Pragma"
,
"no-cache"
);

final

PrintWriter
writer

resp
.
getWriter
();
writer
.
println
(
"老師檢查作業了"
);
writer
.
flush
();

List
<
String

zuoyes

new

ArrayList
<
String

();

for

(
int
i

0;
i
<

10
;
i
++)

{
zuoyes
.
add
(
"zuoye"
+
i
);;

}

//開啟異步請求

final

AsyncContext
ac

req
.
startAsync
();
doZuoye
(
ac
,
zuoyes
);
writer
.
println
(
"老師布置作業"
);
writer
.
flush
();

}

private

void
doZuoye
(
final

AsyncContext
ac
,

final

List
<
String

zuoyes
)

{
ac
.
setTimeout
(
1

60

60
*
1000L
);
ac
.
start
(
new

Runnable
()

{

@Override

public

void
run
()

{

//通過response獲得字符輸出流

try

{

PrintWriter
writer

ac
.
getResponse
().
getWriter
();

for

(
String
zuoye
:
zuoyes
)

{
writer
.
println
(
"\""
+
zuoye
+
"\"請求處理中"
);

Thread
.
sleep
(
1
*
1000L
);
writer
.
flush
();

}
ac
.
complete
();

}

catch

(
Exception
e
)

{
e
.
printStackTrace
();

}

}

});

}
}
實現serlvet的關鍵在於http采取了長連接,也就是當請求打過來的時候就算有返回也不會關閉,因為可能還會有數據,直到返回關閉指令。 AsyncContext ac=req.startAsync(); 用於獲取異步上下文,後續我們通過這個異步上下文進行回調返回數據,有點像我們買衣服的時候,給老板一個電話,而這個上下文也是一個電話,當有衣服到的時候,也就是當有數據準備好的時候就可以打電話發送數據了。 ac.complete(); 用來進行長鏈接的關閉。

4.1springmvc異步化 現在其實很少人來進行serlvet編程,都是直接采用現成的一些框架,比如struts2,springmvc。下面介紹下使用springmvc如何進行異步化:

首先確認你的項目中的Servlet是3.0以上的!!,其次springMVC4.0+
<dependency>

<groupId>
javax.servlet
</groupId>

<artifactId>
javax.servlet-api
</artifactId>

<version>
3.1.0
</version>

<scope>
provided
</scope>

</dependency>

<dependency>

<groupId>
org.springframework
</groupId>

<artifactId>
spring-webmvc
</artifactId>

<version>
4.2.3.RELEASE
</version>

</dependency>
web.xml頭部聲明,必須要3.0,filter和serverlet設置為異步
<?
xml version

"1.0"
encoding

"UTF-8"
?>
<web-app

version

"3.0"

xmlns

"http://java.sun.com/xml/ns/javaee"

xmlns:xsi

"http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation

"http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"

<filter>

<filter-name>
testFilter
</filter-name>

<filter-class>
com.TestFilter
</filter-class>

<async-supported>
true
</async-supported>

</filter>

<servlet>

<servlet-name>
mvc-dispatcher
</servlet-name>

<servlet-class>
org.springframework.web.servlet.DispatcherServlet
</servlet-class>
.........

<async-supported>
true
</async-supported>

使用springmvc封裝了servlet的AsyncContext,使用起來比較簡單。以前我們同步的模式的Controller是返回額ModelAndView,而異步模式直接生成一個defrredResult(支持我們超時擴展)即可保存上下文,下面給出如何和我們HttpClient搭配的簡單demo
@RequestMapping
/servlet>
使用springmvc封裝了servlet的AsyncContext,使用起來比較簡單。以前我們同步的模式的Controller是返回額ModelAndView,而異步模式直接生成一個defrredResult(支持我們超時擴展)即可保存上下文,下面給出如何和我們HttpClient搭配的簡單demo
@RequestMapping
value

"/asynctask"
,
method

RequestMethod
.
GET
)

public

DeferredResult
<
String

asyncTask
()

throws

IOReactorException

{

IOReactorConfig
ioReactorConfig

IOReactorConfig
.
custom
().
setIoThreadCount
(
1
).
build
();

ConnectingIOReactor
ioReactor

new

DefaultConnectingIOReactor
(
ioReactorConfig
);

PoolingNHttpClientConnectionManager
conManager

new

PoolingNHttpClientConnectionManager
(
ioReactor
);
conManager
.
setMaxTotal
(
100
);
conManager
.
setDefaultMaxPerRoute
(
100
);

CloseableHttpAsyncClient
httpclient

HttpAsyncClients
.
custom
().
setConnectionManager
(
conManager
).
build
();

// Start the client
httpclient
.
start
();

//設置超時時間200ms

final

DeferredResult
<
String

deferredResult

new

DeferredResult
<
String

(
200L
);
deferredResult
.
onTimeout
(
new

Runnable
()

{

@Override

public

void
run
()

{

System
.
out
.
println
(
"異步調用執行超時!thread id is : "

+

Thread
.
currentThread
().
getId
());
deferredResult
.
setResult
(
"超時了"
);

}

});

System
.
out
.
println
(
"/asynctask 調用!thread id is : "

+

Thread
.
currentThread
().
getId
());

final

HttpGet
request2

new

HttpGet
(
"http://www.apache.org/"
);
httpclient
.
execute
(
request2
,

new

FutureCallback
<
HttpResponse

()

{

public

void
completed
(
final

HttpResponse
response2
)

{

System
.
out
.
println
(
request2
.
getRequestLine
()

+

"->"

+
response2
.
getStatusLine
());
deferredResult
.
setResult
(
request2
.
getRequestLine
()

+

"->"

+
response2
.
getStatusLine
());

}

public

void
failed
(
final

Exception
ex
)

{

System
.
out
.
println
(
request2
.
getRequestLine
()

+

"->"

+
ex
);

}

public

void
cancelled
()

{

System
.
out
.
println
(
request2
.
getRequestLine
()

+

" cancelled"
);

}

});

return
deferredResult
;

}
註意: 在serlvet異步化中有個問題是filter的後置結果處理,沒法使用,對於我們一些打點,結果統計直接使用serlvet異步是沒法用的。在springmvc中就很好的解決了這個問題,springmvc采用了一個比較取巧的方式通過請求轉發,能讓請求再次過濾器。但是又引入了新的一個問題那就是過濾器會處理兩次,這裏可以通過SpringMVC源碼中自身判斷的方法,我們可以在filter中使用下面這句話來進行判斷是不是屬於springmvc轉發過來的請求,從而不處理filter的前置事件,只處理後置事件:

Object
asyncManagerAttr

servletRequest
.
getAttribute
(
WEB_ASYNC_MANAGER_ATTRIBUTE
);
return
asyncManagerAttr
instanceof

WebAsyncManager

;
5.全鏈路異步化

上面我們介紹了serlvet的異步化,相信細心的同學都看出來似乎並沒有解決根本的問題,我的IO阻塞依然存在,只是換了個位置而已,當IO調用頻繁同樣會讓業務線程池快速變滿,雖然serlvet容器線程不被阻塞,但是這個業務依然會變得不可用。

那麽怎麽才能解決上面的問題呢?答案就是全鏈路異步化,全鏈路異步追求的是沒有阻塞,打滿你的CPU,把機器的性能壓榨到極致模型圖如下:

具體的NIO client到底做了什麽事呢,具體如下面模型:

上面就是我們全鏈路異步的圖了(部分線程池可以優化)。全鏈路的核心在於只要我們遇到IO調用的時候,我們就可以使用NIO,從而避免阻塞,也就解決了之前說的業務線程池被打滿得到尷尬場景。

5.1遠程調用異步化

我們一般遠程調用使用rpc或者http。對於rpc來說一般thrift,http,motan等支持都異步調用,其內部原理也都是采用事件驅動的NIO模型,對於http來說一般的apachehttpclient和okhttp也都提供了異步調用。 下面簡單介紹下Http異步化調用是怎麽做的: 首先來看一個例子:

public

class

HTTPAsyncClientDemo

{

public

static

void
main
(
String
[]
args
)

throws

ExecutionException
,

InterruptedException
,

IOReactorException

{

//具體參數含義下文會講

//apache提供了ioReactor的參數配置,這裏我們配置IO 線程為1

IOReactorConfig
ioReactorConfig

IOReactorConfig
.
custom
().
setIoThreadCount
(
1
).
build
();

//根據這個配置創建一個ioReactor

ConnectingIOReactor
ioReactor

new

DefaultConnectingIOReactor
(
ioReactorConfig
);

//asyncHttpClient使用PoolingNHttpClientConnectionManager管理我們客戶端連接

PoolingNHttpClientConnectionManager
conManager

new

PoolingNHttpClientConnectionManager
(
ioReactor
);

//設置總共的連接的最大數量
conManager
.
setMaxTotal
(
100
);

//設置每個路由的連接的最大數量
conManager
.
setDefaultMaxPerRoute
(
100
);

//創建一個Client

CloseableHttpAsyncClient
httpclient

HttpAsyncClients
.
custom
().
setConnectionManager
(
conManager
).
build
();

// Start the client
httpclient
.
start
();

// Execute request

final

HttpGet
request1

new

HttpGet
(
"http://www.apache.org/"
);

Future
<
HttpResponse

future

httpclient
.
execute
(
request1
,

null
);

// and wait until a response is received

HttpResponse
response1

future
.
get
();

System
.
out
.
println
(
request1
.
getRequestLine
()

+

"->"

+
response1
.
getStatusLine
());

// One most likely would want to use a callback for operation result

final

HttpGet
request2

new

HttpGet
(
"http://www.apache.org/"
);
httpclient
.
execute
(
request2
,

new

FutureCallback
<
HttpResponse

()

{

//Complete成功後會回調這個方法

public

void
completed
(
final

HttpResponse
response2
)

{

System
.
out
.
println
(
request2
.
getRequestLine
()

+

"->"

+
response2
.
getStatusLine
());

}

public

void
failed
(
final

Exception
ex
)

{

System
.
out
.
println
(
request2
.
getRequestLine
()

+

"->"

+
ex
);

}

public

void
cancelled
()

{

System
.
out
.
println
(
request2
.
getRequestLine
()

+

" cancelled"
);

}

});

}
}
下面給出httpAsync的整個類圖:

對於我們的HTTPAysncClient 其實最後使用的是InternalHttpAsyncClient,在InternalHttpAsyncClient中有個ConnectionManager,這個就是我們管理連接的管理器,而在httpAsync中只有一個實現那就是PoolingNHttpClientConnectionManager,這個連接管理器中有兩個我們比較關心的一個是Reactor,一個是Cpool。

Reactor :所有的Reactor這裏都是實現了IOReactor接口。在PoolingNHttpClientConnectionManager中會有擁有一個Reactor,那就是DefaultConnectingIOReactor,這個DefaultConnectingIOReactor,負責處理Acceptor。在DefaultConnectingIOReactor有個excutor方法,生成IOReactor也就是我們圖中的BaseIOReactor,進行IO的操作。這個模型就是我們上面的1.2.2的模型

CPool :在PoolingNHttpClientConnectionManager中有個CPool,主要是負責控制我們連接,我們上面所說的maxTotal和defaultMaxPerRoute,都是由其進行控制,如果每個路由的滿了他會斷開最老的一個鏈接,如果總共的total滿了他會放入leased隊列,釋放空間的時候就會將其重新連接。

5.2數據庫調用異步化

對於數據庫調用一般的框架並沒有提供異步化的方法,這裏推薦自己封裝或者使用網上開源的,這裏我們公司有個開源的 https://github.com/ainilife/zebra-dao/blob/master/README_ZH.md 能很好的支持異步化

6.最後

異步化並不是高並發的銀彈,但是有了異步化的確能提高你機器的qps,吞吐量等等。上述講的一些模型如果能合理的做一些優化,然後進行應用,相信能對你的服務有很大的幫助的。

異步化,高並發大殺器