dubbo原始碼解析(二十九)遠端呼叫——redis協議
遠端呼叫——redis協議
目標:介紹redis協議的設計和實現,介紹dubbo-rpc-redis的原始碼。
前言
dubbo支援的redis協議是基於Redis的,Redis 是一個高效的 KV 儲存伺服器,跟memcached協議實現差不多,在dubbo中也沒有涉及到關於redis協議的服務暴露,只有服務引用,因為在訪問伺服器時,Redis客戶端可以在伺服器上儲存也可以獲取。
原始碼分析
(一)RedisProtocol
該類繼承了AbstractProtocol類,是redis協議實現的核心。
1.屬性
/** * 預設埠號 */ public static final int DEFAULT_PORT = 6379;
2.export
@Override public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException { // 不支援redis協議的服務暴露,丟擲異常 throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl()); }
可以看到不支援服務暴露。
3.refer
@Override public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException { try { // 例項化物件池 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 如果 testOnBorrow 被設定,pool 會在 borrowObject 返回物件之前使用 PoolableObjectFactory的 validateObject 來驗證這個物件是否有效 // 要是物件沒通過驗證,這個物件會被丟棄,然後重新選擇一個新的物件。 config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); // 如果 testOnReturn 被設定, pool 會在 returnObject 的時候通過 PoolableObjectFactory 的validateObject 方法驗證物件 // 如果物件沒通過驗證,物件會被丟棄,不會被放到池中。 config.setTestOnReturn(url.getParameter("test.on.return", false)); // 指定空閒物件是否應該使用 PoolableObjectFactory 的 validateObject 校驗,如果校驗失敗,這個物件會從物件池中被清除。 // 這個設定僅在 timeBetweenEvictionRunsMillis 被設定成正值( >0) 的時候才會生效。 config.setTestWhileIdle(url.getParameter("test.while.idle", false)); if (url.getParameter("max.idle", 0) > 0) // 控制一個pool最多有多少個狀態為空閒的jedis例項。 config.setMaxIdle(url.getParameter("max.idle", 0)); if (url.getParameter("min.idle", 0) > 0) // 控制一個pool最少有多少個狀態為空閒的jedis例項。 config.setMinIdle(url.getParameter("min.idle", 0)); if (url.getParameter("max.active", 0) > 0) // 控制一個pool最多有多少個jedis例項。 config.setMaxTotal(url.getParameter("max.active", 0)); if (url.getParameter("max.total", 0) > 0) config.setMaxTotal(url.getParameter("max.total", 0)); if (url.getParameter("max.wait", 0) > 0) //表示當引入一個jedis例項時,最大的等待時間,如果超過等待時間,則直接丟擲JedisConnectionException; config.setMaxWaitMillis(url.getParameter("max.wait", 0)); if (url.getParameter("num.tests.per.eviction.run", 0) > 0) // 設定驅逐執行緒每次檢測物件的數量。這個設定僅在 timeBetweenEvictionRunsMillis 被設定成正值( >0)的時候才會生效。 config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) // 指定驅逐執行緒的休眠時間。如果這個值不是正數( >0),不會有驅逐執行緒執行。 config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) // 指定最小的空閒驅逐的時間間隔(空閒超過指定的時間的物件,會被清除掉)。 // 這個設定僅在 timeBetweenEvictionRunsMillis 被設定成正值( >0)的時候才會生效。 config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); // 建立redis連線池 final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT), url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(), url.getParameter("db.index", 0)); // 獲得值的過期時間 final int expiry = url.getParameter("expiry", 0); // 獲得get命令 final String get = url.getParameter("get", "get"); // 獲得set命令 final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set"); // 獲得delete命令 final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete"); return new AbstractInvoker<T>(type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { Jedis resource = null; try { resource = jedisPool.getResource(); // 如果是get命令 if (get.equals(invocation.getMethodName())) { // get 命令必須只有一個引數 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 獲得值 byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes()); if (value == null) { return new RpcResult(); } // 反序列化 ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value)); return new RpcResult(oin.readObject()); } else if (set.equals(invocation.getMethodName())) { // 如果是set命令,引數長度必須是2 if (invocation.getArguments().length != 2) { throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes(); ByteArrayOutputStream output = new ByteArrayOutputStream(); // 對需要存入對值進行序列化 ObjectOutput value = getSerialization(url).serialize(url, output); value.writeObject(invocation.getArguments()[1]); // 存入值 resource.set(key, output.toByteArray()); // 設定該key過期時間,不能大於1000s if (expiry > 1000) { resource.expire(key, expiry / 1000); } return new RpcResult(); } else if (delete.equals(invocation.getMethodName())) { // 如果是刪除命令,則引數長度必須是1 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 刪除該值 resource.del(String.valueOf(invocation.getArguments()[0]).getBytes()); return new RpcResult(); } else { // 否則丟擲該操作不支援的異常 throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service."); } } catch (Throwable t) { RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t); if (t instanceof TimeoutException || t instanceof SocketTimeoutException) { // 丟擲超時異常 re.setCode(RpcException.TIMEOUT_EXCEPTION); } else if (t instanceof JedisConnectionException || t instanceof IOException) { // 丟擲網路異常 re.setCode(RpcException.NETWORK_EXCEPTION); } else if (t instanceof JedisDataException) { // 丟擲序列化異常 re.setCode(RpcException.SERIALIZATION_EXCEPTION); } throw re; } finally { if (resource != null) { try { jedisPool.returnResource(resource); } catch (Throwable t) { logger.warn("returnResource error: " + t.getMessage(), t); } } } } @Override public void destroy() { super.destroy(); try { // 關閉連線池 jedisPool.destroy(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } }; } catch (Throwable t) { throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t); } }
可以看到首先是對連線池的配置賦值,然後建立連線池後,根據redis的get、set、delete命令來進行相關操作。
後記
該部分相關的原始碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了遠端呼叫中關於redis協議實現的部分,邏輯比較簡單。接下來我將開始對rpc模組關於rest協議部分進行講解。