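Dubbo Source Code Analysis (30): Remote Invocation, the rest Protocol
Remote Invocation: the rest Protocol
Goal: introduce the design and implementation of the rest protocol, and walk through the source code of dubbo-rpc-rest.
Preface
REST stands for Representational State Transfer. It is an architectural style for web services; readers who are not familiar with REST may want to read up on it first. Dubbo uses Resteasy, from Red Hat, to support REST-style development. This article focuses on how the rest protocol is implemented on top of Resteasy.
Source Code Analysis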
(1) RestServer
This interface is the server interface of the rest protocol. It defines the server-related methods.
public interface RestServer {

    /**
     * Start the server.
     * @param url
     */
    void start(URL url);

    /**
     * Deploy a resource on the server.
     * @param resourceDef it could be either resource interface or resource impl
     */
    void deploy(Class resourceDef, Object resourceInstance, String contextPath);

    /**
     * Undeploy a resource from the server.
     * @param resourceDef
     */
    void undeploy(Class resourceDef);

    /**
     * Stop the server.
     */
    void stop();
}
(2) BaseRestServer
This class implements the RestServer interface and is the abstract base class for rest servers. It leaves getDeployment and doStart abstract so that subclasses can focus on implementing those two methods.
1. start
@Override
public void start(URL url) {
    // Support two Content-Types
    getDeployment().getMediaTypeMappings().put("json", "application/json");
    getDeployment().getMediaTypeMappings().put("xml", "text/xml");
    // server.getDeployment().getMediaTypeMappings().put("xml", "application/xml");
    // Add the RpcContext filter
    getDeployment().getProviderClasses().add(RpcContextFilter.class.getName());
    // TODO users can override this mapper, but we just rely on the current priority strategy of resteasy
    // Add the exception mapper
    getDeployment().getProviderClasses().add(RpcExceptionMapper.class.getName());
    // Add the extension classes that need to be loaded
    loadProviders(url.getParameter(Constants.EXTENSION_KEY, ""));
    // Start the server
    doStart(url);
}
2. deploy
@Override
public void deploy(Class resourceDef, Object resourceInstance, String contextPath) {
    // If no contextPath is configured
    if (StringUtils.isEmpty(contextPath)) {
        // Register the resource factory for the custom resource implementation
        getDeployment().getRegistry().addResourceFactory(new DubboResourceFactory(resourceInstance, resourceDef));
    } else {
        // Register the resource factory under the given contextPath
        getDeployment().getRegistry().addResourceFactory(new DubboResourceFactory(resourceInstance, resourceDef), contextPath);
    }
}
3. undeploy
@Override
public void undeploy(Class resourceDef) {
    // Remove the registrations of this resource
    getDeployment().getRegistry().removeRegistrations(resourceDef);
}
4. loadProviders
protected void loadProviders(String value) {
    for (String clazz : Constants.COMMA_SPLIT_PATTERN.split(value)) {
        if (!StringUtils.isEmpty(clazz)) {
            getDeployment().getProviderClasses().add(clazz.trim());
        }
    }
}
This method splits the comma-separated extension value and adds each class name to the providerClasses of the ResteasyDeployment, which is how the various JAX-RS components get registered.
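To make the splitting behaviour concrete, here is a minimal, dependency-free sketch of the same loop. The class name ProviderListSketch and the example class names are hypothetical, and the regular expression is an assumption about what Constants.COMMA_SPLIT_PATTERN looks like (commas with optional surrounding whitespace); it is not copied from the source above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for the provider-class list kept by ResteasyDeployment.
class ProviderListSketch {

    // Assumption: behaves like Constants.COMMA_SPLIT_PATTERN (commas, optional whitespace around them).
    private static final Pattern COMMA_SPLIT = Pattern.compile("\\s*[,]+\\s*");

    // Splits the "extension" parameter value into trimmed, non-empty class names.
    static List<String> parseProviders(String value) {
        List<String> providers = new ArrayList<>();
        for (String clazz : COMMA_SPLIT.split(value)) {
            String trimmed = clazz.trim();
            if (!trimmed.isEmpty()) {
                providers.add(trimmed);
            }
        }
        return providers;
    }

    public static void main(String[] args) {
        // Example class names are made up for illustration.
        System.out.println(parseProviders("com.example.LogFilter, com.example.JsonProvider"));
    }
}
```

An empty parameter value yields an empty list, which matches the default of "" passed in start.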
(3) DubboHttpServer
This class extends BaseRestServer and implements doStart and getDeployment. It is the server implementation used when servlet, jetty or tomcat is configured as the remote communication implementation.
1. Fields
/**
 * HttpServletDispatcher instance
 */
private final HttpServletDispatcher dispatcher = new HttpServletDispatcher();
/**
 * Resteasy deployment
 */
private final ResteasyDeployment deployment = new ResteasyDeployment();
/**
 * HTTP binder
 */
private HttpBinder httpBinder;
/**
 * HTTP server
 */
private HttpServer httpServer;
2. doStart
@Override
protected void doStart(URL url) {
    // TODO jetty will by default enable keepAlive so the xml config has no effect now
    // Create the HTTP server
    httpServer = httpBinder.bind(url, new RestHandler());
    // Get the ServletContext registered for this port
    ServletContext servletContext = ServletManager.getInstance().getServletContext(url.getPort());
    // If it is null, fall back to the ServletContext of the external server port
    if (servletContext == null) {
        servletContext = ServletManager.getInstance().getServletContext(ServletManager.EXTERNAL_SERVER_PORT);
    }
    // If it is still null, throw an exception
    if (servletContext == null) {
        throw new RpcException("No servlet context found. If you are using server='servlet', " +
                "make sure that you've configured " + BootstrapListener.class.getName() + " in web.xml");
    }
    // Expose the deployment as a ServletContext attribute
    servletContext.setAttribute(ResteasyDeployment.class.getName(), deployment);
    try {
        // Initialize the dispatcher
        dispatcher.init(new SimpleServletConfig(servletContext));
    } catch (ServletException e) {
        throw new RpcException(e);
    }
}
3. RestHandler
private class RestHandler implements HttpHandler {

    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        // Record the remote address
        RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
        // Hand the request over to the Resteasy dispatcher
        dispatcher.service(request, response);
    }
}
This inner class is the handler for service requests.
4. SimpleServletConfig
private static class SimpleServletConfig implements ServletConfig {

    // The ServletContext
    private final ServletContext servletContext;

    public SimpleServletConfig(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    @Override
    public String getServletName() {
        return "DispatcherServlet";
    }

    @Override
    public ServletContext getServletContext() {
        return servletContext;
    }

    @Override
    public String getInitParameter(String s) {
        return null;
    }

    @Override
    public Enumeration getInitParameterNames() {
        return new Enumeration() {
            @Override
            public boolean hasMoreElements() {
                return false;
            }

            @Override
            public Object nextElement() {
                return null;
            }
        };
    }
}
This inner class is a minimal ServletConfig used to initialize the dispatcher; it has no init parameters.
(4) NettyServer
This class extends BaseRestServer. It is the server implementation used when netty is configured as the remote communication implementation.
public class NettyServer extends BaseRestServer {

    /**
     * NettyJaxrsServer instance
     */
    private final NettyJaxrsServer server = new NettyJaxrsServer();

    @Override
    protected void doStart(URL url) {
        // Get the bind IP
        String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
        if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
            // Set the host name of the server
            server.setHostname(bindIp);
        }
        // Set the port
        server.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
        // Channel options
        Map<ChannelOption, Object> channelOption = new HashMap<ChannelOption, Object>();
        // Keep-alive, used to detect whether the peer host has crashed
        channelOption.put(ChannelOption.SO_KEEPALIVE, url.getParameter(Constants.KEEP_ALIVE_KEY, Constants.DEFAULT_KEEP_ALIVE));
        // Apply the channel options
        server.setChildChannelOptions(channelOption);
        // Set the executor thread count, 200 by default
        server.setExecutorThreadCount(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS));
        // Set the number of IO worker threads
        server.setIoWorkerCount(url.getParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        // Set the maximum request size (payload)
        server.setMaxRequestSize(url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD));
        // Start the server
        server.start();
    }

    @Override
    public void stop() {
        server.stop();
    }

    @Override
    protected ResteasyDeployment getDeployment() {
        return server.getDeployment();
    }
}
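(5) DubboResourceFactory
This class implements the ResourceFactory interface. It is the resource factory implementation and wraps the following two fields; the implementation is fairly simple.
/**
 * The resource instance
 */
private Object resourceInstance;
/**
 * The class to be scanned
 */
private Class scannableClass;
(6) RestConstraintViolation
This class is the entity class that describes a single constraint violation. It wraps the following three fields; its usage is shown in the RpcExceptionMapper section below.
/**
 * Property path
 */
private String path;
/**
 * Message
 */
private String message;
/**
 * Invalid value
 */
private String value;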
(7) RestServerFactory
This class is the server factory. It provides the matching server instance, and its logic is fairly simple.
public class RestServerFactory {

    /**
     * HTTP binder
     */
    private HttpBinder httpBinder;

    public void setHttpBinder(HttpBinder httpBinder) {
        this.httpBinder = httpBinder;
    }

    /**
     * Create a server.
     * @param name
     * @return
     */
    public RestServer createServer(String name) {
        // TODO move names to Constants
        // For servlet, jetty or tomcat, create a DubboHttpServer
        if ("servlet".equalsIgnoreCase(name) || "jetty".equalsIgnoreCase(name) || "tomcat".equalsIgnoreCase(name)) {
            return new DubboHttpServer(httpBinder);
        } else if ("netty".equalsIgnoreCase(name)) {
            // For netty, create the netty server directly
            return new NettyServer();
        } else {
            // Otherwise throw an exception
            throw new IllegalArgumentException("Unrecognized server name: " + name);
        }
    }
}
As you can see, a different server implementation is created depending on the configured server name.
(8) RpcContextFilter
This class is a filter. It passes RpcContext attachments through an HTTP header and adds a limit on the size of that header.
public class RpcContextFilter implements ContainerRequestFilter, ClientRequestFilter {

    /**
     * Header name for the attachments
     */
    private static final String DUBBO_ATTACHMENT_HEADER = "Dubbo-Attachments";

    // currently we use a single header to hold the attachments so that the total attachment size limit is about 8k
    private static final int MAX_HEADER_SIZE = 8 * 1024;

    @Override
    public void filter(ContainerRequestContext requestContext) throws IOException {
        // Get the request
        HttpServletRequest request = ResteasyProviderFactory.getContextData(HttpServletRequest.class);
        // Put it into the RPC context
        RpcContext.getContext().setRequest(request);

        // this only works for servlet containers
        if (request != null && RpcContext.getContext().getRemoteAddress() == null) {
            // Set the remote address
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
        }

        // Set the response
        RpcContext.getContext().setResponse(ResteasyProviderFactory.getContextData(HttpServletResponse.class));

        // Read the attachments header
        String headers = requestContext.getHeaderString(DUBBO_ATTACHMENT_HEADER);
        // Split the header into key=value pairs and store them in the context
        if (headers != null) {
            for (String header : headers.split(",")) {
                int index = header.indexOf("=");
                if (index > 0) {
                    String key = header.substring(0, index);
                    String value = header.substring(index + 1);
                    if (!StringUtils.isEmpty(key)) {
                        RpcContext.getContext().setAttachment(key.trim(), value.trim());
                    }
                }
            }
        }
    }

    @Override
    public void filter(ClientRequestContext requestContext) throws IOException {
        int size = 0;
        // Iterate over the attachments
        for (Map.Entry<String, String> entry : RpcContext.getContext().getAttachments().entrySet()) {
            // If a key or value contains '=' or ',', throw an exception
            if (entry.getValue().contains(",") || entry.getValue().contains("=")
                    || entry.getKey().contains(",") || entry.getKey().contains("=")) {
                throw new IllegalArgumentException("The attachments of " + RpcContext.class.getSimpleName() + " must not contain ',' or '=' when using rest protocol");
            }

            // TODO for now we don't consider the differences of encoding and server limit
            // Accumulate the size of the value in UTF-8 bytes
            size += entry.getValue().getBytes("UTF-8").length;
            // If the limit is exceeded, throw an exception
            if (size > MAX_HEADER_SIZE) {
                throw new IllegalArgumentException("The attachments of " + RpcContext.class.getSimpleName() + " is too big");
            }

            // Build the key=value pair
            String attachments = entry.getKey() + "=" + entry.getValue();
            // Add it to the request header
            requestContext.getHeaders().add(DUBBO_ATTACHMENT_HEADER, attachments);
        }
    }
}
There are two filter implementations. The first one, used on the server side, parses the attachments out of the header and puts them into the RpcContext. The second one, used on the client side, rejects attachments that contain ',' or '=', enforces the size limit of about 8k on the values, and writes the attachments into the request header.
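The header format is easier to see in isolation. The following is a minimal sketch of the same key=value encoding using only the JDK; the class AttachmentHeaderSketch and its encode and decode methods are hypothetical names, not part of Dubbo. One simplification to note: the real client filter adds one header value per attachment, while this sketch joins everything into a single comma-separated string, which is the form the server-side filter splits.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical, dependency-free sketch of the attachment header format.
class AttachmentHeaderSketch {

    // Same limit as MAX_HEADER_SIZE in the filter above.
    static final int MAX_HEADER_SIZE = 8 * 1024;

    // Client side: validate the attachments and join them as "k1=v1,k2=v2".
    static String encode(Map<String, String> attachments) {
        int size = 0;
        StringJoiner joined = new StringJoiner(",");
        for (Map.Entry<String, String> entry : attachments.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            // ',' and '=' are the delimiters, so they cannot appear in keys or values.
            if (key.contains(",") || key.contains("=") || value.contains(",") || value.contains("=")) {
                throw new IllegalArgumentException("attachments must not contain ',' or '='");
            }
            // Like the original, only the value bytes are counted towards the limit.
            size += value.getBytes(StandardCharsets.UTF_8).length;
            if (size > MAX_HEADER_SIZE) {
                throw new IllegalArgumentException("attachments are too big");
            }
            joined.add(key + "=" + value);
        }
        return joined.toString();
    }

    // Server side: split the header back into key/value pairs, skipping malformed entries.
    static Map<String, String> decode(String header) {
        Map<String, String> result = new LinkedHashMap<>();
        if (header == null) {
            return result;
        }
        for (String pair : header.split(",")) {
            int index = pair.indexOf('=');
            if (index > 0) {
                String key = pair.substring(0, index).trim();
                String value = pair.substring(index + 1).trim();
                if (!key.isEmpty()) {
                    result.put(key, value);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> attachments = new LinkedHashMap<>();
        attachments.put("traceId", "abc");
        System.out.println(decode(encode(attachments)));
    }
}
```

Because the delimiters are plain characters with no escaping, rejecting them on the client side is what keeps the server-side split unambiguous.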
(9) RpcExceptionMapper
This class maps RpcException to an HTTP response.
public class RpcExceptionMapper implements ExceptionMapper<RpcException> {

    @Override
    public Response toResponse(RpcException e) {
        // TODO do more sophisticated exception handling and output
        // If the cause is a constraint violation exception
        if (e.getCause() instanceof ConstraintViolationException) {
            return handleConstraintViolationException((ConstraintViolationException) e.getCause());
        }
        // we may want to avoid exposing the dubbo exception details to certain clients
        // TODO for now just do plain text output
        return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Internal server error: " + e.getMessage()).type(ContentType.TEXT_PLAIN_UTF_8).build();
    }

    /**
     * Handle the exception raised for invalid parameters.
     * @param cve
     * @return
     */
    protected Response handleConstraintViolationException(ConstraintViolationException cve) {
        // Create the violation report
        ViolationReport report = new ViolationReport();
        // Iterate over the constraint violations
        for (ConstraintViolation cv : cve.getConstraintViolations()) {
            // Add one record per violation
            report.addConstraintViolation(new RestConstraintViolation(
                    cv.getPropertyPath().toString(),
                    cv.getMessage(),
                    cv.getInvalidValue() == null ? "null" : cv.getInvalidValue().toString()));
        }
        // TODO for now just do xml output
        // Only XML output is supported
        return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(report).type(ContentType.TEXT_XML_UTF_8).build();
    }
}
Its main job is to handle invalid-parameter exceptions (constraint violations), which are returned as an XML report; any other RpcException becomes a plain-text 500 response.
(10) ViolationReport
This class is the report of constraint violations. It simply wraps a list of RestConstraintViolation.
public class ViolationReport implements Serializable {

    private static final long serialVersionUID = -130498234L;

    /**
     * List of constraint violations
     */
    private List<RestConstraintViolation> constraintViolations;

    public List<RestConstraintViolation> getConstraintViolations() {
        return constraintViolations;
    }

    public void setConstraintViolations(List<RestConstraintViolation> constraintViolations) {
        this.constraintViolations = constraintViolations;
    }

    public void addConstraintViolation(RestConstraintViolation constraintViolation) {
        if (constraintViolations == null) {
            constraintViolations = new LinkedList<RestConstraintViolation>();
        }
        constraintViolations.add(constraintViolation);
    }
}
(11) RestProtocol
This class extends AbstractProxyProtocol and is the core of the rest protocol implementation.
1. Fields
/**
 * Default port
 */
private static final int DEFAULT_PORT = 80;
/**
 * Servers, keyed by address
 */
private final Map<String, RestServer> servers = new ConcurrentHashMap<String, RestServer>();
/**
 * Server factory
 */
private final RestServerFactory serverFactory = new RestServerFactory();
// TODO in the future maybe we can just use a single rest client and connection manager
/**
 * Clients
 */
private final List<ResteasyClient> clients = Collections.synchronizedList(new LinkedList<ResteasyClient>());
/**
 * Connection monitor
 */
private volatile ConnectionMonitor connectionMonitor;
2. doExport
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
    // Get the address
    String addr = getAddr(url);
    // Get the implementation class
    Class implClass = (Class) StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).get(url.getServiceKey());
    // Look up the server for this address
    RestServer server = servers.get(addr);
    if (server == null) {
        // Create the server
        server = serverFactory.createServer(url.getParameter(Constants.SERVER_KEY, "jetty"));
        // Start the server
        server.start(url);
        // Cache it
        servers.put(addr, server);
    }

    // Get the contextPath
    String contextPath = getContextPath(url);
    // If running inside an external servlet container
    if ("servlet".equalsIgnoreCase(url.getParameter(Constants.SERVER_KEY, "jetty"))) {
        // Get the ServletContext
        ServletContext servletContext = ServletManager.getInstance().getServletContext(ServletManager.EXTERNAL_SERVER_PORT);
        // If it is null, throw an exception
        if (servletContext == null) {
            throw new RpcException("No servlet context found. Since you are using server='servlet', " +
                    "make sure that you've configured " + BootstrapListener.class.getName() + " in web.xml");
        }
        String webappPath = servletContext.getContextPath();
        if (StringUtils.isNotEmpty(webappPath)) {
            // Check that the configuration is correct
            webappPath = webappPath.substring(1);
            if (!contextPath.startsWith(webappPath)) {
                throw new RpcException("Since you are using server='servlet', " +
                        "make sure that the 'contextpath' property starts with the path of external webapp");
            }
            contextPath = contextPath.substring(webappPath.length());
            if (contextPath.startsWith("/")) {
                contextPath = contextPath.substring(1);
            }
        }
    }

    // Pick the resource definition: the implementation class if it carries the root resource annotation, otherwise the interface
    final Class resourceDef = GetRestful.getRootResourceClass(implClass) != null ? implClass : type;

    // Deploy the resource on the server
    server.deploy(resourceDef, impl, contextPath);

    final RestServer s = server;
    return new Runnable() {
        @Override
        public void run() {
            // TODO due to dubbo's current architecture,
            // it will be called from registry protocol in the shutdown process and won't appear in logs
            s.undeploy(resourceDef);
        }
    };
}
This method exports the service: it starts (or reuses) the server for the address, works out the context path, deploys the resource, and returns a Runnable that undeploys it.
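The servlet branch of doExport is mostly string handling on the context path. Below is a minimal sketch of that step in isolation. It assumes that the context path taken from the Dubbo URL has no leading slash and that the webapp path comes from ServletContext.getContextPath(), so it is either empty or starts with "/". The class and method names are hypothetical.

```java
// Hypothetical sketch of how the context path is made relative to the external webapp.
class ContextPathSketch {

    // contextPath: path configured on the Dubbo URL, e.g. "shop/services" (assumed: no leading slash)
    // webappPath:  value of ServletContext.getContextPath(), e.g. "/shop", or "" for the root webapp
    static String relativeContextPath(String contextPath, String webappPath) {
        if (webappPath == null || webappPath.isEmpty()) {
            // Root webapp: nothing to strip.
            return contextPath;
        }
        // Drop the leading "/" of the webapp path.
        String webapp = webappPath.substring(1);
        if (!contextPath.startsWith(webapp)) {
            throw new IllegalArgumentException(
                    "the 'contextpath' property must start with the path of the external webapp");
        }
        // Strip the webapp prefix and an optional separator that follows it.
        String rest = contextPath.substring(webapp.length());
        return rest.startsWith("/") ? rest.substring(1) : rest;
    }

    public static void main(String[] args) {
        System.out.println(relativeContextPath("shop/services", "/shop"));
    }
}
```

The reason for stripping the prefix is that the servlet container has already consumed the webapp part of the request path, so the resource only needs to be registered under what remains.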
3. doRefer
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
    // Create the connection monitor if it does not exist yet
    if (connectionMonitor == null) {
        connectionMonitor = new ConnectionMonitor();
    }

    // TODO more configs to add
    // Create the HTTP connection pool
    PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
    // 20 is the default maxTotal of current PoolingClientConnectionManager
    // Maximum total connections
    connectionManager.setMaxTotal(url.getParameter(Constants.CONNECTIONS_KEY, 20));
    // Maximum connections per route
    connectionManager.setDefaultMaxPerRoute(url.getParameter(Constants.CONNECTIONS_KEY, 20));

    // Register the pool with the monitor
    connectionMonitor.addConnectionManager(connectionManager);
    // Build the request configuration
    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT))
            .setSocketTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT))
            .build();

    // Build the socket configuration
    SocketConfig socketConfig = SocketConfig.custom()
            .setSoKeepAlive(true)
            .setTcpNoDelay(true)
            .build();

    // Create the HTTP client
    CloseableHttpClient httpClient = HttpClientBuilder.create()
            .setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                @Override
                public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
                    HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
                    while (it.hasNext()) {
                        HeaderElement he = it.nextElement();
                        String param = he.getName();
                        String value = he.getValue();
                        if (value != null && param.equalsIgnoreCase("timeout")) {
                            return Long.parseLong(value) * 1000;
                        }
                    }
                    // TODO constant
                    return 30 * 1000;
                }
            })
            .setDefaultRequestConfig(requestConfig)
            .setDefaultSocketConfig(socketConfig)
            .build();

    // Create the ApacheHttpClient4Engine so that Resteasy can use this HTTP client
    ApacheHttpClient4Engine engine = new ApacheHttpClient4Engine(httpClient/*, localContext*/);

    // Create the ResteasyClient
    ResteasyClient client = new ResteasyClientBuilder().httpEngine(engine).build();
    // Cache it
    clients.add(client);

    // Register the filter
    client.register(RpcContextFilter.class);
    // Register the extension components
    for (String clazz : Constants.COMMA_SPLIT_PATTERN.split(url.getParameter(Constants.EXTENSION_KEY, ""))) {
        if (!StringUtils.isEmpty(clazz)) {
            try {
                client.register(Thread.currentThread().getContextClassLoader().loadClass(clazz.trim()));
            } catch (ClassNotFoundException e) {
                throw new RpcException("Error loading JAX-RS extension class: " + clazz.trim(), e);
            }
        }
    }

    // TODO protocol
    // Create the service proxy object
    ResteasyWebTarget target = client.target("http://" + url.getHost() + ":" + url.getPort() + "/" + getContextPath(url));
    return target.proxy(serviceType);
}
This method implements service reference: it builds a pooled Apache HttpClient, wraps it in a ResteasyClient, registers the filter and extensions, and returns a Resteasy proxy for the service interface.
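The keep-alive strategy in doRefer reads the timeout parameter of the server's Keep-Alive response header and falls back to 30 seconds. A rough JDK-only sketch of that rule follows; it parses the raw header value by hand instead of using HttpClient's header element iterator, so it is an approximation of the behaviour rather than the same code, and the class and method names are hypothetical.

```java
// Hypothetical sketch of the keep-alive rule: use "timeout=N" (seconds) if present, else 30 seconds.
class KeepAliveSketch {

    static final long DEFAULT_KEEP_ALIVE_MILLIS = 30 * 1000L;

    // keepAliveHeader: raw value of a Keep-Alive header, e.g. "timeout=5, max=100"; may be null.
    static long keepAliveMillis(String keepAliveHeader) {
        if (keepAliveHeader != null) {
            for (String element : keepAliveHeader.split(",")) {
                // Each element is expected to look like "name=value".
                String[] parts = element.trim().split("=", 2);
                if (parts.length == 2 && parts[0].trim().equalsIgnoreCase("timeout")) {
                    // The header carries seconds; the strategy returns milliseconds.
                    return Long.parseLong(parts[1].trim()) * 1000;
                }
            }
        }
        return DEFAULT_KEEP_ALIVE_MILLIS;
    }

    public static void main(String[] args) {
        System.out.println(keepAliveMillis("timeout=5, max=100"));
    }
}
```

Honouring the server's timeout means pooled connections are not reused after the server has likely closed them, which is the same concern the ConnectionMonitor addresses from the pool side.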
4. ConnectionMonitor
protected class ConnectionMonitor extends Thread {

    /**
     * Whether the monitor has been shut down
     */
    private volatile boolean shutdown;
    /**
     * Connection pools being monitored
     */
    private final List<PoolingHttpClientConnectionManager> connectionManagers = Collections.synchronizedList(new LinkedList<PoolingHttpClientConnectionManager>());

    public void addConnectionManager(PoolingHttpClientConnectionManager connectionManager) {
        connectionManagers.add(connectionManager);
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                synchronized (this) {
                    wait(1000);
                    for (PoolingHttpClientConnectionManager connectionManager : connectionManagers) {
                        // Close all expired connections in the pool
                        connectionManager.closeExpiredConnections();
                        // TODO constant
                        // Close connections that have been idle for 30 seconds
                        connectionManager.closeIdleConnections(30, TimeUnit.SECONDS);
                    }
                }
            }
        } catch (InterruptedException ex) {
            // Shut down
            shutdown();
        }
    }

    public void shutdown() {
        shutdown = true;
        connectionManagers.clear();
        synchronized (this) {
            notifyAll();
        }
    }
}
This inner class monitors the connection pools. About once a second it closes connections that have expired or have been idle for too long.
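The wait-and-sweep loop can be tried out without HttpClient. The following is a minimal sketch of the same pattern with plain Runnable tasks standing in for the connection pools. The class name IdleSweeperSketch and its methods are hypothetical. Unlike ConnectionMonitor, which synchronizes on the thread object itself, this sketch waits on a separate lock object and runs the tasks outside the lock; that is a deliberate substitution, not what the source does.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, dependency-free analogue of ConnectionMonitor.
class IdleSweeperSketch extends Thread {

    private final Object lock = new Object();
    // Must be greater than 0: wait(0) would block until notified.
    private final long intervalMillis;
    // Stand-ins for the monitored connection pools.
    private final List<Runnable> sweepTasks = new CopyOnWriteArrayList<>();
    private volatile boolean shutdown;

    IdleSweeperSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
        setDaemon(true);
    }

    void addSweepTask(Runnable task) {
        sweepTasks.add(task);
    }

    @Override
    public void run() {
        try {
            while (!shutdown) {
                synchronized (lock) {
                    // Re-check under the lock so a shutdown signal is not missed.
                    if (!shutdown) {
                        lock.wait(intervalMillis);
                    }
                }
                if (shutdown) {
                    break;
                }
                // One sweep: in the real class this closes expired and idle connections.
                for (Runnable task : sweepTasks) {
                    task.run();
                }
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and let the thread end.
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() {
        shutdown = true;
        synchronized (lock) {
            // Wake the sweeper so it exits without waiting for the interval to elapse.
            lock.notifyAll();
        }
    }
}
```

A caller would create the sweeper with a positive interval, add tasks, call start(), and call shutdown() when the protocol is destroyed.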
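Afterword
Source code analysis for this part: https://github.com/CrazyHZM/i...
This article covered the rest protocol part of remote invocation. The key is to have some familiarity with Resteasy; beyond that, the approach is much the same as in the other protocol implementations. Next, I will cover the rmi protocol part of the rpc module.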