1. 程式人生 > >分布式系列五: RMI通信

分布式系列五: RMI通信

checked rri static 成了 -- 如果 locate pre 連接

RPC(Remote Procedure Call)協議

RPC協議是一種通過網絡從遠程計算機上請求服務, 而不需要了解底層網絡技術的協議, 在OSI模型中處在應用層和網絡層.

作為一個規範, 使用RPC協議的框架有很多, Dubbo,Hessian等均使用這個協議, RMI也使用該協議實現.

RMI(Remote Method Invocation) 遠程方法調用

RMI使用Java遠程消息交換協議JRMP(Java Remote Messaging Protocol)進行通信,JRMP是純java的.

  1. 定義接口, 使其extends Remote接口, 方法需要拋出異常RemoteException
    , Remote是一個標記接口
public interface IRmiTest extends Remote {
    String hello() throws RemoteException;
}
  1. 實現接口, 使其extends UnicastRemoteObject, 需要有構造方法, 並拋出異常RemoteException
public class RmiTest extends UnicastRemoteObject implements IRmiTest {

    public RmiTest() throws RemoteException {

    }

    @Override
    public String hello() {
        return "Hello ....";
    }
}
  1. 定義服務端, 註冊和綁定
public class TestServer {
    public static void main(String[] args) throws RemoteException, AlreadyBoundException, MalformedURLException {
        IRmiTest rmiTest = new RmiTest();
        LocateRegistry.createRegistry(8888);
        Naming.bind("rmi://localhost:8888/hello",rmiTest);
        System.out.println("server started");
    }
}
  1. 定義客戶端, lookup方法的參數url與服務端bind的必須一致. 接口需要定義為與服務端一致.
public class TestClient {
    public static void main(String[] args) throws RemoteException,  MalformedURLException, NotBoundException {
        IRmiTest rmiTest = (IRmiTest) Naming.lookup("rmi://localhost:8888/hello");
        System.out.println(rmiTest.hello());
    }
}

RMI實現機制

RMI屏蔽了底層復雜的網絡調用, 使得遠程對象的方法調用變得透明, 就像調用本地方法一樣方便.
下面深入探究下jdk中rmi的實現原理, 看看底層是如何實現遠程調用的.
首先, 需要了解下比較重要的兩個角色stub和skeleton, 這兩個角色封裝了與網絡相關的代碼. 原始的交互式這樣的,客戶端--網絡--服務器--具體服務. 有了這兩個角色之後的模型變為: 客戶端--stub--網絡--skeleton--服務器--服務.可以參考的圖維基百科

下面來看源碼...

一.實例化RegistryImpl,初始化

LocateRegistry.createRegistry(8888);這句代碼啟動了一個註冊器(其中有個Map對象來存儲名稱和服務的映射,這個後面再細看)

public static Registry createRegistry(int port) throws RemoteException {
    return new RegistryImpl(port);
}

這個方法實例化了一個RegistryImpl的實例,RegistryImpl實現了Registry.

public RegistryImpl(final int var1) throws RemoteException {
    if(var1 == 1099 && System.getSecurityManager() != null) {
        try {
            AccessController.doPrivileged(new PrivilegedExceptionAction() {
                public Void run() throws RemoteException {
                    LiveRef var1x = new LiveRef(RegistryImpl.id, var1);
                    RegistryImpl.this.setup(new UnicastServerRef(var1x));
                    return null;
                }
            }, (AccessControlContext)null, new Permission[]{new SocketPermission("localhost:" + var1, "listen,accept")});
        } catch (PrivilegedActionException var3) {
            throw (RemoteException)var3.getException();
        }
    } else {
        LiveRef var2 = new LiveRef(id, var1);
        this.setup(new UnicastServerRef(var2));
    }
}

兩個分支最終都調用了setup()方法, 主要關註該方法.if分支中var1=1099是指默認端口並且存在安全管理器的時候不做校驗, 這是為了性能考慮.

private void setup(UnicastServerRef var1) throws RemoteException {
    this.ref = var1; // UnicastServerRef繼承了RemoteRef,this.ref的類型就是RemoteRef
    var1.exportObject(this, (Object)null, true); 
}

setup方法的參數是包裝後的UnicastServerRef對象, UnicastServerRef繼承了RemoteRef因此可以賦值給ref變量. 該方法將調用委托給UnicastServerRef的方法exportObject()
如果是拿文章開頭的代碼進行調試, 會發現這個方法會走兩次, 除了RegistryImpl, 還有一次是RmiTest也會走這個方法.不同的是RegistryImpl會走下面代碼中的if(var5 instanceof RemoteStub)分支語句, 這個語句最終將生成一個Skeleton實例並設置給當前實例的域變量skel, 不過自jdk1.2之後skeleton就沒什麽用了.

public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
    Class var4 = var1.getClass();

    Remote var5;
    try {
        var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);
    } catch (IllegalArgumentException var7) {
        throw new ExportException("remote object implements illegal remote interface", var7);
    }

    if(var5 instanceof RemoteStub) {
        // 生成Skeleton實例並設置給當前實例的域變量skel
        this.setSkeleton(var1);
    }

    Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
    this.ref.exportObject(var6);  //ref是實例化UnicastServerRef的時候傳入的
    this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
    return var5;
}

上面方法首先根據Remote的參數var1創建了一個代理對象var5, var1是RegistryImpl類的實例. 然後實例化一個Target的實例, 從參數可以看到,Target對象包含了幾乎之前代碼的所有對象.然後將這個對象作為參數,調用LiveRef實例ref的exportObject()方法.

二. 網絡連接和對象傳輸

public void exportObject(Target var1) throws RemoteException {
    this.ep.exportObject(var1);
}

接上一步, RemoteRef的方法最終委托給TCPEndpoint的同名方法(委托模式), 到此代碼將控制權傳遞給傳輸層.

    public void exportObject(Target var1) throws RemoteException {
        synchronized(this) {
            this.listen();
            ++this.exportCount;
        }

        boolean var2 = false;
        boolean var12 = false;

        try {
            var12 = true;
            super.exportObject(var1);
            var2 = true;
            var12 = false;
        } finally {
            if (var12) {
                if (!var2) {
                    synchronized(this) {
                        this.decrementExportCount();
                    }
                }

            }
        }

        if (!var2) {
            synchronized(this) {
                this.decrementExportCount();
            }
        }

    }

這個方法實現了網絡通信, 首先linsten()啟動了一個ServerSocket的線程,並開始監聽端口. 然後調用父類的方法將Target對象暴露出去, 此時服務端的初始化就完成了.

三. 註冊服務

Naming.bind("rmi://localhost:8888/hello",rmiTest); 完成名稱和服務對象的綁定.

public static void bind(String name, Remote obj)
    throws AlreadyBoundException,
        java.net.MalformedURLException,
        RemoteException
{
    ParsedNamingURL parsed = parseURL(name);
    Registry registry = getRegistry(parsed);

    if (obj == null)
        throw new NullPointerException("cannot bind to null");

    registry.bind(parsed.name, obj);
}

上面代碼Naming類, 調用的是註冊器Registrybind()方法

public void bind(String var1, Remote var2) throws RemoteException, AlreadyBoundException, AccessException {
    Hashtable var3 = this.bindings;
    synchronized(this.bindings) {
        Remote var4 = (Remote)this.bindings.get(var1);
        if (var4 != null) {
            throw new AlreadyBoundException(var1);
        } else {
            this.bindings.put(var1, var2);
        }
    }
}

註冊使用的容器是一個HashTable, 最終服務的名稱和服務會被註冊到這個map容器中.

到此為止, 服務端的初始化完成. 首先實例化了一個實現Register註冊器的實例, 通過層層組裝, 最終生成一個Target對象, 其中包含了組裝過程中生成的全部狀態, 最後調用RemoteRef的方法將對象轉交給傳輸層對象TCPEndpoint的實例, 最終由這個對象啟動Socket開啟通信連接. 註冊服務是通過Naming的方法委托調用Register註冊器的方法實現, 並將結果最終註冊到Register域的map對象中.

四. 客戶端遠程調用

IRmiTest rmiTest = (IRmiTest) Naming.lookup("rmi://localhost:8888/hello"); 客戶端通過Naming的方法獲取服務的實例

public static Remote lookup(String name)
    throws NotBoundException,
        java.net.MalformedURLException,
        RemoteException{
    ParsedNamingURL parsed = parseURL(name);
    Registry registry = getRegistry(parsed);

    if (parsed.name == null)
        return registry;
    return registry.lookup(parsed.name);
}

與服務端註冊時候使用Naming.bind()方法一樣, 這裏lookup()最終也會委托給Registry的實例. 這個實例的實現不是用的服務端的Register_Impl, 而是使用RegistryImpl_Stub, 下面代碼是lookup()的實現, 可以看出這裏封裝了網絡io的一些邏輯.

public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException {
    try {
        RemoteCall var2 = this.ref.newCall(this, operations, 2, 4905912898345647071L);

        try {
            ObjectOutput var3 = var2.getOutputStream();
            var3.writeObject(var1);
        } catch (IOException var17) {
            throw new MarshalException("error marshalling arguments", var17);
        }

        this.ref.invoke(var2);

        Remote var22;
        try {
            ObjectInput var4 = var2.getInputStream();
            var22 = (Remote)var4.readObject();
        } catch (IOException var14) {
            throw new UnmarshalException("error unmarshalling return", var14);
        } catch (ClassNotFoundException var15) {
            throw new UnmarshalException("error unmarshalling return", var15);
        } finally {
            this.ref.done(var2);
        }

        return var22;
    } catch (RuntimeException var18) {
        throw var18;
    } catch (RemoteException var19) {
        throw var19;
    } catch (NotBoundException var20) {
        throw var20;
    } catch (Exception var21) {
        throw new UnexpectedException("undeclared checked exception", var21);
    }
}

至此, 服務端和客戶端的連接完成, 可以開始通信了.

RMI自JDK1.1就已經提供了, 它提供了Java語言自己的RPC調用方式, 雖然有些老舊, 但依然經典. 目前有很多跨語言的技術或框架, 如後來的WebService, 再到目前的netty,shrift等基本已經取代了這種原始的調用方式, 他們是非阻塞的,且還能跨語言調用. 但熟悉RMI的實現方式對了解分布式系統的通信的實現原理有很大幫助.

分布式系列五: RMI通信