1. 程式人生 > >微服務 Dubbo + Zookeeper 原理解析

微服務 Dubbo + Zookeeper 原理解析

補充:2018-04-20

值得一說的是:下方的 “透明” 是通過 動態代理對 “負載均衡和容錯”的封裝 。

這裡寫圖片描述

此圖配合下方案例程式碼可以更好的理解 分散式服務框架-RPC原理。

協議:
這裡寫圖片描述

說明 :內容為小編個人見解,同時做備忘用

基礎準備 : java Socket , serverSocket , RPC 協議。

(1) 網路通訊資料傳輸靠的就是 IO 流(byte[] 位元組) 。

(2) RPC 協議是指 : 利用tcp 通訊,對byte[] 加上自己的規則, 服務端和客戶端可以通過此規則進行資料的封裝和解析。

dubbo + zookeeper 程式碼演練

服務提供者 :

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>tony.test</groupId>
    <artifactId
>
dubbo-provider</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>dubbo</artifactId> <version>2.5.3</version> </dependency
>
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.8</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency> </dependencies> </project>

provider.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd        http://code.alibabatech.com/schema/dubbo        http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 提供方應用資訊,用於計算依賴關係 -->
    <dubbo:application name="hello-world-app"  />

    <!-- 使用zookeeper註冊中心暴露服務地址 -->
    <dubbo:registry address="zookeeper://localhost:2181" />

    <!-- 用dubbo協議在20880埠暴露服務 -->
    <!-- serialization 協議序列化方式,當協議支援多種序列化方式時使用,,預設hessian2
        比如:dubbo協議的dubbo,hessian2,java,compactedjava,以及http協議的json等 
    -->
    <dubbo:protocol name="dubbo" port="20880" serialization="fastjson"/>

    <!-- 宣告需要暴露的服務介面 -->
    <dubbo:service interface="com.tony.test.dubbo.DemoService" ref="demoService" />

    <!-- 和本地bean一樣實現服務 -->
    <bean id="demoService" class="com.tony.test.dubbo.provider.DemoServiceImpl" />

</beans>

服務介面 :

package com.tony.test.dubbo;

public interface DemoService {

    String sayHello(User user, String mark);
}

子類 :

package com.tony.test.dubbo.provider;

import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;

public class DemoServiceImpl implements DemoService {

    public String sayHello(User user, String mark) {

        System.out.println("進來了----");

        return "agui>" + mark + " name>" + user.getName();
    }
}

main 入口 :

或者直接啟動tomcat

package com.tony.test.dubbo.provider;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProviderMain {
    // 服務提供者
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "provider.xml" });
        context.start();

        System.in.read();
    }
}

服務消費者

pom.xml 同上 , 介面(interface)同上 。

consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans        http://www.springframework.org/schema/beans/spring-beans.xsd        http://code.alibabatech.com/schema/dubbo        http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 消費方應用名,用於計算依賴關係,不是匹配條件,不要與提供方一樣 -->
    <dubbo:application name="consumer-of-helloworld-app"  />

    <!-- 使用multicast廣播註冊中心暴露發現服務地址 -->
    <dubbo:registry address="zookeeper://localhost:2181" />

    <!-- 生成遠端服務代理,可以和本地bean一樣使用demoService -->
    <dubbo:reference id="demoService" interface="com.tony.test.dubbo.DemoService" timeout="30000"/>

</beans>

main 入口

或者啟動tomcat

package com.tony.test.dubbo.consumer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;

public class ConsumerMain {

    // 服務消費者
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "consumer.xml" });
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); 
        User user = new User();
        user.setName("agui");
        String hello = demoService.sayHello(user, "2"); 

        System.out.println("RPC呼叫結果:" + hello); 
    }

}

以上就是 dubbo + zookeeper 簡單demo 實現。

接下來講解一下實現原理

之前提到過:帶有規則的byte[] 位元組 ,那麼阿里的dubbo 對bute[] 規則 如圖 :

這裡寫圖片描述

這裡寫圖片描述

基本結構分為:header , body 兩部分,詳細內容參考上圖。

整體流程 :

1) 啟動配置檔案,spring bean 容器載入我們的介面實現類,服務提供者封裝serverSocket , 並通過 zookeeper客戶端在 zookeeper中建立節點, 節點中記錄服務端的ip, 埠,暴露的介面等資訊。

2) 客戶端通過zookeeper客戶端拿到服務端ip , 埠等資訊。

3)構建header 和 body 的位元組陣列(byte) , 通過socket 拿到輸出流,write() 方法輸出位元組陣列。

4)服務端拿到輸入流之後,會得到介面資訊,呼叫的方法名稱等,可以通過spring的bean 工廠類,拿到 注入到bean 中的 該介面子類物件,然後去掉用具體的方法,拿到返回值後可以進行序列化,在用 socket 和 serverSocket 建立的管道 輸出流 輸出 子類方法返回的值,此時 http 請求,響應完畢。

說明:http 通訊位元組流,轉為字元流,反序列化可以用阿里的

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.7</version>
</dependency>

模擬消費端原始碼 :

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>tony.test</groupId>
    <artifactId>tony-dubbo-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>
    </dependencies>
</project>

byte[]原資料封裝類 :

package com.agui.test.consumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;

import com.alibaba.fastjson.JSON;

/**
 * 發起dubbo呼叫請求
 */
public class AguiDubboRpcRequest {

    String host; // 服務提供者IP
    int port; // 服務提供者埠

    // 協議中header段
    ByteBuffer header = ByteBuffer.allocate(16);

    // 協議中body段
    StringBuffer rpcBody = new StringBuffer();

    public TonyDubboRpcRequest(String host, int port) {
        this.host = host;
        this.port = port;

        // 魔數 da bb
        header.put((byte) 0xda);
        header.put((byte) 0xbb);

        // 標識 固定為 C6
        header.put((byte) 0xC6);

        // 響應狀態 ,這裡沒有
        header.put((byte) 0x00);

        // 訊息ID 編號傳進來

        // 資料長度 這個要在最後計算

    }

    /** 需要是完整的類名 */
    public TonyDubboRpcRequest dubboVersion(String dubboVersion) {
        rpcBody.append(JSON.toJSONString(dubboVersion));
        rpcBody.append("\r\n");
        return this;
    }

    /** 需要是完整的類名 */
    public TonyDubboRpcRequest path(String path) {
        rpcBody.append(JSON.toJSONString(path));
        rpcBody.append("\r\n");
        return this;
    }

    /** 服務版本號 */
    public TonyDubboRpcRequest serviceVersion(String serviceVersion) {
        rpcBody.append(JSON.toJSONString(serviceVersion));
        rpcBody.append("\r\n");
        return this;
    }

    /** 方法名 */
    public TonyDubboRpcRequest method(String method) {
        rpcBody.append(JSON.toJSONString(method));
        rpcBody.append("\r\n");
        return this;
    }

    /** 引數型別 */
    public TonyDubboRpcRequest desc(String desc) {
        rpcBody.append(JSON.toJSONString(desc));
        rpcBody.append("\r\n");
        return this;
    }

    /** 引數值 */
    public TonyDubboRpcRequest values(Object... values) {
        for (Object value : values) {
            rpcBody.append(JSON.toJSONString(value));
            rpcBody.append("\r\n");
        }
        return this;
    }

    /** 隱式傳參 */
    public TonyDubboRpcRequest attachments(Object attachment) {

        rpcBody.append(JSON.toJSONString(attachment));
        rpcBody.append("\r\n");
        return this;
    }

    /** 補齊header */
    public TonyDubboRpcRequest build(long msgId) {
        // 訊息ID
        byte[] msgIdBytes = new byte[8];
        long2bytes(msgId, msgIdBytes, 0);
        this.header.put(msgIdBytes);

        // 計算body長度
        int length = this.rpcBody.toString().getBytes().length;
        byte[] lenBytes = new byte[4];
        int2bytes(length, lenBytes, 0);
        this.header.put(lenBytes);

        return this;
    }

    /** int轉4位元組陣列 */
    private void int2bytes(int v, byte[] b, int off) {
        b[off + 3] = (byte) v;
        b[off + 2] = (byte) (v >>> 8);
        b[off + 1] = (byte) (v >>> 16);
        b[off + 0] = (byte) (v >>> 24);
    }

    /** long轉8位元組陣列 */
    private void long2bytes(long v, byte[] b, int off) {
        b[off + 7] = (byte) v;
        b[off + 6] = (byte) (v >>> 8);
        b[off + 5] = (byte) (v >>> 16);
        b[off + 4] = (byte) (v >>> 24);
        b[off + 3] = (byte) (v >>> 32);
        b[off + 2] = (byte) (v >>> 40);
        b[off + 1] = (byte) (v >>> 48);
        b[off + 0] = (byte) (v >>> 56);
    }

    public String run() throws Exception {
        System.out.println("###########輸出rcpBody內容:##########");
        System.out.println(this.rpcBody.toString());
        System.out.println("###########結束輸出###########");

        Socket socket = new Socket(this.host, this.port);
        try {
            OutputStream rpcOPS = socket.getOutputStream();
            // 發起呼叫請求
            rpcOPS.write(header.array());
            rpcOPS.write(this.rpcBody.toString().getBytes());

            // 輸出RPC呼叫結果
            InputStream rpcRsp = socket.getInputStream();
            byte[] header = new byte[16];
            rpcRsp.read(header);

            byte[] resp = new byte[1024];
            rpcRsp.read(resp);

            // 忽略header,取body
            return new String(resp);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            socket.close();
        }
        return null;
    }
}

程式入口 :

package com.agui.test.consumer;

import com.alibaba.fastjson.JSONObject;

public class AguiConsumerMain {

    // 手寫dubbo客戶端
    public static void main(String[] args) throws Exception {
        // TODO 配置註冊中心
        // TODO 獲取服務資訊
        // 本例項重點在於分析dubbo rpc協議內容,故此消費者假定已經知曉服務端的資訊。

        JSONObject user = JSONObject.parseObject("{\"name\":\"tony\"}");

        TonyDubboRpcRequest tonyDubboRpcRequest = new TonyDubboRpcRequest("127.0.0.1", 20880);
        tonyDubboRpcRequest
        .dubboVersion("2.5.3")
        .path("com.tony.test.dubbo.DemoService")
        .serviceVersion("0.0.0.0")
        .method("sayHello")
        .desc("Lcom/tony/test/dubbo/User;Ljava/lang/String;")
        .values(user, "測試tony_1")
        .attachments(JSONObject.parseObject("{\"path\":\"com.tony.test.dubbo.DemoService\",\"interface\":\"com.tony.test.dubbo.DemoService\",\"version\":\"0.0.0\"}"))
        .build(3);

        String result = tonyDubboRpcRequest.run();

        System.out.println("RPC呼叫結果:");
        System.out.println(result);
    }

}

執行程式的時候需要啟動上面的:服務提供者,zookeeper 。