1. 程式人生 > >基於AsyncRestTemplate非同步HTTP請求的一種輕量級技術實現

基於AsyncRestTemplate非同步HTTP請求的一種輕量級技術實現

Ⅰ、前言

          在上一篇部落格中講述ListenableFuture通過非同步回撥機制來實現請求的非阻塞。通常情況下,客戶端獲取資料並不會只發送一次http請求,可能會有多個http請求。這樣,使用上一篇部落格中的方法,就會產生大量的冗餘程式碼,因為請求處理的程式碼除了一些引數不同外,其它地方都大致相同。我們發現不同請求之間的區別在於:請求地址的不同、響應型別的不同,可能還會有額外請求引數的不同。我們可以將請求資料和響應資料進行封裝,這樣,只需要一個欄位來標識每一次http請求屬於哪一個業務就可以實現批量傳送http請求,整個過程是非同步非阻塞的,一旦獲取到資料就會觸發回撥函式,進而獲取到響應資料,最後再進行業務邏輯相關處理。

Ⅱ、RestTemplate簡介

1、定義

          RestTemplate是Spring3.0中出現的新類,其可以簡化HTTP伺服器通訊,處理HTTP連線,使應用程式程式碼通過提供url和響應型別(可能的模板變數)便可提取結果。

2、方法

//get方法
//其中url為請求地址,responseType為響應類(需要自己依據響應格式來確定)
//urlVariables為陣列變數
public <T> T getForObject(String url, Class<T> responseType, Object... urlVariables) throws RestClientException 

//urlVariables為Map型別變數,其中key為請求欄位名,value為請求欄位值
public <T> T getForObject(String url, Class<T> responseType, Map<String, ?> urlVariables) public <T> T getForObject(URI url, Class<T> responseType) throws RestClientException //ResponseEntity public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object
... urlVariables) throws RestClientException public <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> urlVariables) throws RestClientException public <T> ResponseEntity<T> getForEntity(URI url, Class<T> responseType) throws RestClientException //post public <T> T postForObject(String url, Object request, Class<T> responseType, Object... uriVariables) throws RestClientException public <T> T postForObject(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException public <T> T postForObject(URI url, Object request, Class<T> responseType) throws RestClientException public <T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Object... uriVariables) throws RestClientException public <T> ResponseEntity<T> postForEntity(String url, Object request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException public <T> ResponseEntity<T> postForEntity(URI url, Object request, Class<T> responseType) throws RestClientException

3、說明

          Spring提供的RestTemplate可用於訪問Rest服務的客戶端,其提供了多種便捷訪問遠端Http服務的方法,能夠大大提高客戶端的編寫效率,但其並沒有實現非同步呼叫的功能。下面將引入Spring4.0提供的AsyncRestTemplate,該類可實現非同步非阻塞處理http請求。

Ⅲ、AsyncRestTemplate簡介

1、定義

          AsyncRestTemplate是在Spring4.0中對RestTemplate進行擴充套件產生的新類,其為客戶端提供了非同步http請求處理的一種機制,通過返回ListenableFuture物件生成回撥機制,以達到非同步非阻塞傳送http請求。

2、方法

//get
public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Object... uriVariables) throws RestClientException

public <T> ListenableFuture<ResponseEntity<T>> getForEntity(String url, Class<T> responseType, Map<String, ?> urlVariables) throws RestClientException

public <T> ListenableFuture<ResponseEntity<T>> getForEntity(URI url, Class<T> responseType) throws RestClientException


//post
public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request, Class<T> responseType, Object... uriVariables) throws RestClientException

public <T> ListenableFuture<ResponseEntity<T>> postForEntity(String url, HttpEntity<?> request, Class<T> responseType, Map<String, ?> uriVariables) throws RestClientException 

public <T> ListenableFuture<ResponseEntity<T>> postForEntity(URI url, HttpEntity<?> request, Class<T> responseType) throws RestClientException 

3、說明

          相比於RestTemplate,AsyncRestTemplate通過回撥機制能夠很好地非同步處理多個http請求,使得客戶端在主方法中不必等待伺服器響應,而是繼續執行後續程式碼,這樣就較大地提高了程式碼的執行效率,減少響應時間。

Ⅳ、基於AsyncRestTemplate實現批量非同步呼叫

          下面將介紹基於AsyncRestTemplate非同步呼叫的輕量級框架,說框架有點吹牛皮的感覺,不過程式碼結構整體上看起來還是挺清晰的,如有不妥之處,請提供寶貴建議。其主要分為5個部分:業務標識、請求、響應,非同步呼叫、請求處理。對應的類如下所示:

         業務標識:IEnum、UserEnum(具體業務標識)

         請求:BaseRequest、UserRequest(具體業務請求)、ConcreateWapper(請求包裝)

         響應:BaseResponse、UserRequest(具體業務響應)

         非同步呼叫:Templete、AbstractTemplete、AsynClientTemplete、CommonListenableCallBack

         請求處理:FutureTpDao

1、業務標識(使用列舉類來標識業務請求)

          使用列舉類能夠比較好地標識具體業務,但是列舉類無法繼承,這裡通過定義一個空的介面IEnum對其進行抽象。可能看起來會有所不妥,但是也算是一種解決方法吧。

//空的介面
package acync;

public interface IEnum {
}
//具體業務標識列舉類,實現了IEnum介面
//
public enum UserEnum implements IEnum {
    ADD,
    UPDATE,
    DELETE,
    MODIFY;
}

2、請求

          通常情況下,客戶端都是傳送http請求(使用url的方式)來獲取資料,這樣,我們主需要獲取請求的url地址即可。這裡,定義介面BaseRequest提供build方法來構建請求介面,對於具體的業務請求只需實現介面並構建請求url即可。

//基礎請求介面,提供構建URL方法
package acync;

public interface BaseRequest {
    public String build();
}
//具體的請求類,依據業務情況自行構建URL地址
package acync;

public class UserRequest implements BaseRequest {
    private static final String REQ_URL = "http://www.126.com";

    @Override
    public String build() {
        return REQ_URL;
    }

}

3、響應

          對於請求響應這裡也是定義抽象類BaseResponse,提供status來表示請求的響應狀態,而具體的業務響應只需要實現抽象類,自定義實現即可。(其中,BaseResponse抽象類可依據具體的業務框架來定義實現)

//基礎響應抽象類,提供狀態碼
package acync;

import java.io.Serializable;

public abstract class  BaseResponse implements Serializable{
    private String status;
}
//具體業務響應類
package acync;

public class UserResponse extends BaseResponse{
//TODO
}

4、非同步呼叫

          下面的所列程式碼是整個請求的核心程式碼。首先,定義模版介面,介面中只提供了若干主要方法,從整體上看,方法的引數為業務請求類和響應型別,返回值為泛型型別的ListenableFuture物件;其次,定義抽象類和具體的實現類;最後,進過請求處理即可獲取請求介面。這裡不累贅,見下方程式碼。

//非同步呼叫模板介面
package acync;

import java.util.Map;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;

public interface Templete {
    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType) throws Exception;

    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType) throws Exception;

    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
            Map<String, ?> uriVariables) throws Exception;

    <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception;
}
//非同步呼叫抽象類
//這裡僅僅提供少量的調取方法,可以自行擴充套件

package acync;

import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;

public abstract class AbstractTemplete implements Templete{
    public AsyncRestTemplate asyncRestTemplate;

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType)
            throws Exception {
        String url = baserequest.build();
        try {
            ListenableFuture<ResponseEntity<T>> t = asyncRestTemplate.getForEntity(url, responseType);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType) throws Exception {
        String url = baserequest.build();
        try {
            ListenableFuture<ResponseEntity<T>> t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
            Map<String, ?> uriVariables) throws Exception {
        String url = baserequest.build();
        ListenableFuture<ResponseEntity<T>> t = null;
        try {
            t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType, uriVariables);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    @Override
    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception {
        String url = baserequest.build();
        ListenableFuture<ResponseEntity<T>> t = null;
        try {
            t = asyncRestTemplate.exchange(url, HttpMethod.GET, null, responseType, uriVariables);
            return t;
        } catch (Exception e) {
            throw e;
        }
    }

    abstract void setTemplete(AsyncRestTemplate asyncRestTemplate);

}
// 具體的非同步呼叫實現類
package acync;

import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;

public class AsynClientTemplete extends AbstractTemplete {

    public AsynClientTemplete(AsyncRestTemplate template) {
        setTemplete(template);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType)
            throws Exception {
        return super.getAsyncForObject(baserequest, responseType);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType) throws Exception {
        return super.getAsyncForObject(baserequest, responseType);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest, Class<T> responseType,
            Map<String, ?> uriVariables) throws Exception {
        return super.getAsyncForObject(baserequest, responseType, uriVariables);
    }

    public <T> ListenableFuture<ResponseEntity<T>> getAsyncForObject(BaseRequest baserequest,
            ParameterizedTypeReference<T> responseType, Map<String, ?> uriVariables) throws Exception {
        return super.getAsyncForObject(baserequest, responseType, uriVariables);
    }

    @Override
    void setTemplete(AsyncRestTemplate template) {
        asyncRestTemplate = template == null ? new AsyncRestTemplate() : template;
    }

}

5、請求處理

          上述四步都是為這一步做準備。請求處理這一步是請求的入口,在FutureTpDao中,通過getHttpData方法傳入請求包裝類ConcreateWapper,返回的Map物件Map<IEnum, Object>即為響應結果,只需依據具體的業務列舉類即可獲取對應的業務請求資料。

//包裝了具體的請求資訊
//其中的每一個Concreate對應一個具體的請求,baseEnum對應業務標識,variables為請求的額外引數,request為請求類和響應類組成的map

package acync;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConcreateWapper {
    private List<Concreate> wrapper = new ArrayList<Concreate>();

    public ConcreateWapper(){}

    public void setParams(IEnum baseEnum, Map<String, ?> variables, Map<BaseRequest, ?> request) {
        wrapper.add(new Concreate(baseEnum, variables, request));
    }

    public List<Concreate> getWrapper() {
        return wrapper;
    }

    public static class Concreate {
        private IEnum baseEnum;
        private Map<String, ?> variables;
        private Map<BaseRequest, ?> request;

        public Concreate(IEnum baseEnum, Map<String, ?> variables, Map<BaseRequest, ?> request) {
            this.baseEnum = baseEnum;
            this.variables = variables;
            this.request = request;
        }

        public IEnum getBaseEnum() {
            return baseEnum;
        }

        public void setBaseEnum(IEnum baseEnum) {
            this.baseEnum = baseEnum;
        }

        public Map<String, ?> getVariables() {
            return variables;
        }

        public void setVariables(Map<String, ?> variables) {
            this.variables = variables;
        }

        public Map<BaseRequest, ?> getRequest() {
            return request;
        }

        public void setRequest(Map<BaseRequest, ?> request) {
            this.request = request;
        }
    }
}
//實現ListenableFutureCallback,實現回撥功能
package acync;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.springframework.http.ResponseEntity;
import org.springframework.util.concurrent.ListenableFutureCallback;


public class CommonListenableCallBack<T> implements ListenableFutureCallback<T> {
    private IEnum type;
    private Map<IEnum, Object> resultValue;
    private volatile CountDownLatch latch;

    public CommonListenableCallBack(IEnum type, Map<IEnum, Object> resultValue, CountDownLatch latch) {
        this.type = type;
        this.resultValue = resultValue;
        this.latch = latch;
    }

    @Override
    public void onSuccess(T result) {
        ResponseEntity<T> re = (ResponseEntity<T>) result;
        if (re != null && re.getBody() != null) {
            T body = re.getBody();
            if (type != null) {
                resultValue.put(type, body);
            }
        }
        latch.countDown();
    }

    @Override
    public void onFailure(Throwable ex) {
        latch.countDown();
    }

}
//FutureTpDao的建構函式可以傳入自定義的AsyncRestTemplate,不傳的話就是預設的
//其中的getHttpData()方法傳入多個請求的包裝類ConcreateWapper,返回資料組成的Map
//其中Map中的key對應的是業務標識,value對應的是請求對應的結果類

package acync;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.client.AsyncRestTemplate;
import acync.ConcreateWapper.Concreate;

public class FutureTpDao {
    public AsynClientTemplete asynHttpClient;

    public FutureTpDao(){
        asynHttpClient = new AsynClientTemplete(null);
    }

    public FutureTpDao(AsyncRestTemplate tp) {
        asynHttpClient = new AsynClientTemplete(tp);
    }

    //獲取資料
    public Map<IEnum, Object> getHttpData(ConcreateWapper wapper) {
        if (wapper == null)
            return new HashMap<IEnum, Object>();
        final CountDownLatch latch = new CountDownLatch(wapper.getWrapper().size());
        final Map<IEnum, Object> result = new HashMap<IEnum, Object>();

        if (wapper.getWrapper() != null) {
            for (final Concreate wp : wapper.getWrapper()) {
                try {
                    Map<BaseRequest, ?> requestMap = wp.getRequest();
                    for (final BaseRequest tpRequestInfo : requestMap.keySet()) {
                        getHttpdata(wp, tpRequestInfo, latch, requestMap, result);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            try {
                latch.await();
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage());
            }
        }
        return result;
    }

   //傳送http請求,獲取請求結果
    private void getHttpdata(Concreate wp, BaseRequest tpRequestInfo, CountDownLatch latch,
            Map<BaseRequest, ?> requestMap, Map<IEnum, Object> result) throws Exception {
        ListenableFuture<?> statResponse = null;

        if (requestMap.get(tpRequestInfo) instanceof ParameterizedTypeReference<?>) {
            ParameterizedTypeReference<?> responseType = (ParameterizedTypeReference<?>) requestMap.get(tpRequestInfo);
            statResponse = asynHttpClient.getAsyncForObject(tpRequestInfo, responseType, wp.getVariables());
        } else if (requestMap.get(tpRequestInfo) instanceof Class<?>) {
            Class<?> responseType = (Class<?>) requestMap.get(tpRequestInfo);
            statResponse = asynHttpClient.getAsyncForObject(tpRequestInfo, responseType);
        } else {
            throw new RuntimeException("requestType error...");
        }

        addCallBack(statResponse, wp.getBaseEnum(), latch, result);
    }

    //增加回調
    private <T> void addCallBack(ListenableFuture<T> statResponse, IEnum baseEnum, CountDownLatch latch,
            Map<IEnum, Object> result) {
        if (statResponse != null) {
            statResponse.addCallback(new CommonListenableCallBack<T>(baseEnum, result, latch));
        }
    }
}

6、示例

package acync;

import java.util.HashMap;
import java.util.Map;

/**
 * 示例
 * 示例僅僅是一個樣板,無法執行
 * 需要在web環境下執行,例如啟動tomcat伺服器並進行相關配置
 * @author liqqc
 *
 */
public class Demo {

    public static void main(String[] args) {
        ConcreateWapper wapper = new ConcreateWapper();

        Map<BaseRequest, Class<? extends BaseResponse>> request = new HashMap<BaseRequest, Class<? extends BaseResponse>>();
        request.put(new UserRequest(), new UserResponse().getClass());
        wapper.setParams(UserEnum.ADD, null, request);
        wapper.setParams(UserEnum.DELETE, null, request);
        wapper.setParams(UserEnum.UPDATE, null, request);
        wapper.setParams(UserEnum.MODIFY, null, request);

        FutureTpDao futureTpDao = new FutureTpDao();
        Map<IEnum, Object> futureData = futureTpDao.getHttpData(wapper);
        for (IEnum ienum : futureData.keySet()) {
            System.err.println(ienum + "=" + futureData.get(ienum));
        }
    }
}

Ⅴ、總結

          本文提供了一種基於AsyncRestTemplate來實現批量請求處理的一種方法。整個框架的結構還是比較清晰,由於技術能力有限,若干地方可能考慮有所欠缺,還需進一步深入研究改進。不管怎樣,希望本文對你有所幫助。如有疑問可以留言或郵件,謝謝。