1. 程式人生 > >RPC入門總結(三)RMI+Zookeeper實現遠端呼叫框架

RPC入門總結(三)RMI+Zookeeper實現遠端呼叫框架

一、RMI的弊端

RMI是侷限於Java語言中的RPC框架,除了其語言侷限之外,其實現上還有其他的一些弊端。

1. RMI的I/O模型使用BIO模型(偽非同步I/O),使用BIO和執行緒池的方式在大資料量、多連線情況下存在效能瓶頸。

2. RMI 使用了 Java 預設的序列化方式,對於效能要求比較高的系統,可能需要使用其它序列化方案來解決(例如:Protobuf)。
3. RMI 服務在執行時難免會存在出故障,例如,如果 RMI 服務無法連線了,就會導致客戶端無法響應的現象。
在一般的情況下,Java 預設的序列化方式確實已經足以滿足我們的要求了,如果效能方面如果不是問題的話,我們需要解決的實際上是第三點,也就是說,讓使系統具備 HA(High Availability,高可用性

)。

二、RMI+Zookeeper

要想解決 RMI 服務的高可用性問題,我們需要利用 ZooKeeper 充當一個 服務登錄檔(Service Registry),讓多個 服務提供者(Service Provider)形成一個叢集,讓 服務消費者(Service Consumer)通過服務登錄檔獲取具體的服務訪問地址(也就是 RMI 服務地址)去訪問具體的服務提供者。如下圖所示:

輸入圖片說明

需要注意的是,服務登錄檔並不是 Load Balancer(負載均衡器),提供的不是“反向代理”服務,而是“服務註冊”與“心跳檢測”功能。

利用服務登錄檔來註冊 RMI 地址,這個很好理解,那麼“心跳檢測”又如何理解呢?說白了就是通過服務中心定時向各個服務提供者傳送一個請求(實際上建立的是一個Socket 長連線

),如果長期沒有響應,服務中心就認為該服務提供者已經“掛了”,只會從還“活著”的服務提供者中選出一個做為當前的服務提供者。
也許讀者會考慮到,服務中心可能會出現單點故障,如果服務登錄檔都壞掉了,整個系統也就癱瘓了。看來要想實現這個架構,必須保證服務中心也具備高可用性。ZooKeeper 正好能夠滿足我們上面提到的所有需求:
1. 使用 ZooKeeper 的臨時性 ZNode 來存放服務提供者的 RMI 地址,一旦與服務提供者的 Session 中斷,會自動清除相應的 ZNode。
2. 讓服務消費者去監聽這些 ZNode,一旦發現 ZNode 的資料(RMI 地址)有變化,就會重新獲取一份有效資料的拷貝

3. ZooKeeper 與生俱來的叢集能力(例如:資料同步與領導選舉特性),可以確保服務登錄檔的高可用性。

三、RMI+Zookeeper的實現

1 服務提供者

需要編寫一個 ServiceProvider 類,來發布 RMI 服務,並將 RMI 地址註冊到 ZooKeeper 中(實際存放在 ZNode 上):

package com.king.zkrmi;
 
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CountDownLatch;
 
/**
 * RMI服務提供者
 */
public class ServiceProvider {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
 
    // 用於等待 SyncConnected 事件觸發後繼續執行當前執行緒
    private CountDownLatch latch = new CountDownLatch(1);
 
    // 釋出 RMI 服務並註冊 RMI 地址到 ZooKeeper 中
    public void publish(Remote remote, String host, int port) {
        String url = publishService(remote, host, port); // 釋出 RMI 服務並返回 RMI 地址
        if (url != null) {
            ZooKeeper zk = connectServer(); // 連線 ZooKeeper 伺服器並獲取 ZooKeeper 物件
            if (zk != null) {
                createNode(zk, url); // 建立 ZNode 並將 RMI 地址放入 ZNode 上
            }
        }
    }
 
    // 釋出 RMI 服務
    private String publishService(Remote remote, String host, int port) {
        String url = null;
        try {
            url = String.format("rmi://%s:%d/%s", host, port, remote.getClass().getName());
            LocateRegistry.createRegistry(port);
            Naming.rebind(url, remote);
            LOGGER.debug("publish rmi service (url: {})", url);
        } catch (RemoteException | MalformedURLException e) {
            LOGGER.error("", e);
        }
        return url;
    }
 
    // 連線 ZooKeeper 伺服器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown(); // 喚醒當前正在執行的執行緒
                    }
                }
            });
            latch.await(); // 使當前執行緒處於等待狀態
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }
 
    // 建立 ZNode
    private void createNode(ZooKeeper zk, String url) {
        try {
            byte[] data = url.getBytes();
            String path = zk.create(Constant.ZK_PROVIDER_PATH, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);    // 建立一個臨時性且有序的 ZNode
            LOGGER.debug("create zookeeper node ({} => {})", path, url);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
}
涉及到的 Constant 常量,見如下程式碼:
package com.king.zkrmi;
 
/**
 * ZK常量
 */
public interface Constant {
 
    String ZK_CONNECTION_STRING = "localhost:2181";
    int ZK_SESSION_TIMEOUT = 5000;
    String ZK_REGISTRY_PATH = "/registry";
    String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";
}
注意:我們首先需要使用 ZooKeeper 的客戶端工具建立一個永續性 ZNode,名為“/registry”,該節點是不存放任何資料的,可使用如下命令:
create /registry null

2 服務消費者

服務消費者需要在建立的時候連線 ZooKeeper,同時監聽 /registry 節點的NodeChildrenChanged 事件,也就是說,一旦該節點的子節點有變化,就需要重新獲取最新的子節點。這裡提到的子節點,就是存放服務提供者釋出的 RMI 地址。需要強調的是,這些子節點都是臨時性的,當服務提供者與 ZooKeeper 服務登錄檔的 Session 中斷後,該臨時性節會被自動刪除。

package com.king.zkrmi;
 
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.IOException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
 
/**
 * RMI服務消費者
 */
public class ServiceConsumer {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceConsumer.class);
 
    // 用於等待 SyncConnected 事件觸發後繼續執行當前執行緒
    private CountDownLatch latch = new CountDownLatch(1);
 
    // 定義一個 volatile 成員變數,用於儲存最新的 RMI 地址(考慮到該變數或許會被其它執行緒所修改,一旦修改後,該變數的值會影響到所有執行緒)
    private volatile List<String> urlList = new ArrayList<>();
 
    // 構造器
    public ServiceConsumer() {
        ZooKeeper zk = connectServer(); // 連線 ZooKeeper 伺服器並獲取 ZooKeeper 物件
        if (zk != null) {
            watchNode(zk); // 觀察 /registry 節點的所有子節點並更新 urlList 成員變數
        }
    }
 
    // 查詢 RMI 服務
    public <T extends Remote> T lookup() {
        T service = null;
        int size = urlList.size();
        if (size > 0) {
            String url;
            if (size == 1) {
                url = urlList.get(0); // 若 urlList 中只有一個元素,則直接獲取該元素
                LOGGER.debug("using only url: {}", url);
            } else {
                url = urlList.get(ThreadLocalRandom.current().nextInt(size)); // 若 urlList 中存在多個元素,則隨機獲取一個元素
                LOGGER.debug("using random url: {}", url);
            }
            service = lookupService(url); // 從 JNDI 中查詢 RMI 服務
        }
        return service;
    }
 
    // 連線 ZooKeeper 伺服器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown(); // 喚醒當前正在執行的執行緒
                    }
                }
            });
            latch.await(); // 使當前執行緒處於等待狀態
        } catch (IOException | InterruptedException e) {
            LOGGER.error("", e);
        }
        return zk;
    }
 
    // 觀察 /registry 節點下所有子節點是否有變化
    private void watchNode(final ZooKeeper zk) {
        try {
            List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getType() == Event.EventType.NodeChildrenChanged) {
                        watchNode(zk); // 若子節點有變化,則重新呼叫該方法(為了獲取最新子節點中的資料)
                    }
                }
            });
            List<String> dataList = new ArrayList<>(); // 用於存放 /registry 所有子節點中的資料
            for (String node : nodeList) {
                byte[] data = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null); // 獲取 /registry 的子節點中的資料
                dataList.add(new String(data));
            }
            LOGGER.debug("node data: {}", dataList);
            urlList = dataList; // 更新最新的 RMI 地址
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("", e);
        }
    }
 
    // 在 JNDI 中查詢 RMI 遠端服務物件
    @SuppressWarnings("unchecked")
    private <T> T lookupService(String url) {
        T remote = null;
        try {
            remote = (T) Naming.lookup(url);
        } catch (NotBoundException | MalformedURLException | RemoteException e) {
            if (e instanceof ConnectException) {
                // 若連線中斷,則使用 urlList 中第一個 RMI 地址來查詢(這是一種簡單的重試方式,確保不會丟擲異常)
                LOGGER.error("ConnectException -> url: {}", url);
                if (urlList.size() != 0) {
                    url = urlList.get(0);
                    return lookupService(url);
                }
            }
            LOGGER.error("", e);
        }
        return remote;
    }
}
3 釋出服務

我們需要呼叫 ServiceProvider 的 publish() 方法來發布 RMI 服務,釋出成功後也會自動在 ZooKeeper 中註冊 RMI 地址:

package com.king.zkrmi;
 
/**
 * 服務釋出
 */
public class Server {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("please using command: java Server <rmi_host> <rmi_port>");
            System.exit(-1);
        }
 
        String host = args[0];
        int port = Integer.parseInt(args[1]);
 
        ServiceProvider provider = new ServiceProvider();
 
        HelloService helloService = new HelloServiceImpl();
        provider.publish(helloService, host, port);
 
        Thread.sleep(Long.MAX_VALUE);
    }
}
注意:在執行 Server 類的 main() 方法時,一定要使用命令列引數來指定 host 與 port,例如:
java Server localhost 1099
java Server localhost 2099
以上兩條 Java 命令可在本地執行兩個 Server 程式,當然也可以同時執行更多的 Server 程式,只要 port 不同就行。
4 呼叫服務

通過呼叫 ServiceConsumer 的 lookup() 方法來查詢 RMI 遠端服務物件。我們使用一個“死迴圈”來模擬每隔 3 秒鐘呼叫一次遠端方法。

package com.king.zkrmi;
 
/**
 * RMI客戶端
 */
public class Client {
 
    public static void main(String[] args) throws Exception {
        ServiceConsumer consumer = new ServiceConsumer();
 
        while (true) {
            HelloService helloService = consumer.lookup();
            String result = helloService.sayHello("Jack");
            System.out.println(result);
            Thread.sleep(3000);
        }
    }
}

5 使用方法

根據以下步驟驗證 RMI 服務的高可用性:
1. 執行兩個 Server 程式,一定要確保 port 是不同的。
2. 執行一個 Client 程式。
3. 停止其中一個 Server 程式,並觀察 Client 控制檯的變化(停止一個 Server 不會導致 Client 端呼叫失敗)。
4. 重新啟動剛才關閉的 Server 程式,繼續觀察 Client 控制檯變化(新啟動的 Server 會加入候選)。
5. 先後停止所有的 Server 程式,還是觀察 Client 控制檯變化(Client 會重試連線,多次連線失敗後,自動關閉)。

四、總結

通過本文,我們嘗試使用 ZooKeeper 實現了一個簡單的 RMI 服務高可用性解決方案,通過 ZooKeeper 註冊所有服務提供者釋出的 RMI 服務,讓服務消費者監聽 ZooKeeper 的 Znode,從而獲取當前可用的 RMI 服務。此方案侷限於 RMI 服務,對於任何形式的服務(比如:WebService),也提供了一定參考。
如果再配合 ZooKeeper 自身的叢集,那才是一個相對完美的解決方案,對於 ZooKeeper 的叢集,請讀者自行實踐。

RMI是Java提供的RPC框架,雖然在I/O上使用簡單的BIO,在服務管理上使用Registry(JNDI)的方式完成,其實現較為簡單,但是在理解RPC框架時當仁不讓的會是年輕人的第一個RPC框架。通過對框架的學習和理解,年輕人會逐漸發現,RPC框架的技術核心點就在以下幾條(見:RPC入門總結(一)RPC定義和原理

1. 服務管理方式:本文中的服務管理使用了Zookeeper,實際上某些成熟RPC也使用Zookeeper作為服務管理和註冊工具(dubbo)

2. 代理物件生成:本文中的代理物件生成實際上採用常規方式,由客戶端和服務端自行建立和持有連線物件,而在成熟框架中可以採用Spring框架進行依賴注入(IoC),便於管理和程式設計。

3. I/O架構:本文中使用的RMI採用其底層BIO實現,在成熟框架中往往採用NIO/Netty/Mina等成熟的I/O多路複用方式實現(dubbo

)。

4. 序列化:本文中使用的RMI採用其底層位元組方式進行Marshell(序列化),在對效能要求較高的場合一般可以採用自定義私有序列化方式(Thrift)或JSON/ProtoBuf等成熟序列化框架完成(dubbo)。