基於AsyncRestTemplate非同步HTTP請求的一種輕量級技術實現
Ⅰ、前言
在上一篇部落格中講述ListenableFuture通過非同步回撥機制來實現請求的非阻塞。通常情況下,客戶端獲取資料並不會只發送一次http請求,可能會有多個http請求。這樣,使用上一篇部落格中的方法,就會產生大量的冗餘程式碼,因為請求處理的程式碼除了一些引數不同外,其它地方都大致相同。我們發現不同請求之間的區別在於:請求地址的不同、響應型別的不同,可能還會有額外請求引數的不同。我們可以將請求資料和響應資料進行封裝,這樣,只需要一個欄位來標識每一次http請求屬於哪一個業務就可以實現批量傳送http請求,整個過程是非同步非阻塞的,一旦獲取到資料就會觸發回撥函式,進而獲取到響應資料,最後再進行業務邏輯相關處理。
Ⅱ、RestTemplate簡介
1、定義
RestTemplate是Spring3.0中出現的新類,其可以簡化HTTP伺服器通訊,處理HTTP連線,使應用程式程式碼通過提供url和響應型別(可能的模板變數)便可提取結果。
2、方法
//get方法
//其中url為請求地址,responseType為響應類(需要自己依據響應格式來確定)
//urlVariables為陣列變數
public <T> T getForObject(String url, Class<T> responseType, Object... urlVariables) throws RestClientException
//urlVariables為Map型別變數,其中key為請求欄位名,value為請求欄位值
public <T> T getForObject(String url, Class<T> responseType, Map<String, ?> urlVariables)
public <T> T getForObject(URI url, Class<T> responseType) throws RestClientException
//ResponseEntity
public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object ... urlVariables) throws RestClientException
public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> urlVariables) throws RestClientException
public <T> ResponseEntity<T> getForEntity(URI url, Class<T> responseType) throws RestClientException
//post
public <T> T postForObject(String url, Object request, Class<T> responseType, Object... uriVariables) throws RestClientException
public <T> T postForObject(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException
public <T> T postForObject(URI url, Object request, Class<T> responseType) throws RestClientException
public <T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Object... uriVariables) throws RestClientException
public <T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException
public <T> ResponseEntity<T> postForEntity(URI url, Object request, Class<T> responseType) throws RestClientException
3、說明
Spring提供的RestTemplate可用於訪問Rest服務的客戶端,其提供了多種便捷訪問遠端Http服務的方法,能夠大大提高客戶端的編寫效率,但其並沒有實現非同步呼叫的功能。下面將引入Spring4.0提供的AsyncRestTemplate,該類可實現非同步非阻塞處理http請求。
Ⅲ、AsyncRestTemplate簡介
1、定義
AsyncRestTemplate是在Spring4.0中對RestTemplate進行擴充套件產生的新類,其為客戶端提供了非同步http請求處理的一種機制,通過返回ListenableFuture物件生成回撥機制,以達到非同步非阻塞傳送http請求。
2、方法
//get
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Object... uriVariables) throws RestClientException
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Map<String, ?> urlVariables) throws RestClientException
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(URI url, Class<T> responseType) throws RestClientException
//post
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request, Class<T> responseType, Object... uriVariables) throws RestClientException
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(URI url, HttpEntity<?> request, Class<T> responseType) throws RestClientException
3、說明
相比於RestTemplate,AsyncRestTemplate通過回撥機制能夠很好地非同步處理多個http請求,使得客戶端在主方法中不必等待伺服器響應,而是繼續執行後續程式碼,這樣就較大地提高了程式碼的執行效率,減少響應時間。
Ⅳ、基於AsyncRestTemplate實現批量非同步呼叫
下面將介紹基於AsyncRestTemplate非同步呼叫的輕量級框架,說框架有點吹牛皮的感覺,不過程式碼結構整體上看起來還是挺清晰的,如有不妥之處,請提供寶貴建議。其主要分為5個部分:業務標識、請求、響應,非同步呼叫、請求處理。對應的類如下所示:
業務標識:IEnum、UserEnum(具體業務標識)
請求:BaseRequest、UserRequest(具體業務請求)、ConcreateWapper(請求包裝)
響應:BaseResponse、UserRequest(具體業務響應)
非同步呼叫:Templete、AbstractTemplete、AsynClientTemplete、CommonListenableCallBack
請求處理:FutureTpDao
1、業務標識(使用列舉類來標識業務請求)
使用列舉類能夠比較好地標識具體業務,但是列舉類無法繼承,這裡通過定義一個空的介面IEnum對其進行抽象。可能看起來會有所不妥,但是也算是一種解決方法吧。
//空的介面
package acync;
public interface IEnum {
}
//具體業務標識列舉類,實現了IEnum介面
//
public enum UserEnum implements IEnum {
ADD,
UPDATE,
DELETE,
MODIFY;
}
2、請求
通常情況下,客戶端都是傳送http請求(使用url的方式)來獲取資料,這樣,我們主需要獲取請求的url地址即可。這裡,定義介面BaseRequest提供build方法來構建請求介面,對於具體的業務請求只需實現介面並構建請求url即可。
//基礎請求介面,提供構建URL方法
package acync;
public interface BaseRequest {
public String build();
}
//具體的請求類,依據業務情況自行構建URL地址
package acync;
public class UserRequest implements BaseRequest {
private static final String REQ_URL = "http://www.126.com";
@Override
public String build() {
return REQ_URL;
}
}
3、響應
對於請求響應這裡也是定義抽象類BaseResponse,提供status來表示請求的響應狀態,而具體的業務響應只需要實現抽象類,自定義實現即可。(其中,BaseResponse抽象類可依據具體的業務框架來定義實現)
//基礎響應抽象類,提供狀態碼
package acync;
import java.io.Serializable;
public abstract class BaseResponse implements Serializable{
private String status;
}
//具體業務響應類
package acync;
public class UserResponse extends BaseResponse{
//TODO
}
4、非同步呼叫
下面的所列程式碼是整個請求的核心程式碼。首先,定義模版介面,介面中只提供了若干主要方法,從整體上看,方法的引數為業務請求類和響應型別,返回值為泛型型別的ListenableFuture物件;其次,定義抽象類和具體的實現類;最後,進過請求處理即可獲取請求介面。這裡不累贅,見下方程式碼。
//非同步呼叫模板介面
package acync;
import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
public interface Templete {
<T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType) throws Exception;
<T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
ParameterizedTypeReference<T> responseType) throws Exception;
<T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
Map<String, ?> uriVariables) throws Exception;
<T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception;
}
//非同步呼叫抽象類
//這裡僅僅提供少量的調取方法,可以自行擴充套件
package acync;
import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;
public abstract class AbstractTemplete implements Templete{
public AsyncRestTemplate asyncRestTemplate;
@Override
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType)
throws Exception {
String url = baserequest.build();
try {
ListenableFuture<ResponseEntity<T>> t = asyncRestTemplate.getForEntity(url, responseType);
return t;
} catch (Exception e) {
throw e;
}
}
@Override
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
ParameterizedTypeReference<T> responseType) throws Exception {
String url = baserequest.build();
try {
ListenableFuture<ResponseEntity<T>> t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType);
return t;
} catch (Exception e) {
throw e;
}
}
@Override
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
Map<String, ?> uriVariables) throws Exception {
String url = baserequest.build();
ListenableFuture<ResponseEntity<T>> t = null;
try {
t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType, uriVariables);
return t;
} catch (Exception e) {
throw e;
}
}
@Override
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception {
String url = baserequest.build();
ListenableFuture<ResponseEntity<T>> t = null;
try {
t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType, uriVariables);
return t;
} catch (Exception e) {
throw e;
}
}
abstract void setTemplete(AsyncRestTemplate asyncRestTemplate);
}
// 具體的非同步呼叫實現類
package acync;
import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;
public class AsynClientTemplete extends AbstractTemplete {
public AsynClientTemplete(AsyncRestTemplate template) {
setTemplete(template);
}
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType)
throws Exception {
return super.getAsyncForObject(baserequest, responseType);
}
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
ParameterizedTypeReference<T> responseType) throws Exception {
return super.getAsyncForObject(baserequest, responseType);
}
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
Map<String, ?> uriVariables) throws Exception {
return super.getAsyncForObject(baserequest, responseType, uriVariables);
}
public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception {
return super.getAsyncForObject(baserequest, responseType, uriVariables);
}
@Override
void setTemplete(AsyncRestTemplate template) {
asyncRestTemplate = template == null ? new AsyncRestTemplate() : template;
}
}
5、請求處理
上述四步都是為這一步做準備。請求處理這一步是請求的入口,在FutureTpDao中,通過getHttpData方法傳入請求包裝類ConcreateWapper,返回的Map物件Map
//包裝了具體的請求資訊
//其中的每一個Concreate對應一個具體的請求,baseEnum對應業務標識,variables為請求的額外引數,request為請求類和響應類組成的map
package acync;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ConcreateWapper {
private List<Concreate> wrapper = new ArrayList<Concreate>();
public ConcreateWapper(){}
public void setParams(IEnum baseEnum, Map<String, ?> variables, Map<BaseRequest, ?> request) {
wrapper.add(new Concreate(baseEnum, variables, request));
}
public List<Concreate> getWrapper() {
return wrapper;
}
public static class Concreate {
private IEnum baseEnum;
private Map<String, ?> variables;
private Map<BaseRequest, ?> request;
public Concreate(IEnum baseEnum, Map<String, ?> variables, Map<BaseRequest, ?> request) {
this.baseEnum = baseEnum;
this.variables = variables;
this.request = request;
}
public IEnum getBaseEnum() {
return baseEnum;
}
public void setBaseEnum(IEnum baseEnum) {
this.baseEnum = baseEnum;
}
public Map<String, ?> getVariables() {
return variables;
}
public void setVariables(Map<String, ?> variables) {
this.variables = variables;
}
public Map<BaseRequest, ?> getRequest() {
return request;
}
public void setRequest(Map<BaseRequest, ?> request) {
this.request = request;
}
}
}
//實現ListenableFutureCallback,實現回撥功能
package acync;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFutureCallback;
public class CommonListenableCallBack<T> implements ListenableFutureCallback<T> {
private IEnum type;
private Map<IEnum, Object> resultValue;
private volatile CountDownLatch latch;
public CommonListenableCallBack(IEnum type, Map<IEnum, Object> resultValue, CountDownLatch latch) {
this.type = type;
this.resultValue = resultValue;
this.latch = latch;
}
@Override
public void onSuccess(T result) {
ResponseEntity<T> re = (ResponseEntity<T>) result;
if (re != null && re.getBody() != null) {
T body = re.getBody();
if (type != null) {
resultValue.put(type, body);
}
}
latch.countDown();
}
@Override
public void onFailure(Throwable ex) {
latch.countDown();
}
}
//FutureTpDao的建構函式可以傳入自定義的AsyncRestTemplate,不傳的話就是預設的
//其中的getHttpData()方法傳入多個請求的包裝類ConcreateWapper,返回資料組成的Map
//其中Map中的key對應的是業務標識,value對應的是請求對應的結果類
package acync;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;
import acync.ConcreateWapper.Concreate;
public class FutureTpDao {
public AsynClientTemplete asynHttpClient;
public FutureTpDao(){
asynHttpClient = new AsynClientTemplete(null);
}
public FutureTpDao(AsyncRestTemplate tp) {
asynHttpClient = new AsynClientTemplete(tp);
}
//獲取資料
public Map<IEnum, Object> getHttpData(ConcreateWapper wapper) {
if (wapper == null)
return new HashMap<IEnum, Object>();
final CountDownLatch latch = new CountDownLatch(wapper.getWrapper().size());
final Map<IEnum, Object> result = new HashMap<IEnum, Object>();
if (wapper.getWrapper() != null) {
for (final Concreate wp : wapper.getWrapper()) {
try {
Map<BaseRequest, ?> requestMap = wp.getRequest();
for (final BaseRequest tpRequestInfo : requestMap.keySet()) {
getHttpdata(wp, tpRequestInfo, latch, requestMap, result);
}
} catch (Exception e) {
e.printStackTrace();
}
}
try {
latch.await();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
return result;
}
//傳送http請求,獲取請求結果
private void getHttpdata(Concreate wp, BaseRequest tpRequestInfo, CountDownLatch latch,
Map<BaseRequest, ?> requestMap, Map<IEnum, Object> result) throws Exception {
ListenableFuture<?> statResponse = null;
if (requestMap.get(tpRequestInfo) instanceof ParameterizedTypeReference<?>) {
ParameterizedTypeReference<?> responseType = (ParameterizedTypeReference<?>) requestMap.get(tpRequestInfo);
statResponse = asynHttpClient.getAsyncForObject(tpRequestInfo, responseType, wp.getVariables());
} else if (requestMap.get(tpRequestInfo) instanceof Class<?>) {
Class<?> responseType = (Class<?>) requestMap.get(tpRequestInfo);
statResponse = asynHttpClient.getAsyncForObject(tpRequestInfo, responseType);
} else {
throw new RuntimeException("requestType error...");
}
addCallBack(statResponse, wp.getBaseEnum(), latch, result);
}
//增加回調
private <T> void addCallBack(ListenableFuture<T> statResponse, IEnum baseEnum, CountDownLatch latch,
Map<IEnum, Object> result) {
if (statResponse != null) {
statResponse.addCallback(new CommonListenableCallBack<T>(baseEnum, result, latch));
}
}
}
6、示例
package acync;
import java.util.HashMap;
import java.util.Map;
/**
* 示例
* 示例僅僅是一個樣板,無法執行
* 需要在web環境下執行,例如啟動tomcat伺服器並進行相關配置
* @author liqqc
*
*/
public class Demo {
public static void main(String[] args) {
ConcreateWapper wapper = new ConcreateWapper();
Map<BaseRequest, Class<? extends BaseResponse>> request = new HashMap<BaseRequest, Class<? extends BaseResponse>>();
request.put(new UserRequest(), new UserResponse().getClass());
wapper.setParams(UserEnum.ADD, null, request);
wapper.setParams(UserEnum.DELETE, null, request);
wapper.setParams(UserEnum.UPDATE, null, request);
wapper.setParams(UserEnum.MODIFY, null, request);
FutureTpDao futureTpDao = new FutureTpDao();
Map<IEnum, Object> futureData = futureTpDao.getHttpData(wapper);
for (IEnum ienum : futureData.keySet()) {
System.err.println(ienum + "=" + futureData.get(ienum));
}
}
}
Ⅴ、總結
本文提供了一種基於AsyncRestTemplate來實現批量請求處理的一種方法。整個框架的結構還是比較清晰,由於技術能力有限,若干地方可能考慮有所欠缺,還需進一步深入研究改進。不管怎樣,希望本文對你有所幫助。如有疑問可以留言或郵件,謝謝。