實際專案中運用責任鏈模式
1 模式概要
1.1 簡介
- 責任鏈模式為請求建立一個接收者物件鏈,每個接收者都包含對另一個接收者的引用,如果一個物件不能處理該請求,那麼它會把請求傳給下一個接收者,依此類推
- 責任鏈模式避免了請求的傳送者和接收者耦合在一起,讓多個物件都有可能接收請求,將這些物件連成一條鏈,並且沿著這條鏈傳遞請求,直到有物件處理它為止。
1.2 責任鏈模式優缺點
優點
降低耦合度。它將請求的傳送者和接收者解耦
簡化了物件,使得物件不需要知道鏈的結構
增強給物件指派職責的靈活性,允許動態地新增或者刪除責任鏈
增加新的請求處理類方便
缺點
不能保證請求一定被接收;
系統性能將受到一定影響,除錯時不方便,可能會造成迴圈呼叫
2 模式結構
2.1 物件定義
Handler(抽象處理者) : 定義一個處理請求的介面,提供對後續處理者的引用
ConcreteHandler(具體處理者) : 抽象處理者的子類,處理使用者請求,可選將請求處理掉還是傳給下家;在具體處理者中可以訪問鏈中下一個物件,以便請求的轉發
2.2 類圖及設計

責任鏈
程式碼詳解:
抽象處理者
public abstract class Handler { protected Handler nextHandler; // 下一個責任鏈成員 public Handler getNextHandler() { return nextHandler; } public void setNextHandler(Handler nextHandler) { this.nextHandler = nextHandler; } // 處理傳遞過來的時間 public abstract void handleMessage(int type); }
具體處理者
在當前處理者物件無法處理時,將執行權傳給下一個處理者物件
public class ConcreteHandler1 extends Handler { @Override public void handleMessage(int type) { if (type == 1 || type == 3) { System.out.println("ConcreteHandler1解決了問題!"); } else { System.out.println("ConcreteHandler1解決不了問題......"); if (nextHandler != null) { nextHandler.handleMessage(type); } else { System.out.println("沒有人能處理這個訊息"); } } } } public class ConcreteHandler2 extends Handler { @Override public void handleMessage(int type) { if (type == 2 || type == 5) { System.out.println("ConcreteHandler2解決了問題!"); } else { System.out.println("ConcreteHandler2解決不了問題......"); if (nextHandler != null) { nextHandler.handleMessage(type); } else { System.out.println("沒有人能處理這個訊息"); } } } } public class ConcreteHandler3 extends Handler { @Override public void handleMessage(int type) { if (type == 4 || type == 6) { System.out.println("ConcreteHandler3解決了問題!"); } else { System.out.println("ConcreteHandler3解決不了問題......"); if (nextHandler != null) { nextHandler.handleMessage(type); } else { System.out.println("沒有人能處理這個訊息"); } } } }
Client 客戶端呼叫
// 初始化責任鏈:handler1 -> handler2 -> handler3 Handler handler1 = new ConcreteHandler1(); Handler handler2 = new ConcreteHandler2(); Handler handler3 = new ConcreteHandler3(); handler2.setNextHandler(handler3); handler1.setNextHandler(handler2); // 處理事件 System.out.println("--------------Message1"); handler1.handleMessage(1); System.out.println("--------------Message2"); handler1.handleMessage(2); System.out.println("--------------Message3"); handler1.handleMessage(4); System.out.println("--------------Message4"); handler1.handleMessage(7);
從上述模式可以知道,當我們需要多個 ifelse
做邏輯判斷的時候,可以引入,從而提高程式碼可維護性
2.3 適用場景:
- 有多個物件可以處理同一個請求,具體哪個物件處理該請求由執行時刻自動確定
- 在不明確指定接收者的情況下,向多個物件中的某一個物件提交一個請求
- 可動態指定一組物件的處理請求
3 Spring中的過濾器
我們來分析Spring中Filter的 載入流程和執行流程
3.1 初始化流程
初始化過濾器載入資料流如下:

filter初始化載入時序圖
關鍵性程式碼
public void onStartup(ServletContext servletContext) throws ServletException { Filter filter = getFilter(); Assert.notNull(filter, "Filter must not be null"); String name = getOrDeduceName(filter); if (!isEnabled()) { this.logger.info("Filter " + name + " was not registered (disabled)"); return; } //增加過濾器,資料流向 HashMap<String, FilterDef> filterDefs FilterRegistration.Dynamic added = servletContext.addFilter(name, filter); if (added == null) { this.logger.info("Filter " + name + " was not registered " + "(possibly already registered?)"); return; } //配置過濾器註冊資訊 configure(added); }
configure()
方法主要關注
if (isMatchAfter) { context.addFilterMap(filterMap); } else { context.addFilterMapBefore(filterMap); }
不管是資料走哪裡,最終會通過 System.arraycopy 陣列擴容,增加過濾器資訊到 private FilterMap[] array
這個陣列中。
最後呼叫StandardContext類中的 filterStart()
方法完成過濾器的初始化
3.2 執行過程
主要分兩步, 建立過濾器責任鏈 和 執行責任鏈
3.2.1 建立過程
建立filterChain方法主要在 ApplicationFilterFactory.createFilterChain(request, wrapper, servlet)
中,部分程式碼講解:
{ // 獲取過濾器上下文 StandardContext context = (StandardContext) wrapper.getParent(); //獲取載入的過濾器列表 FilterMap filterMaps[] = context.findFilterMaps(); // If there are no filter mappings, we are done if ((filterMaps == null) || (filterMaps.length == 0)) return (filterChain); //獲取匹配的過濾器對映資訊 DispatcherType dispatcher = (DispatcherType) request.getAttribute(Globals.DISPATCHER_TYPE_ATTR); String requestPath = null; Object attribute = request.getAttribute(Globals.DISPATCHER_REQUEST_PATH_ATTR); if (attribute != null){ requestPath = attribute.toString(); } String servletName = wrapper.getName(); // 每個過濾器配置對應處理的請求路徑資訊 for (int i = 0; i < filterMaps.length; i++) { if (!matchDispatcher(filterMaps[i] ,dispatcher)) { continue; } if (!matchFiltersURL(filterMaps[i], requestPath)) continue; ApplicationFilterConfig filterConfig = (ApplicationFilterConfig) context.findFilterConfig(filterMaps[i].getFilterName()); if (filterConfig == null) { // FIXME - log configuration problem continue; } filterChain.addFilter(filterConfig); } // 配置對應servletName資訊,最後返回過濾器鏈 for (int i = 0; i < filterMaps.length; i++) { if (!matchDispatcher(filterMaps[i] ,dispatcher)) { continue; } if (!matchFiltersServlet(filterMaps[i], servletName)) continue; ApplicationFilterConfig filterConfig = (ApplicationFilterConfig) context.findFilterConfig(filterMaps[i].getFilterName()); if (filterConfig == null) { // FIXME - log configuration problem continue; } filterChain.addFilter(filterConfig); } // Return the completed filter chain return filterChain; }
在StandardWrapperValue類的 invoke()
方法中呼叫ApplicationFilterChai類的 createFilterChain()
方法
在ApplicationFilterChai類的 createFilterChain()
方法中呼叫ApplicationFilterChain類的 addFilter()
方法
在ApplicationFilterChain類的 addFilter()
方法中給ApplicationFilterConfig陣列賦值

生成呼叫鏈
3.2.2 執行責任鏈
呼叫ApplicationFilterChain的 doFilter()
方法中最後會呼叫一個 internalDoFilter()
方法,目的就是執行ApplicationFilterChain中的全部過濾器,從程式碼中可以發現它呼叫了 doFilter
,而在 doFilter
又會呼叫 internalDoFilter
從而使所有Filter都得以呼叫
private void internalDoFilter(ServletRequest request,ServletResponse response) throws IOException, ServletException { // 如果存在下一個,繼續呼叫下一個過濾器 if (pos < n) { ApplicationFilterConfig filterConfig = filters[pos++]; try { Filter filter = filterConfig.getFilter(); if (request.isAsyncSupported() && "false".equalsIgnoreCase( filterConfig.getFilterDef().getAsyncSupported())) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } if( Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal(); Object[] args = new Object[]{req, res, this}; SecurityUtil.doAsPrivilege ("doFilter", filter, classType, args, principal); } else { // 此處呼叫Filter的doFilter()方法/ 而 doFilter 又會呼叫 internalDoFilter 直到呼叫完所有的過濾器 filter.doFilter(request, response, this); } } catch (IOException | ServletException | RuntimeException e) { throw e; } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); ExceptionUtils.handleThrowable(e); throw new ServletException(sm.getString("filterChain.filter"), e); } return; } // 從最後一個開始呼叫 try { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(request); lastServicedResponse.set(response); } if (request.isAsyncSupported() && !servletSupportsAsync) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } // 包裝請求 if ((request instanceof HttpServletRequest) && (response instanceof HttpServletResponse) && Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal(); Object[] args = new Object[]{req, res}; SecurityUtil.doAsPrivilege("service", servlet, classTypeUsedInService,args, principal); } else { servlet.service(request, response); } } catch (IOException | ServletException | RuntimeException e) { throw e; } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); ExceptionUtils.handleThrowable(e); throw new ServletException(sm.getString("filterChain.servlet"), e); } finally { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(null); lastServicedResponse.set(null); } } }
這樣,一個完整的過濾器鏈就形成,然後進行呼叫
4 專案中的實際運用
業務場景
我們在專案中使用了阿里的MQ訊息中介軟體,來加快請求的響應時間和非同步解耦處理。RocktMQ主要可以按Topic來分割槽,然後按Tag分組,不同的業務區分不同的 tag
比如:
簡訊類的訊息 messageTag
手機推送訊息 pushTag
延時任務訊息 delayTag
等等。。。
常規寫法
if(message.getTag() == messageTag){ //doSomething }else if(message.getTag() == pushTag){ //doSomething }else if (message.getTag() == delayTag){ //doSomething } ....
要是 ifelse
多了,最後會形成箭頭程式碼,最後連自己都不知道邏輯了。所以我就想到了責任鏈模式,剛好符合我們的實際場景。
具體設計方案如下:
設計UML類圖

類圖
抽象公共監聽器 ,主要用到了單例模式獲取常量
public abstract class AbstractCommonListener { private ParametersDO parametersDO; protected AbstractCommonListener() { //獲取單例物件 this.parametersDO = ParametersDO.getInstance(); } public final String getAccessKey() { return parametersDO.getAccessKey(); } public final String getSecretKey() { returnparametersDO.getSecretKey(); } public final String getConsumerId() { return parametersDO.getConsumerId(); } public final String getONSAddr() { return parametersDO.getONSAddr(); } public final String getTopic() { return parametersDO.getTopic(); } } class ParametersDO{ private static volatile boolean initialize = false; private String accessKey; private String secretKey; private String consumerId; private String ONSAddr; private String topic; private ParametersDO() { synchronized (ParametersDO.class) { if (!initialize) { this.accessKey = BundleUtil.getResult("mq.accesskey"); this.consumerId = BundleUtil.getResult("mq.public.consumer.id"); this.ONSAddr = BundleUtil.getResult("mq.ons.addr"); this.topic = BundleUtil.getResult("mq.public.topic"); this.secretKey = BundleUtil.getResult("mq.secretkey"); initialize = !initialize; } else { throw new RuntimeException("ParametersDO單例已被破壞"); } } } static ParametersDO getInstance() { return ListenerHolder.INSTANCE; } private static class ListenerHolder{ private static final ParametersDO INSTANCE = new ParametersDO(); } final String getAccessKey() { return accessKey; } final String getSecretKey() { return secretKey; } final String getConsumerId() { return consumerId; } final String getONSAddr() { return ONSAddr; } final String getTopic() { return topic; } }
具體監聽器 ,監聽器主要用於MQ監聽消費Topic
public class GlobalOrderListener extends AbstractCommonListener implements MessageOrderListener { @Override public OrderAction consume(Message message, ConsumeOrderContext context) { //新增處理消費tag 只需新增Handler AbstractMessageHandler<OrderAction, Message> handler = HandlerFactory.getHandlerResponsibilityChain( JpushOrderHandler.class, DelayRemoveOrderHandler.class); return handler.handleMessage(message); } }
正常情況下,我們會在 consume()
方法中區分tag來做不同業務的資料處理
抽象處理者
/** * @author nicky_chin [[email protected]] * @since --created on 2018/6/26 at 14:42 * 責任鏈抽象類 */ public abstract class AbstractMessageHandler<T, R> extends AbstractCommonListener { /** * 下一個責任鏈成員 */ protected AbstractMessageHandler<T, R> nextHandler; public AbstractMessageHandler getNextHandler() { return nextHandler; } public void setNextHandler(AbstractMessageHandler<T, R> nextHandler) { this.nextHandler = nextHandler; } /** * 處理傳遞過來的tag * @param message 表示式 * @return T */ public abstract T handleMessage(R message); }
具體處理者 :推送訊息Handler
@Slf4j public class JpushOrderHandler extends AbstractMessageHandler<OrderAction, Message> { @Override public OrderAction handleMessage(Message message) { String tagList = BundleUtil.getResult("mq.tag"); String[] tags = tagList.split(","); if (message.getTopic().equals(getTopic()) && Arrays.asList(tags).contains(message.getTag())) {//避免消費到其他訊息 json轉換報錯 log.info(" 監聽到推送訊息,[topic:" + message.getTopic() + "], [tag:" + message.getTag() + "]。開始解析..."); try { // res 是生產者傳過來的訊息內容 byte[] body = message.getBody(); String res = new String(body); String substring = res.substring(res.length() -1, res.length()); PushInfo info = JSON.parseObject(res.substring(0, res.length() - 1), PushInfo.class); if ("1".equals(substring)){ // 分組推送 CommonUtil.Jpush2SingleUserMq(info,true); }else { //多個使用者推送 CommonUtil.Jpush2SingleUserMq(info,false); } return OrderAction.Success; }catch (Exception e) { log.error("MessageListener.consume error:" + e.getMessage(), e); return OrderAction.Suspend; } } else { if (nextHandler == null) { log.info("未匹配到topic:{}, tag:{}跳過,",message.getTopic(), message.getTag()); return OrderAction.Success; } return nextHandler.handleMessage(message); } } }
具體處理者 :延時訂單處理Handler
@Slf4j public class DelayRemoveOrderHandler extends AbstractMessageHandler<OrderAction, Message> { private static Lock lock = new ReentrantLock(true); @Override public OrderAction handleMessage(Message message) { //消費延時訂單tag if (message.getTopic().equals(getTopic()) && message.getTag().equals(CommonConstants.TAG)) { log.info(" 監聽訂單刪除訊息,[topic:" + message.getTopic() + "], [tag:" + message.getTag() + "]。開始解析..."); //userId + UNDER_BAR + borrowOrderId try { String content = new String(message.getBody(), Charsets.UTF_8); log.info("消費內容 userId_borrowOrderId :{}", content); if (StringUtils.isEmpty(content)) { return OrderAction.Success; } String[] info = content.split(CommonConstants.UNDER_BAR); String userId = info[0]; String key = CommonConstants.CART_ID_LIST + userId; lock.tryLock(3, TimeUnit.SECONDS); //查詢使用者購物車列表 String orders = RedisUtil.getString(key); if (StringUtils.isEmpty(orders)) { return OrderAction.Success; } List<Integer> orderList = JSONObject.parseArray(orders, Integer.class); List<Integer> delList; String idStr = info[1]; //判斷是否是批量加入 if (idStr.startsWith(CommonConstants.LIST_MARK)) { String[] s = content.split(CommonConstants.LIST_MARK); delList = JSONObject.parseArray(s[1], Integer.class); } else { delList = Collections.singletonList(Integer.valueOf(info[1])); } orderList.removeAll(delList); RedisUtil.setString(key, GsonUtil.objectConvertJson(orderList)); log.info("刪除使用者:{},延時訂單:{},成功", userId, delList.toString()); return OrderAction.Success; } catch (Exception e) { //消費失敗,掛起當前佇列 log.error("延時訂單:{}消費異常", new String(message.getBody(), Charsets.UTF_8)); return OrderAction.Suspend; } finally { lock.unlock(); } } else { if (nextHandler == null) { log.info("未匹配到topic:{}, tag:{}跳過,",message.getTopic(), message.getTag()); return OrderAction.Success; } return nextHandler.handleMessage(message); } } }
模式工廠 HandlerFactory
public final class HandlerFactory { private static TypeConverterManager typeConverterManager = JoddBean.get().typeConverterManager(); public static<T, R>AbstractMessageHandler newJpushOrderHandler(){ return new JpushOrderHandler(); }; public static <T, R>AbstractMessageHandler newDelayRemoveOrderHandler(){ return new DelayRemoveOrderHandler(); } /** * 責任鏈模式 */ @SafeVarargs public static <T, R>AbstractMessageHandler<T, R> getHandlerResponsibilityChain(Class< ? extends AbstractMessageHandler<T, R>> ... handlers ) { Preconditions.checkNotNull(handlers, "handler列表不能為空"); if (handlers.length == CommonConstants.TRUE) { return BeanUtils.instantiate(handlers[CommonConstants.FIRST_ELEMENT]); } List<Object> list = Arrays.stream(handlers).map(BeanUtils::instantiate).collect(Collectors.toList()); AbstractMessageHandler<T, R> result = null; for (int i = 1; i < list.size(); i++) { AbstractMessageHandler<T, R> pre = typeConverterManager.convertType(list.get(i - 1), handlers[i - 1]); AbstractMessageHandler<T, R> cur = typeConverterManager.convertType(list.get(i), handlers[i]); cur.setNextHandler(pre); result = cur; } return result; } }
getHandlerResponsibilityChain()
主要是建立責任鏈,動態生成自己想要的邏輯責任鏈
客戶端呼叫
public class RoborderConsumerAdapter{ private OrderConsumer orderConsumer; public RoborderConsumerAdapter(OrderConsumer orderConsumer) { Assert.notNull(orderConsumer, "orderConsumer is null"); this.orderConsumer = orderConsumer; } /** * 消費 */ public void consumerMessages(){ AbstractCommonListener listener = BeanUtils.instantiate(GlobalOrderListener.class); this.orderConsumer.subscribe(listener.getTopic(), "*", (MessageOrderListener) listener); } }
按這種設計方式,如果有一個新的業務處理場景,只需新增新的一個Handler實現抽象處理者就好,然後呼叫 getHandlerResponsibilityChain()
的時候,加入想要使用的Handler,就能處理,這樣不會導致多人維護程式碼時,出現邏輯混亂問題,業務直接解耦,減少開發和維護成本
Reference
ofollow,noindex">《JAVA與模式》之責任鏈模式
原創文章,轉載請註明:轉載自 併發程式設計網 – ifeve.com 本文連結地址: 實際專案中運用責任鏈模式