1. 程式人生 > >實際專案中運用責任鏈模式 | 併發程式設計網

實際專案中運用責任鏈模式 | 併發程式設計網

1 模式概要

1.1 簡介

  • 責任鏈模式為請求建立一個接收者物件鏈,每個接收者都包含對另一個接收者的引用,如果一個物件不能處理該請求,那麼它會把請求傳給下一個接收者,依此類推
  • 責任鏈模式避免了請求的傳送者和接收者耦合在一起,讓多個物件都有可能接收請求,將這些物件連成一條鏈,並且沿著這條鏈傳遞請求,直到有物件處理它為止。

1.2 責任鏈模式優缺點

優點

降低耦合度。它將請求的傳送者和接收者解耦 簡化了物件,使得物件不需要知道鏈的結構 增強給物件指派職責的靈活性,允許動態地新增或者刪除責任鏈 增加新的請求處理類方便

缺點

不能保證請求一定被接收; 系統性能將受到一定影響,除錯時不方便,可能會造成迴圈呼叫

2 模式結構

2.1 物件定義

Handler(抽象處理者): 定義一個處理請求的介面,提供對後續處理者的引用 ConcreteHandler(具體處理者): 抽象處理者的子類,處理使用者請求,可選將請求處理掉還是傳給下家;在具體處理者中可以訪問鏈中下一個物件,以便請求的轉發

2.2 類圖及設計

責任鏈

程式碼詳解:

抽象處理者

public abstract class Handler {

    protected Handler nextHandler; // 下一個責任鏈成員

    public Handler getNextHandler() {
        return
nextHandler; } public void setNextHandler(Handler nextHandler) { this.nextHandler = nextHandler; } // 處理傳遞過來的時間 public abstract void handleMessage(int type); }

具體處理者 在當前處理者物件無法處理時,將執行權傳給下一個處理者物件

public class ConcreteHandler1 extends Handler {

    @Override
    public
void handleMessage(int type)
{ if (type == 1 || type == 3) { System.out.println("ConcreteHandler1解決了問題!"); } else { System.out.println("ConcreteHandler1解決不了問題......"); if (nextHandler != null) { nextHandler.handleMessage(type); } else { System.out.println("沒有人能處理這個訊息"); } } } } public class ConcreteHandler2 extends Handler { @Override public void handleMessage(int type) { if (type == 2 || type == 5) { System.out.println("ConcreteHandler2解決了問題!"); } else { System.out.println("ConcreteHandler2解決不了問題......"); if (nextHandler != null) { nextHandler.handleMessage(type); } else { System.out.println("沒有人能處理這個訊息"); } } } } public class ConcreteHandler3 extends Handler { @Override public void handleMessage(int type) { if (type == 4 || type == 6) { System.out.println("ConcreteHandler3解決了問題!"); } else { System.out.println("ConcreteHandler3解決不了問題......"); if (nextHandler != null) { nextHandler.handleMessage(type); } else { System.out.println("沒有人能處理這個訊息"); } } } }

Client 客戶端呼叫

    // 初始化責任鏈:handler1 -> handler2 -> handler3
        Handler handler1 = new ConcreteHandler1();
        Handler handler2 = new ConcreteHandler2();
        Handler handler3 = new ConcreteHandler3();
        handler2.setNextHandler(handler3);
        handler1.setNextHandler(handler2);
        // 處理事件
        System.out.println("--------------Message1");
        handler1.handleMessage(1);
        System.out.println("--------------Message2");
        handler1.handleMessage(2);
        System.out.println("--------------Message3");
        handler1.handleMessage(4);
        System.out.println("--------------Message4");
        handler1.handleMessage(7);

從上述模式可以知道,當我們需要多個ifelse做邏輯判斷的時候,可以引入,從而提高程式碼可維護性

2.3 適用場景:

  • 有多個物件可以處理同一個請求,具體哪個物件處理該請求由執行時刻自動確定
  • 在不明確指定接收者的情況下,向多個物件中的某一個物件提交一個請求
  • 可動態指定一組物件的處理請求

3 Spring中的過濾器

我們來分析Spring中Filter的載入流程和執行流程

3.1 初始化流程

初始化過濾器載入資料流如下:

filter初始化載入時序圖

關鍵性程式碼

public void onStartup(ServletContext servletContext) throws ServletException {
        Filter filter = getFilter();
        Assert.notNull(filter, "Filter must not be null");
        String name = getOrDeduceName(filter);
        if (!isEnabled()) {
            this.logger.info("Filter " + name + " was not registered (disabled)");
            return;
        }
        //增加過濾器,資料流向 HashMap<String, FilterDef> filterDefs
        FilterRegistration.Dynamic added = servletContext.addFilter(name, filter);
        if (added == null) {
            this.logger.info("Filter " + name + " was not registered "
                    + "(possibly already registered?)");
            return;
        }
                //配置過濾器註冊資訊
        configure(added);
    }

configure()方法主要關注

  if (isMatchAfter) {
                context.addFilterMap(filterMap);
            } else {
                context.addFilterMapBefore(filterMap);
            }

不管是資料走哪裡,最終會通過 System.arraycopy 陣列擴容,增加過濾器資訊到private FilterMap[] array 這個陣列中。 最後呼叫StandardContext類中的 filterStart() 方法完成過濾器的初始化

3.2 執行過程

主要分兩步,建立過濾器責任鏈 和 執行責任鏈

3.2.1 建立過程

建立filterChain方法主要在ApplicationFilterFactory.createFilterChain(request, wrapper, servlet) 中,部分程式碼講解:

{
  // 獲取過濾器上下文
        StandardContext context = (StandardContext) wrapper.getParent();
       //獲取載入的過濾器列表
        FilterMap filterMaps[] = context.findFilterMaps();

        // If there are no filter mappings, we are done
        if ((filterMaps == null) || (filterMaps.length == 0))
            return (filterChain);

        //  獲取匹配的過濾器對映資訊
        DispatcherType dispatcher =
                (DispatcherType) request.getAttribute(Globals.DISPATCHER_TYPE_ATTR);

        String requestPath = null;
        Object attribute = request.getAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR);
        if (attribute != null){
            requestPath = attribute.toString();
        }

        String servletName = wrapper.getName();

        // 每個過濾器配置對應處理的請求路徑資訊
        for (int i = 0; i < filterMaps.length; i++) {
            if (!matchDispatcher(filterMaps[i] ,dispatcher)) {
                continue;
            }
            if (!matchFiltersURL(filterMaps[i], requestPath))
                continue;
            ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
                context.findFilterConfig(filterMaps[i].getFilterName());
            if (filterConfig == null) {
                // FIXME - log configuration problem
                continue;
            }
            filterChain.addFilter(filterConfig);
        }

        // 配置對應servletName資訊,最後返回過濾器鏈
        for (int i = 0; i < filterMaps.length; i++) {
            if (!matchDispatcher(filterMaps[i] ,dispatcher)) {
                continue;
            }
            if (!matchFiltersServlet(filterMaps[i], servletName))
                continue;
            ApplicationFilterConfig filterConfig = (ApplicationFilterConfig)
                context.findFilterConfig(filterMaps[i].getFilterName());
            if (filterConfig == null) {
                // FIXME - log configuration problem
                continue;
            }
            filterChain.addFilter(filterConfig);
        }

        // Return the completed filter chain
        return filterChain;
}

在StandardWrapperValue類的invoke()方法中呼叫ApplicationFilterChai類的createFilterChain()方法 在ApplicationFilterChai類的createFilterChain()方法中呼叫ApplicationFilterChain類的addFilter()方法 在ApplicationFilterChain類的addFilter()方法中給ApplicationFilterConfig陣列賦值

生成呼叫鏈

3.2.2 執行責任鏈

呼叫ApplicationFilterChain的 doFilter() 方法中最後會呼叫一個internalDoFilter() 方法,目的就是執行ApplicationFilterChain中的全部過濾器,從程式碼中可以發現它呼叫了 doFilter ,而在 doFilter 又會呼叫internalDoFilter 從而使所有Filter都得以呼叫

private void internalDoFilter(ServletRequest request,ServletResponse response) throws IOException, ServletException {

    // 如果存在下一個,繼續呼叫下一個過濾器
    if (pos < n) {
        ApplicationFilterConfig filterConfig = filters[pos++];
        try {
            Filter filter = filterConfig.getFilter();

            if (request.isAsyncSupported() && "false".equalsIgnoreCase(
                    filterConfig.getFilterDef().getAsyncSupported())) {
                request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
            }
            if( Globals.IS_SECURITY_ENABLED ) {
                final ServletRequest req = request;
                final ServletResponse res = response;
                Principal principal =
                    ((HttpServletRequest) req).getUserPrincipal();

                Object[] args = new Object[]{req, res, this};
                SecurityUtil.doAsPrivilege ("doFilter", filter, classType, args, principal);
            } else {
                // 此處呼叫Filter的doFilter()方法  / 而 doFilter 又會呼叫 internalDoFilter 直到呼叫完所有的過濾器
                filter.doFilter(request, response, this);
            }
        } catch (IOException | ServletException | RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            e = ExceptionUtils.unwrapInvocationTargetException(e);
            ExceptionUtils.handleThrowable(e);
            throw new ServletException(sm.getString("filterChain.filter"), e);
        }
        return;
    }

    // 從最後一個開始呼叫
    try {
        if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
            lastServicedRequest.set(request);
            lastServicedResponse.set(response);
        }

        if (request.isAsyncSupported() && !servletSupportsAsync) {
            request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR,
                    Boolean.FALSE);
        }
        // 包裝請求
        if ((request instanceof HttpServletRequest) &&
                (response instanceof HttpServletResponse) &&
                Globals.IS_SECURITY_ENABLED ) {
            final ServletRequest req = request;
            final ServletResponse res = response;
            Principal principal =
                ((HttpServletRequest) req).getUserPrincipal();
            Object[] args = new Object[]{req, res};
            SecurityUtil.doAsPrivilege("service", servlet, classTypeUsedInService,args, principal);
        } else {
            servlet.service(request, response);
        }
    } catch (IOException | ServletException | RuntimeException e) {
        throw e;
    } catch (Throwable e) {
        e = ExceptionUtils.unwrapInvocationTargetException(e);
        ExceptionUtils.handleThrowable(e);
        throw new ServletException(sm.getString("filterChain.servlet"), e);
    } finally {
        if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
            lastServicedRequest.set(null);
            lastServicedResponse.set(null);
        }
    }
}

這樣,一個完整的過濾器鏈就形成,然後進行呼叫

4 專案中的實際運用

業務場景

我們在專案中使用了阿里的MQ訊息中介軟體,來加快請求的響應時間和非同步解耦處理。RocktMQ主要可以按Topic來分割槽,然後按Tag分組,不同的業務區分不同的tag 比如: 簡訊類的訊息 messageTag 手機推送訊息 pushTag 延時任務訊息 delayTag 等等。。。

常規寫法

if(message.getTag() == messageTag){
 //doSomething
}else if(message.getTag() == pushTag){
 //doSomething
}else if (message.getTag() == delayTag){
 //doSomething
}
....

要是ifelse多了,最後會形成箭頭程式碼,最後連自己都不知道邏輯了。所以我就想到了責任鏈模式,剛好符合我們的實際場景。

具體設計方案如下:

設計UML類圖

類圖

抽象公共監聽器,主要用到了單例模式獲取常量

public abstract class AbstractCommonListener {

    private ParametersDO parametersDO;

    protected AbstractCommonListener() {
        //獲取單例物件
        this.parametersDO = ParametersDO.getInstance();
    }

     public final String getAccessKey() {
        return parametersDO.getAccessKey();
    }

    public final String getSecretKey() {
        return  parametersDO.getSecretKey();
    }

    public final String getConsumerId() {
        return parametersDO.getConsumerId();
    }

    public final String getONSAddr() {
        return parametersDO.getONSAddr();
    }

    public final String getTopic() {
        return parametersDO.getTopic();
    }


}


class ParametersDO{

    private static volatile boolean initialize = false;

    private String accessKey;

    private String secretKey;

    private String consumerId;

    private String ONSAddr;

    private String topic;

    private ParametersDO() {

        synchronized (ParametersDO.class) {
            if (!initialize) {
                this.accessKey = BundleUtil.getResult("mq.accesskey");
                this.consumerId = BundleUtil.getResult("mq.public.consumer.id");
                this.ONSAddr = BundleUtil.getResult("mq.ons.addr");
                this.topic = BundleUtil.getResult("mq.public.topic");
                this.secretKey = BundleUtil.getResult("mq.secretkey");
                initialize = !initialize;
            } else {
                throw new RuntimeException("ParametersDO單例已被破壞");
            }

        }

    }

    static ParametersDO getInstance() {
        return ListenerHolder.INSTANCE;
    }

    private static class ListenerHolder{
        private static final ParametersDO INSTANCE = new ParametersDO();
    }


    final String getAccessKey() {
        return accessKey;
    }

    final String getSecretKey() {
        return secretKey;
    }

    final String getConsumerId() {
        return consumerId;
    }

    final String getONSAddr() {
        return ONSAddr;
    }

    final String getTopic() {
        return topic;
    }

}

具體監聽器,監聽器主要用於MQ監聽消費Topic

public class GlobalOrderListener extends AbstractCommonListener implements MessageOrderListener {

    @Override
    public OrderAction consume(Message message, ConsumeOrderContext context) {

        //新增處理消費tag 只需新增Handler
        AbstractMessageHandler<OrderAction, Message> handler = HandlerFactory.getHandlerResponsibilityChain(
                        JpushOrderHandler.class,
                        DelayRemoveOrderHandler.class);
        return handler.handleMessage(message);
    }
}

正常情況下,我們會在consume()方法中區分tag來做不同業務的資料處理

抽象處理者

/**
 * @author nicky_chin [[email protected]]
 * @since --created on 2018/6/26 at 14:42
 * 責任鏈抽象類
 */
public abstract class AbstractMessageHandler<T, R> extends AbstractCommonListener {

    /**
     * 下一個責任鏈成員
     */
    protected AbstractMessageHandler<T, R> nextHandler;

    public AbstractMessageHandler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(AbstractMessageHandler<T, R> nextHandler) {
        this.nextHandler = nextHandler;
    }

    /**
     * 處理傳遞過來的tag
     * @param message 表示式
     * @return T
     */
    public abstract T handleMessage(R message);

}

具體處理者 :推送訊息Handler

@Slf4j
public class JpushOrderHandler extends AbstractMessageHandler<OrderAction, Message> {

    @Override
    public OrderAction handleMessage(Message message) {
        String tagList = BundleUtil.getResult("mq.tag");
        String[] tags = tagList.split(",");
        if (message.getTopic().equals(getTopic()) && Arrays.asList(tags).contains(message.getTag())) {  //避免消費到其他訊息 json轉換報錯
            log.info(" 監聽到推送訊息,[topic:" + message.getTopic() + "], [tag:" + message.getTag() + "]。開始解析...");
            try {
                // res 是生產者傳過來的訊息內容
                byte[] body = message.getBody();
                String res = new String(body);
                String substring = res.substring(res.length() -1, res.length());
                PushInfo info = JSON.parseObject(res.substring(0, res.length() - 1), PushInfo.class);
  
                if ("1".equals(substring)){
                    // 分組推送
                    CommonUtil.Jpush2SingleUserMq(info,true);
                 }else {
                 //  多個使用者推送
                    CommonUtil.Jpush2SingleUserMq(info,false);
                }
                return OrderAction.Success;
            }catch (Exception e) {
                log.error("MessageListener.consume error:" + e.getMessage(), e);
                return OrderAction.Suspend;
            }
        } else {
           if (nextHandler == null) {
               log.info("未匹配到topic:{}, tag:{}跳過,",message.getTopic(), message.getTag());
               return OrderAction.Success;
           }
           return nextHandler.handleMessage(message);
       }
    }
}

具體處理者 :延時訂單處理Handler

@Slf4j
public class DelayRemoveOrderHandler extends AbstractMessageHandler<OrderAction, Message> {

    private static Lock lock = new ReentrantLock(true);

    @Override
    public OrderAction handleMessage(Message message) {
        //消費延時訂單tag
        if (message.getTopic().equals(getTopic()) && message.getTag().equals(CommonConstants.TAG)) {
            log.info(" 監聽訂單刪除訊息,[topic:" + message.getTopic() + "], [tag:" + message.getTag() + "]。開始解析...");
            //userId + UNDER_BAR + borrowOrderId
            try {
                String content = new String(message.getBody(), Charsets.UTF_8);
                log.info("消費內容 userId_borrowOrderId :{}", content);
                if (StringUtils.isEmpty(content)) {
                    return OrderAction.Success;
                }
                String[] info = content.split(CommonConstants.UNDER_BAR);
                String userId = info[0];
                String key = CommonConstants.CART_ID_LIST + userId;

                lock.tryLock(3, TimeUnit.SECONDS);
                //查詢使用者購物車列表
                String orders = RedisUtil.getString(key);
                if (StringUtils.isEmpty(orders)) {
                    return OrderAction.Success;
                }
                List<Integer> orderList = JSONObject.parseArray(orders, Integer.class);
                List<Integer> delList;
                String idStr = info[1];
                //判斷是否是批量加入
                if (idStr.startsWith(CommonConstants.LIST_MARK)) {
                    String[] s = content.split(CommonConstants.LIST_MARK);
                    delList = JSONObject.parseArray(s[1], Integer.class);
                } else {
                    delList = Collections.singletonList(Integer.valueOf(info[1]));
                }
                orderList.removeAll(delList);
                RedisUtil.setString(key, GsonUtil.objectConvertJson(orderList));
                log.info("刪除使用者:{},延時訂單:{},成功", userId, delList.toString());
                return OrderAction.Success;
            } catch (Exception e) {
                //消費失敗,掛起當前佇列
                log.error("延時訂單:{}消費異常", new String(message.getBody(), Charsets.UTF_8));
                return OrderAction.Suspend;
            } finally {
                lock.unlock();
            }

        } else {
            if (nextHandler == null) {
                log.info("未匹配到topic:{}, tag:{}跳過,",message.getTopic(), message.getTag());
                return OrderAction.Success;
            }
           return nextHandler.handleMessage(message);
        }
    }
}

模式工廠 HandlerFactory

public final class HandlerFactory {

    private static TypeConverterManager typeConverterManager = JoddBean.get().typeConverterManager();


    public static  <T, R>AbstractMessageHandler newJpushOrderHandler(){
        return new JpushOrderHandler();
    };

    public static <T, R>AbstractMessageHandler newDelayRemoveOrderHandler(){
        return new DelayRemoveOrderHandler();
    }

    /**
     * 責任鏈模式
     */
    @SafeVarargs
    public static <T, R>AbstractMessageHandler<T, R> getHandlerResponsibilityChain(Class< ? extends AbstractMessageHandler<T, R>> ... handlers ) {

        Preconditions.checkNotNull(handlers, "handler列表不能為空");
        if (handlers.length == CommonConstants.TRUE) {
            return BeanUtils.instantiate(handlers[CommonConstants.FIRST_ELEMENT]);
        }
        List<Object> list = Arrays.stream(handlers).map(BeanUtils::instantiate).collect(Collectors.toList());
        AbstractMessageHandler<T, R> result = null;
        for (int i = 1; i < list.size(); i++) {
            AbstractMessageHandler<T, R> pre = typeConverterManager.convertType(list.get(i - 1), handlers[i - 1]);
            AbstractMessageHandler<T, R> cur = typeConverterManager.convertType(list.get(i), handlers[i]);
            cur.setNextHandler(pre);
            result = cur;
        }
        return result;
    }
}

getHandlerResponsibilityChain() 主要是建立責任鏈,動態生成自己想要的邏輯責任鏈

客戶端呼叫

public class RoborderConsumerAdapter{

    private OrderConsumer orderConsumer;

    public RoborderConsumerAdapter(OrderConsumer orderConsumer) {
        Assert.notNull(orderConsumer, "orderConsumer is null");
        this.orderConsumer = orderConsumer;
    }

    /**
     * 消費
     */
    public void consumerMessages(){
        AbstractCommonListener listener = BeanUtils.instantiate(GlobalOrderListener.class);
        this.orderConsumer.subscribe(listener.getTopic(), "*", (MessageOrderListener) listener);
    }

}

按這種設計方式,如果有一個新的業務處理場景,只需新增新的一個Handler實現抽象處理者就好,然後呼叫getHandlerResponsibilityChain()的時候,加入想要使用的Handler,就能處理,這樣不會導致多人維護程式碼時,出現邏輯混亂問題,業務直接解耦,減少開發和維護成本

Reference

nicky_chin野生Java程式設計師逆襲之路個人部落格:https://nicky-chen.github.io/
nicky_chin

Latest posts by nicky_chin