1. 程式人生 > >hadoop(2.7.3) 原始碼分析--RPC部分

hadoop(2.7.3) 原始碼分析--RPC部分

hadoop(2.7.3) 原始碼分析–RPC部分

序列化

hadoop 自帶了Writable序列化方法,可序列化的物件需實現 Writable 介面。
Hadoop common下org.apache.hadoop.io 大量的可序列化物件,他們都實現了Writable 介面。

  /** 
   * Serialize the fields of this object to <code>out</code>.
   * 
   * @param out <code>DataOuput</code> to serialize this object into.
   * @throws
IOException */
void write(DataOutput out) throws IOException; /** * Deserialize the fields of this object from <code>in</code>. * * <p>For efficiency, implementations should attempt to re-use storage in the * existing object where possible.</p> * * @param
in <code>DataInput</code> to deseriablize this object from. * @throws IOException */
void readFields(DataInput in) throws IOException;

其中最關鍵的是ObjectWritable,他儲存了一個可以在RPC上傳輸的物件和物件的型別資訊,是萬能的。它會往流裡會寫如下資訊:物件類名,物件自己的序列化結果,程式碼見ObjectWritable.java

UTF8.writeString(out, declaredClass.getName()); // always write declared
if (declaredClass.isArray()) { // non-primitive or non-compact array int length = Array.getLength(instance); out.writeInt(length); for (int i = 0; i < length; i++) { writeObject(out, Array.get(instance, i), declaredClass.getComponentType(), conf, allowCompactArrays); } } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) { ((ArrayPrimitiveWritable.Internal) instance).write(out); } else if (declaredClass == String.class) { // String UTF8.writeString(out, (String)instance); }

通訊部分

既然是RPC,當然就有客戶端和伺服器,當然,org.apache.hadoop.ipc也就有了類Client和類Server。但是類Server是一個抽象類,類RPC封裝了Server,利用反射,把某個物件的方法開放出來,發成RPC中的伺服器。
由於Client可能和多個Server通訊,典型的一次HDFS讀,需要和NameNode打交道,也需要和某個/某些DataNode通訊。這就意味著某一個Client需要維護多個連線。同時,為了減少不必要的連線,現在Client的做法是拿ConnectionId來做為Connection 的ID。ConnectionId 包括一個InetSocketAddress 和UserGroupInformation(ticket)及protocol,同一個使用者與同一個InetSocketAddress 的通訊將共享同一個連線。

  private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();

為了區分在同一個Connection 上的不同調用,每個呼叫都有唯一的id。呼叫是否結束也需要一個標記,所有的這些都體現在物件Client.Call 中。Connection 物件通過一個Hash 表,維護在某個連線上的所有Call

private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();

每個Client.Connection 繼承Thread,所以會開啟一個執行緒,不斷去讀取socket,並將收結果解包,找出對應的Call,設定Call 並通知結果已獲取。
Hadoop 的Server 採用了Java 的NIO,返樣的話就不需要為每一個socket 連線建立一個執行緒,讀取socket 上的資料。在Server 中,Listener及其巢狀類Reader 負責accept 新的連線請求和讀取socket 上的資料。
Responder負責在NIO可寫時,傳送rpc的結果。

RPC部分

客戶端和服務端呼叫時,都需要用的Call類,表示一次請求。在客戶端Client.Call 類裡,包括rpcRequest和rpcResponse,在Server.Call,除了剛才的rpcRequest和rpcResponse,還主要包括一下屬性:
connection 是該Call 來自的連線,當然,當請求處理結束時,相應的結果會通過相同的connection,傳送給客戶端。
timestamp 是請求到達的時間戳,如果請求長時間沒被處理,對應的連線會被關閉,客戶端也就知道出錯了

rpc部分的關鍵是RpcEngine,預設為WritableRpcEngine。主要包括Invocation、Invoker和WritableRpcInvoker三部分。

Invocation封裝了一個迖程呼叫的所有相關資訊,它的主要屬性有: methodName,呼叫方法名,parameterClasses,呼叫方法引數的型別列表和parameters,呼叫方法引數。注意,它實現了Writable介面,可以序列化。

 @Override
    @SuppressWarnings("deprecation")
    public void write(DataOutput out) throws IOException {
      out.writeLong(rpcVersion);
      UTF8.writeString(out, declaringClassProtocolName);
      UTF8.writeString(out, methodName);
      out.writeLong(clientVersion);
      out.writeInt(clientMethodsHash);
      out.writeInt(parameterClasses.length);
      for (int i = 0; i < parameterClasses.length; i++) {
        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
                                   conf, true);
      }
}

Invoker實現了InvocationHandler的invoke方法(invoke方法也是InvocationHandler的唯一方法)。Invoker會通過Invocation把所有跟這次呼叫相關的呼叫方法名,引數型別列表,引數列表打包,然後引用前面我們分析過的Client,通過socket傳遞到伺服器端。就是說,你在proxy類上的任何呼叫,都通過Client傳送到對方的伺服器上。

@Override
    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      long startTime = 0;
      if (LOG.isDebugEnabled()) {
        startTime = Time.now();
      }
      TraceScope traceScope = null;
      if (Trace.isTracing()) {
        traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method));
      }
      ObjectWritable value;
      try {
        value = (ObjectWritable)
          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
            remoteId, fallbackToSimpleAuth);
      } finally {
        if (traceScope != null) traceScope.close();
      }
      if (LOG.isDebugEnabled()) {
        long callTime = Time.now() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
}

WritableRpcInvoker在服務端的被呼叫,主要功能包括獲取傳過來的Invocation物件,呼叫程式碼並將返回值返回。