1. 程式人生 > >技術與架構之zuul(spring-cloud-zuul)原始碼解讀

技術與架構之zuul(spring-cloud-zuul)原始碼解讀

  1. zuul原始碼裡面一大堆test程式碼
  2. zuul好多程式碼都是11年的

閘道器是什麼,為什麼需要閘道器

百度百科是這樣解釋的:閘道器(Gateway)又稱網間聯結器、協議轉換器。閘道器在網路層以上實現網路互連,是複雜的網路互連裝置,僅用於兩個高層協議不同的網路互連。仔細看下下面的圖。

  1. 閘道器負責接收請求,並把請求轉發給後端正確的服務。這段說明與上面的圖,是不是很像Nginx的功能?沒錯,閘道器最基礎的功能就是反向代理。

zuul的基本架構設計

zuul核心模組是zuul-core。spring-cloud集成了zuul實現的jar是spring-cloud-netflix-zuul。

zuul-core

  1. filter
  2. FilterRegistry
  3. FilterLoader
  4. FilterProcessor
  5. ZuulRunner

當一個請求來了,這是類的呼叫鏈。

filter

filter是zuul核心執行流程,IZuulFilter是介面,主要是實現run方法。而ZuulFilter又有兩個抽象方法filterType(filter型別)和filterOrder(filter的排序),麻煩仔細看下ZuulFilter的runFilter方法,主要是執行run方法,並且對run的執行行為進行處理。

/**
 * Minimal contract of a Zuul filter: a predicate that says whether the filter applies and the
 * action to perform when it does.
 */
public interface IZuulFilter {

    /**
     * Tells the caller whether {@link #run()} should be invoked.
     *
     * @return {@code true} if the filter should run, {@code false} if it should be skipped
     */
    boolean shouldFilter();

    /**
     * Performs the filter's work.
     *
     * @return a filter-defined result object
     * @throws ZuulException if the filter fails
     */
    Object run() throws ZuulException;

}

/**
 * Base class for Zuul filters.
 *
 * <p>Subclasses provide the filter type, the filter order and the {@link IZuulFilter} methods.
 * {@link #runFilter()} wraps the call to {@code run()} with the disabled/skip checks, a tracer
 * and failure capture, and reports the outcome as a {@link ZuulFilterResult}.
 */
public abstract class ZuulFilter implements IZuulFilter, Comparable<ZuulFilter> {

    /** Holds the dynamic "disable" property of this filter once it has been looked up. */
    private final AtomicReference<DynamicBooleanProperty> filterDisabledRef = new AtomicReference<>();

    /**
     * @return the type of this filter; {@code FilterLoader} groups filters by this value
     */
    public abstract String filterType();

    /**
     * @return the order value of this filter; used by {@link #compareTo(ZuulFilter)}
     */
    public abstract int filterOrder();

    /**
     * @return always {@code true} in this base implementation
     */
    public boolean isStaticFilter() {
        return true;
    }

    /**
     * Builds the name of the property that disables this filter.
     *
     * @return {@code "zuul." + <simple class name> + "." + filterType() + ".disable"}
     */
    public String disablePropertyName() {
        return "zuul." + this.getClass().getSimpleName() + "." + filterType() + ".disable";
    }

    /**
     * Reads the dynamic "disable" property of this filter (default {@code false}).
     *
     * @return the current value of the disable property
     */
    public boolean isFilterDisabled() {
        DynamicBooleanProperty disabled = filterDisabledRef.get();
        if (disabled == null) {
            // Only resolve the property while the reference is still empty; previously the name
            // was rebuilt and the factory queried on every call even though the result was
            // discarded once the reference had been set.
            filterDisabledRef.compareAndSet(null,
                    DynamicPropertyFactory.getInstance().getBooleanProperty(disablePropertyName(), false));
            // Re-read so that concurrent callers all use the instance that won the CAS.
            disabled = filterDisabledRef.get();
        }
        return disabled.get();
    }

    /**
     * Runs this filter if it is neither disabled nor skipped by {@link #shouldFilter()}.
     *
     * @return a default-constructed result when the filter is disabled, a {@code SKIPPED} result
     *         when {@code shouldFilter()} is {@code false}, a {@code SUCCESS} result carrying the
     *         value returned by {@code run()}, or a {@code FAILED} result carrying the thrown
     *         exception
     */
    public ZuulFilterResult runFilter() {
        ZuulFilterResult zr = new ZuulFilterResult();
        if (!isFilterDisabled()) {
            if (shouldFilter()) {
                Tracer t = TracerFactory.instance().startMicroTracer("ZUUL::" + this.getClass().getSimpleName());
                try {
                    Object res = run();
                    zr = new ZuulFilterResult(res, ExecutionStatus.SUCCESS);
                } catch (Throwable e) {
                    // Failures are not propagated here; they are handed back inside the result.
                    t.setName("ZUUL::" + this.getClass().getSimpleName() + " failed");
                    zr = new ZuulFilterResult(ExecutionStatus.FAILED);
                    zr.setException(e);
                } finally {
                    t.stopAndLog();
                }
            } else {
                zr = new ZuulFilterResult(ExecutionStatus.SKIPPED);
            }
        }
        return zr;
    }

    /**
     * Orders filters by {@link #filterOrder()} so that a list of filters can be sorted with
     * {@code Collections.sort}. Subclasses may still override this.
     *
     * @param other the filter to compare with
     * @return a negative, zero or positive value as this filter's order is less than, equal to
     *         or greater than the other filter's order
     */
    @Override
    public int compareTo(ZuulFilter other) {
        return Integer.compare(this.filterOrder(), other.filterOrder());
    }
}

/**
 * String constants naming the Zuul filter types. Not meant to be instantiated.
 */
public class FilterConstants {
	/** Filter type {@code "error"}. */
	public static final String ERROR_TYPE = "error";

	/** Filter type {@code "post"}. */
	public static final String POST_TYPE = "post";

	/** Filter type {@code "pre"}. */
	public static final String PRE_TYPE = "pre";

	/** Filter type {@code "route"}. */
	public static final String ROUTE_TYPE = "route";

	/** Constants holder only: prevents instantiation and subclassing from outside. */
	private FilterConstants() {
		throw new AssertionError("Must not instantiate constant utility class");
	}
}
FilterRegistry

filterRegistry是filter的註冊物件,非常簡單。只需要注意下靜態成員變數INSTANCE。注意:zuul2.0以後可能會刪除INSTANCE

/**
 * Name-to-filter registry backed by a {@link ConcurrentHashMap}. A shared instance is available
 * through {@link #instance()}.
 */
public class FilterRegistry {
    private static final FilterRegistry INSTANCE = new FilterRegistry();

    /** Registered filters, keyed by the name they were registered under. */
    private final ConcurrentHashMap<String, ZuulFilter> filters = new ConcurrentHashMap<>();

    /**
     * @return the shared registry instance
     */
    public static final FilterRegistry instance() {
        return INSTANCE;
    }

    /**
     * Looks up a filter.
     *
     * @param key the name the filter was registered under
     * @return the registered filter, or {@code null} if there is none
     */
    public ZuulFilter get(String key) {
        return filters.get(key);
    }

    /**
     * Registers a filter unless the key is already taken; an existing entry is left untouched.
     *
     * @param key    the name to register the filter under
     * @param filter the filter to register
     */
    public void put(String key, ZuulFilter filter) {
        filters.putIfAbsent(key, filter);
    }

    /**
     * Unregisters a filter.
     *
     * @param key the name the filter was registered under
     * @return the filter that was removed, or {@code null} if there was none
     */
    public ZuulFilter remove(String key) {
        return filters.remove(key);
    }

    /**
     * @return the values view of the underlying map
     */
    public Collection<ZuulFilter> getAllFilters() {
        return filters.values();
    }

    /**
     * @return the number of registered filters
     */
    public int size() {
        return filters.size();
    }
}
FilterLoader

filterLoader 負責動態載入filter。載入方法有通過className,File物件,Groovy語言原始碼。如果是Netflix這樣的大公司有一套管理系統,負責FilterLoader就很雞肋了。

/**
 * Compiles, instantiates and caches {@link ZuulFilter}s, either from source code handed in as a
 * string or from a {@link File}, and serves them grouped by filter type.
 */
public class FilterLoader {
    static final FilterLoader INSTANCE = new FilterLoader();

    private static final Logger LOG = LoggerFactory.getLogger(FilterLoader.class);

    /** Last-modified timestamp recorded for each file-based filter, keyed by filter name. */
    private final ConcurrentHashMap<String, Long> filterClassLastModified = new ConcurrentHashMap<>();
    /** Source code per filter name; only read (never written) in the code shown here. */
    private final ConcurrentHashMap<String, String> filterClassCode = new ConcurrentHashMap<>();
    /** Names that have already gone through the code check in {@link #getFilter}. */
    private final ConcurrentHashMap<String, String> filterCheck = new ConcurrentHashMap<>();
    /** Cache of sorted filter lists, keyed by filter type. */
    private final ConcurrentHashMap<String, List<ZuulFilter>> hashFiltersByType = new ConcurrentHashMap<>();

    private FilterRegistry filterRegistry = FilterRegistry.instance();

    static DynamicCodeCompiler COMPILER;

    static FilterFactory FILTER_FACTORY = new DefaultFilterFactory();

    /**
     * @return the shared loader instance
     */
    public static FilterLoader getInstance() {
        return INSTANCE;
    }

    /**
     * Sets the compiler used to turn source code or files into classes. The compiler is stored
     * in a static field, so it applies to every loader instance.
     *
     * @param compiler the compiler to use
     */
    public void setCompiler(DynamicCodeCompiler compiler) {
        COMPILER = compiler;
    }

    /**
     * Replaces the registry this loader reads from and writes to.
     *
     * @param r the registry to use
     */
    public void setFilterRegistry(FilterRegistry r) {
        this.filterRegistry = r;
    }

    /**
     * Sets the factory used to instantiate compiled filter classes. The factory is stored in a
     * static field, so it applies to every loader instance.
     *
     * @param factory the factory to use
     */
    public void setFilterFactory(FilterFactory factory) {
        FILTER_FACTORY = factory;
    }

    /**
     * Returns the filter registered under {@code sName}, or compiles {@code sCode} and
     * instantiates it when nothing is registered. The newly created filter is not registered.
     *
     * @param sCode source code of the filter
     * @param sName name of the filter
     * @return the registered or newly created filter, or {@code null} if the compiled class is
     *         abstract
     * @throws Exception if compiling or instantiating the filter fails
     */
    public ZuulFilter getFilter(String sCode, String sName) throws Exception {
        // First time this name is seen: drop any registered filter whose code differs.
        if (filterCheck.get(sName) == null) {
            filterCheck.putIfAbsent(sName, sName);
            boolean codeDiffers = !sCode.equals(filterClassCode.get(sName));
            if (codeDiffers) {
                LOG.info("reloading code " + sName);
                filterRegistry.remove(sName);
            }
        }

        ZuulFilter registered = filterRegistry.get(sName);
        if (registered != null) {
            return registered;
        }

        Class<?> compiled = COMPILER.compile(sCode, sName);
        if (Modifier.isAbstract(compiled.getModifiers())) {
            return null;
        }
        return (ZuulFilter) FILTER_FACTORY.newInstance(compiled);
    }

    /**
     * @return the number of filters in the registry
     */
    public int filterInstanceMapSize() {
        return filterRegistry.size();
    }

    /**
     * Compiles the given file into a filter and registers it, unless a filter for that file is
     * already registered and the file's last-modified timestamp is unchanged.
     *
     * @param file the filter source file
     * @return {@code true} if a new filter instance was registered, {@code false} otherwise
     * @throws Exception if compiling or instantiating the filter fails
     */
    public boolean putFilter(File file) throws Exception {
        final String key = file.getAbsolutePath() + file.getName();

        // A changed timestamp evicts the registered instance so that it gets rebuilt below.
        final Long recordedTimestamp = filterClassLastModified.get(key);
        if (recordedTimestamp != null && file.lastModified() != recordedTimestamp) {
            LOG.debug("reloading filter " + key);
            filterRegistry.remove(key);
        }

        if (filterRegistry.get(key) != null) {
            return false;
        }

        Class<?> compiled = COMPILER.compile(file);
        if (Modifier.isAbstract(compiled.getModifiers())) {
            return false;
        }

        ZuulFilter created = (ZuulFilter) FILTER_FACTORY.newInstance(compiled);
        // Invalidate the cached list for this type so getFiltersByType rebuilds it.
        hashFiltersByType.remove(created.filterType());
        filterRegistry.put(key, created);
        filterClassLastModified.put(key, file.lastModified());
        return true;
    }

    /**
     * Returns the filters of the given type, sorted by their natural ordering. The list is built
     * from the registry on first use and cached afterwards.
     *
     * @param filterType the filter type to look up
     * @return the sorted filters of that type; empty if there are none
     */
    public List<ZuulFilter> getFiltersByType(String filterType) {
        List<ZuulFilter> cached = hashFiltersByType.get(filterType);
        if (cached != null) {
            return cached;
        }

        List<ZuulFilter> matching = new ArrayList<>();
        for (ZuulFilter candidate : filterRegistry.getAllFilters()) {
            if (candidate.filterType().equals(filterType)) {
                matching.add(candidate);
            }
        }
        Collections.sort(matching); // sort by priority

        hashFiltersByType.putIfAbsent(filterType, matching);
        return matching;
    }
}
FilterProcessor

基本呼叫鏈postRoute,error,route,preRoute --> runFilters-->processZuulFilter【重點方法】 。獲得Filter的方式: runFilters-> FilterLoader.getInstance().getFiltersByType(sType)

public class FilterProcessor {

    static FilterProcessor INSTANCE = new FilterProcessor();
    protected static final Logger logger = LoggerFactory.getLogger(FilterProcessor.class);

    private FilterUsageNotifier usageNotifier;

    /**
     * Runs all filters of type "post".
     *
     * @throws ZuulException rethrown as-is when a filter raised one; any other throwable is
     *                       wrapped in a {@code ZuulException} with status 500
     */
    public void postRoute() throws ZuulException {
        try {
            runFilters("post");
        } catch (Throwable failure) {
            if (failure instanceof ZuulException) {
                throw (ZuulException) failure;
            }
            throw new ZuulException(failure, 500, "UNCAUGHT_EXCEPTION_IN_POST_FILTER_" + failure.getClass().getName());
        }
    }


    /**
     * Runs all filters of type "error". Anything thrown while doing so is logged and not
     * propagated to the caller.
     */
    public void error() {
        try {
            runFilters("error");
        } catch (Throwable failure) {
            // Deliberately not rethrown: a failing error filter is only logged.
            logger.error(failure.getMessage(), failure);
        }
    }


    /**
     * Runs all filters of type "route".
     *
     * @throws ZuulException rethrown as-is when a filter raised one; any other throwable is
     *                       wrapped in a {@code ZuulException} with status 500
     */
    public void route() throws ZuulException {
        try {
            runFilters("route");
        } catch (Throwable failure) {
            if (failure instanceof ZuulException) {
                throw (ZuulException) failure;
            }
            throw new ZuulException(failure, 500, "UNCAUGHT_EXCEPTION_IN_ROUTE_FILTER_" + failure.getClass().getName());
        }
    }


    /**
     * Runs all filters of type "pre".
     *
     * @throws ZuulException rethrown as-is when a filter raised one; any other throwable is
     *                       wrapped in a {@code ZuulException} with status 500
     */
    public void preRoute() throws ZuulException {
        try {
            runFilters("pre");
        } catch (Throwable failure) {
            if (failure instanceof ZuulException) {
                throw (ZuulException) failure;
            }
            throw new ZuulException(failure, 500, "UNCAUGHT_EXCEPTION_IN_PRE_FILTER_" + failure.getClass().getName());
        }
    }

    /**
     * Runs every filter of the given type, in the order returned by
     * {@code FilterLoader.getFiltersByType}.
     *
     * @param sType the filter type to run
     * @return a {@code Boolean} that is {@code true} if at least one filter returned
     *         {@code Boolean.TRUE}; non-Boolean filter results are ignored
     * @throws Throwable whatever {@code processZuulFilter} throws
     */
    public Object runFilters(String sType) throws Throwable {
        if (RequestContext.getCurrentContext().debugRouting()) {
            Debug.addRoutingDebug("Invoking {" + sType + "} type filters");
        }

        final List<ZuulFilter> filters = FilterLoader.getInstance().getFiltersByType(sType);
        boolean anyTrue = false;
        if (filters == null) {
            return anyTrue;
        }

        for (int idx = 0; idx < filters.size(); idx++) {
            Object outcome = processZuulFilter(filters.get(idx));
            // instanceof is false for null, so no separate null check is needed.
            if (outcome instanceof Boolean) {
                anyTrue |= (Boolean) outcome;
            }
        }
        return anyTrue;
    }


    /**
     * Runs a single filter, records its execution summary on the current
     * {@code RequestContext}, reports the outcome to the usage notifier and converts failures
     * into {@code ZuulException}s.
     *
     * <p>Assumes {@code usageNotifier} has been initialised elsewhere; that is not visible in
     * this excerpt.
     *
     * @param filter the filter to run
     * @return the value returned by the filter's {@code run()} on success, otherwise {@code null}
     * @throws ZuulException if the filter failed; non-Zuul throwables are wrapped with status 500
     */
    public Object processZuulFilter(ZuulFilter filter) throws ZuulException {

        RequestContext ctx = RequestContext.getCurrentContext();
        boolean bDebug = ctx.debugRouting();
        long execTime = 0;
        String filterName = "";
        try {
            long ltime = System.currentTimeMillis();
            filterName = filter.getClass().getSimpleName();

            RequestContext copy = null;
            Object o = null;
            Throwable t = null;

            if (bDebug) {
                Debug.addRoutingDebug("Filter " + filter.filterType() + " " + filter.filterOrder() + " " + filterName);
                // Snapshot taken before the filter runs so the state can be compared afterwards.
                copy = ctx.copy();
            }

            ZuulFilterResult result = filter.runFilter();
            ExecutionStatus s = result.getStatus();
            execTime = System.currentTimeMillis() - ltime;

            switch (s) {
                case FAILED:
                    t = result.getException();
                    ctx.addFilterExecutionSummary(filterName, ExecutionStatus.FAILED.name(), execTime);
                    break;
                case SUCCESS:
                    o = result.getResult();
                    ctx.addFilterExecutionSummary(filterName, ExecutionStatus.SUCCESS.name(), execTime);
                    if (bDebug) {
                        Debug.addRoutingDebug("Filter {" + filterName + " TYPE:" + filter.filterType() + " ORDER:" + filter.filterOrder() + "} Execution time = " + execTime + "ms");
                        Debug.compareContextState(filterName, copy);
                    }
                    break;
                default:
                    // Other statuses produce no summary entry.
                    break;
            }

            // Rethrow the filter's own failure so it goes through the common handling below.
            if (t != null) throw t;

            usageNotifier.notify(filter, s);
            return o;

        } catch (Throwable e) {
            if (bDebug) {
                Debug.addRoutingDebug("Running Filter failed " + filterName + " type:" + filter.filterType() + " order:" + filter.filterOrder() + " " + e.getMessage());
            }
            usageNotifier.notify(filter, ExecutionStatus.FAILED);
            if (e instanceof ZuulException) {
                throw (ZuulException) e;
            } else {
                ZuulException ex = new ZuulException(e, "Filter threw Exception", 500, filter.filterType() + ":" + filterName);
                // NOTE(review): when the filter result was FAILED with a non-Zuul exception, a
                // FAILED summary was already added in the switch above, so this adds a second
                // entry for the same filter. Confirm whether that is intended.
                ctx.addFilterExecutionSummary(filterName, ExecutionStatus.FAILED.name(), execTime);
                throw ex;
            }
        }
    }

FilterUsageNotifier

每個filter不論執行完成、異常還是失敗,都會呼叫FilterUsageNotifier.notify方法,這樣就可以對執行結果進行監控。

    /**
     * {@link FilterUsageNotifier} that increments a {@code DynamicCounter} for every
     * notification it receives.
     */
    public static class BasicFilterUsageNotifier implements FilterUsageNotifier {
        private static final String METRIC_PREFIX = "zuul.filter-";

        /**
         * Increments the counter named {@code "zuul.filter-" + <filter simple class name>},
         * passing the status name and the filter type along as "status" and "filtertype".
         *
         * @param filter the filter that was executed
         * @param status the outcome to record
         */
        @Override
        public void notify(ZuulFilter filter, ExecutionStatus status) {
            final String counterName = METRIC_PREFIX + filter.getClass().getSimpleName();
            DynamicCounter.increment(counterName, "status", status.name(), "filtertype", filter.filterType());
        }
    }
ZuulRunner
  1. zuulRunner代理了FilterProcessor
  2. zuulRunner.init是整個體系最核心的一個方法了,負責把servletRequest放到RequestContext.getCurrentContext上下文中,這樣在當前執行緒就可以獲得HttpServletResponse等物件進行操作,同時可以傳遞其他引數。請仔細看RibbonRoutingFilter。
/**
 * Entry point that prepares the current {@code RequestContext} for a request and forwards the
 * pre/route/post/error phases to {@code FilterProcessor}.
 */
public class ZuulRunner {

    /** When {@code true}, {@link #init} stores a wrapped request instead of the raw one. */
    private final boolean bufferRequests;

    /**
     * Creates a runner with {@code bufferRequests} set to {@code true}.
     */
    public ZuulRunner() {
        this(true);
    }

    /**
     * @param bufferRequests whether {@link #init} should wrap the incoming request in an
     *                       {@code HttpServletRequestWrapper}
     */
    public ZuulRunner(boolean bufferRequests) {
        this.bufferRequests = bufferRequests;
    }

    /**
     * Stores the request and a wrapped response in the current {@code RequestContext}.
     *
     * @param servletRequest  the incoming request; wrapped first when {@code bufferRequests} is set
     * @param servletResponse the outgoing response; always wrapped in an
     *                        {@code HttpServletResponseWrapper}
     */
    public void init(HttpServletRequest servletRequest, HttpServletResponse servletResponse) {
        RequestContext ctx = RequestContext.getCurrentContext();

        HttpServletRequest effectiveRequest = servletRequest;
        if (bufferRequests) {
            effectiveRequest = new HttpServletRequestWrapper(servletRequest);
        }
        ctx.setRequest(effectiveRequest);

        ctx.setResponse(new HttpServletResponseWrapper(servletResponse));
    }

    /**
     * Delegates to {@code FilterProcessor.preRoute()}.
     *
     * @throws ZuulException if the processor reports a failure
     */
    public void preRoute() throws ZuulException {
        FilterProcessor.getInstance().preRoute();
    }

    /**
     * Delegates to {@code FilterProcessor.route()}.
     *
     * @throws ZuulException if the processor reports a failure
     */
    public void route() throws ZuulException {
        FilterProcessor.getInstance().route();
    }

    /**
     * Delegates to {@code FilterProcessor.postRoute()}.
     *
     * @throws ZuulException if the processor reports a failure
     */
    public void postRoute() throws ZuulException {
        FilterProcessor.getInstance().postRoute();
    }

    /**
     * Delegates to {@code FilterProcessor.error()}.
     */
    public void error() {
        FilterProcessor.getInstance().error();
    }
}

spring-cloud-zuul

spring-cloud-zuul主要是兩個類ZuulServerAutoConfiguration與ZuulProxyAutoConfiguration。分別向spring容器注入很多filter與其他管理物件。

注入的Filter如下
  1. ServletDetectionFilter
  2. FormBodyWrapperFilter
  3. DebugFilter
  4. Servlet30WrapperFilter
  5. SendResponseFilter
  6. SendErrorFilter
  7. SendForwardFilter
  8. PreDecorationFilter
  9. RibbonRoutingFilter
  10. SimpleHostRoutingFilter
RouteLocator

routeLocator是zuul route管理體系。SimpleRouteLocator是最簡單的實現。如果大家需要維護自己的路由,向spring容器注入自己的實現就行了。

routeLocator是zuul route管理體系。SimpleRouteLocator是最簡單的實現。如果大家需要維護自己的路由,向spring容器注入自己的實現就行了。

routeLocator是zuul route管理體系。SimpleRouteLocator是最簡單的實現。如果大家需要維護自己的路由,向spring容器注入自己的實現就行了。

與Eureka整合

DiscoveryClientRouteLocator類負責與eureka進行整合。DiscoveryClientRouteLocator會從eureka伺服器獲取服務,並注入到SimpleRouteLocator。

ZuulController

ZuulController負責向spring容器裡面注入ZuulServlet物件

ZuulFilterConfiguration

ZuulFilterConfiguration是整個spring-cloud-zuul核心環節,ZuulFilterInitializer負責把FilterLoader,FilterRegistry,CounterFactory(統計模組),TracerFactory(filter執行跟蹤模組),連結在一起。那麼最基礎的zuul體系元件完成。

/**
 * Configuration that exposes a {@link ZuulFilterInitializer} bean wired with the injected
 * filters and the shared {@code FilterLoader} / {@code FilterRegistry} instances.
 */
protected static class ZuulFilterConfiguration {

	/** Filters injected by Spring (presumably keyed by bean name; confirm against Spring docs). */
	@Autowired
	private Map<String, ZuulFilter> filters;

	/**
	 * Builds the initializer that registers the injected filters.
	 *
	 * @param counterFactory counter factory handed to the initializer
	 * @param tracerFactory  tracer factory handed to the initializer
	 * @return a new {@code ZuulFilterInitializer}
	 */
	@Bean
	public ZuulFilterInitializer zuulFilterInitializer(
			CounterFactory counterFactory, TracerFactory tracerFactory) {
		return new ZuulFilterInitializer(
				this.filters,
				counterFactory,
				tracerFactory,
				FilterLoader.getInstance(),
				FilterRegistry.instance());
	}

}
public class ZuulFilterInitializer {

	private static final Log log = LogFactory.getLog(ZuulFilterInitializer.class);

	private final Map<String, ZuulFilter> filters;
	private final CounterFactory counterFactory;
	private final TracerFactory tracerFactory;
	private final FilterLoader filterLoader;
	private final FilterRegistry filterRegistry;

	/**
	 * Stores the collaborators; nothing is registered until {@code contextInitialized()} runs.
	 *
	 * @param filters        filters to register, keyed by name
	 * @param counterFactory counter factory to install on start-up
	 * @param tracerFactory  tracer factory to install on start-up
	 * @param filterLoader   loader whose per-type cache is cleared on shutdown
	 * @param filterRegistry registry the filters are added to and removed from
	 */
	public ZuulFilterInitializer(
			Map<String, ZuulFilter> filters,
			CounterFactory counterFactory,
			TracerFactory tracerFactory,
			FilterLoader filterLoader,
			FilterRegistry filterRegistry) {
		this.filters = filters;
		this.filterLoader = filterLoader;
		this.filterRegistry = filterRegistry;
		this.counterFactory = counterFactory;
		this.tracerFactory = tracerFactory;
	}

	/**
	 * Installs the tracer and counter factories, then adds every configured filter to the
	 * registry under its map key.
	 */
	@PostConstruct
	public void contextInitialized() {
		log.info("Starting filter initializer");

		TracerFactory.initialize(tracerFactory);
		CounterFactory.initialize(counterFactory);

		for (Map.Entry<String, ZuulFilter> registration : this.filters.entrySet()) {
			String name = registration.getKey();
			ZuulFilter filter = registration.getValue();
			filterRegistry.put(name, filter);
		}
	}

	/**
	 * Removes every configured filter from the registry, clears the loader's per-type cache and
	 * resets the tracer and counter factories to {@code null}.
	 */
	@PreDestroy
	public void contextDestroyed() {
		log.info("Stopping filter initializer");

		// Only the names are needed to unregister the filters.
		for (String name : this.filters.keySet()) {
			filterRegistry.remove(name);
		}
		clearLoaderCache();

		TracerFactory.initialize(null);
		CounterFactory.initialize(null);
	}

	/**
	 * Empties the {@code hashFiltersByType} map of the filter loader. The field is reached
	 * through reflection.
	 */
	private void clearLoaderCache() {
		Field cacheField = ReflectionUtils.findField(FilterLoader.class, "hashFiltersByType");
		ReflectionUtils.makeAccessible(cacheField);
		Map<?, ?> filtersByType = (Map<?, ?>) ReflectionUtils.getField(cacheField, filterLoader);
		filtersByType.clear();
	}

ZuulServlet

zuulservlet 負責攔截所有請求,執行preRoute,route,postRoute,error行為。

    public void service(javax.servlet.ServletRequest servletRequest, javax.servlet.ServletResponse servletResponse) throws ServletException, IOException {
        try {
            init((HttpServletRequest) servletRequest, (HttpServletResponse) servletResponse);
            RequestContext context = RequestContext.getCurrentContext();
            context.setZuulEngineRan();

            try {
                preRoute();
            } catch (ZuulException e) {
                error(e);
                postRoute();
                return;
            }
            try {
                route();
            } catch (ZuulException e) {
                error(e);
                postRoute();
                return;
            }
            try {
                postRoute();
            } catch (ZuulException e) {
                error(e);
                return;
            }

        } catch (Throwable e) {
            error(new ZuulException(e, 500, "UNHANDLED_EXCEPTION_" + e.getClass().getName()));
        } finally {
            RequestContext.getCurrentContext().unset();