1. 程式人生 > >使用Zookeeper實現動態負載均衡

使用Zookeeper實現動態負載均衡

一、負載均衡概述 

二、Dubbo原理 

生產者:
<!-- 提供方應用資訊,用於計算依賴關係 -->
<dubbo:application name="provider"/>

<!-- 使用zookeeper註冊中心暴露服務地址 -->
<dubbo:registry address="${zookeeper.register.address}" protocol="zookeeper" client="zkclient"/>

<!-- 用dubbo協議在20880埠暴露服務 -->
<dubbo:protocol name="dubbo" port="${dubbo.provider.port}"/>

<!-- 測試介面-->
<dubbo:service interface="com.dubbo.TestService" ref="testService" version="1.0.1"/>

消費者:
<!-- 提供方應用資訊,用於計算依賴關係 -->
<dubbo:application name="consumer"/>

<dubbo:consumer timeout="300000" retries="0"/>

<!-- 使用zookeeper註冊中心暴露服務地址 -->
<dubbo:registry address="${zookeeper.register.address}" protocol="zookeeper" client="zkclient"/>

<!-- 測試介面-->
<dubbo:reference id="testService" interface="com.dubbo.TestService" version="1.0.1"/>


三、Zookeeper概述 


四、Java操作Zookeeper 

啟動zookeeper: 將zoo_sample.cfg重新命名zoo.cfg 執行zkServer.cmd

Zookeeper資料檢視工具ZooInspector

1、下載https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

2、解壓,進入目錄ZooInspector\build,執行zookeeper-dev-ZooInspector.jar

引入pom.xml依賴

<dependency
> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>

建立永久節點:PERSISTENT

import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;

/**
 * Created by yz on 2018/03/31.
 */
public class 
TestZookeeper { public static void main(String[] args) { // 60000 session超時時間;1000 連線超時時間 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 節點(路徑);值;節點型別 PERSISTENT永久節點 zkClient.create("/user","xiaoming", CreateMode.PERSISTENT); zkClient.close(); System.out.println("###註冊成功###"); } }


節點不允許有重複,再次建立/user節點,會報錯


建立子節點:

public static void main(String[] args) {
    // 60000 session超時時間;1000 連線超時時間
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
    // 節點(路徑);值;節點型別 PERSISTENT永久節點
    zkClient.create("/user/zk01","xiaohong", CreateMode.PERSISTENT);
    zkClient.close();
    System.out.println("###註冊成功###");
}

 

建立臨時節點:EPHEMERAL

/**
 * Created by yz on 2018/03/31.
 */
public class TestZookeeper {
    public static void main(String[] args) {
        // 60000 session超時時間;1000 連線超時時間
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
        // 節點(路徑);值;節點型別 PERSISTENT永久節點
        zkClient.create("/user_temp","xiaoming", CreateMode.EPHEMERAL);
        zkClient.close();
        System.out.println("###註冊成功###");
    }
}

臨時節點可以建立成功,但是zk會話關閉之後,會刪除。

Thread.sleep(10000);
zkClient.close();
五、Dubbo負載均衡原理 


啟動兩個provider  埠號20880,埠號20881



啟動consumer呼叫provider,輪詢機制呼叫,dubbo預設有負載均衡演算法。



dubbo怎麼實現負載均衡的?
生產者將服務介面地址註冊到註冊中心,在註冊中心會有兩個叢集地址
消費者只需要通過“com.alibaba.dubbo.demo.DemoService” 這個節點名稱,先找下面的子節點,訂單服務能夠獲取到會員服務整個叢集地址,通過本地負載均衡演算法,算出來去呼叫哪一臺會員服務。

dubboDemo github地址:https://github.com/yangzeng1211/dubboDemo.git

六、服務註冊到Zookeeper節點上 

使用程式碼去實現在Zookeeper上註冊多節點(模擬dubbo註冊到Zookeeper) 步驟:
1.建兩個伺服器端,把會員服務資訊地址註冊到註冊中心上去
2.訂單服務訂閱註冊中心節點地址,當有新的地址之後,及時更新

3.本地通過負載均衡演算法去呼叫哪一臺伺服器

伺服器端(生產者):

import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 服務端 生產者  每啟一個服務都註冊到zookeeper中心上去
 * socket伺服器端
 * Created by yz on 2018/03/31.
 */
public class ZkServerSocket implements Runnable{
    private static int port = 8081;

    public ZkServerSocket(int port) {
        this.port = port;
    }

    // 多執行緒執行緒安全計數器
    AtomicInteger ai=new AtomicInteger(0);

    public static void main(String[] args) {
        ZkServerSocket server = new ZkServerSocket(port);
        Thread thread = new Thread(server);
        thread.start();
    }

    /**
     * 將服務註冊到zookeeper,子節點不要用持久節點,用臨時節點
     */
    public void regZkServer(){
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
        // 1.建立父節點/server  首先執行TestZookeeper 建立/server父節點 持久節點
        String path = "/server/server"+port;
        // 判斷節點是否在zookeeper上存在,如果存在刪除
        if(zkClient.exists(path)){
            zkClient.delete(path);
        }
        // 2.將每臺服務啟動,將所有子節點全部註冊到/server節點下
        // key:表示當前節點(path)  value:表示伺服器IP地址和埠號
        zkClient.createEphemeral(path,"127.0.0.1:"+port);
    }

    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Server start port:"+port);
            // 當前服務啟起來後,把當前所有的啟動資訊註冊到zookeeper上,建立節點
            regZkServer();
            Socket socket = null;
            while (true){
                socket = serverSocket.accept();
                // 客戶端傳送的訊息全部放在ServerHandler中,它開啟一個執行緒,提高整個程式的一個執行效率
                new Thread(new ServerHandler(socket,ai)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                if(serverSocket !=null){
                    serverSocket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by yz on 2018/03/31.
 */
public class ServerHandler implements Runnable{
    private Socket socket;

    private AtomicInteger ai;

    public ServerHandler(Socket socket,AtomicInteger ai) {
        this.socket = socket;
        this.ai = ai;
    }

    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(),true);
            String body = null;
            while (true){
                body = in.readLine();
                if(body  == null){
                    break;
                }
                System.out.println("接收到客戶端傳送來的訊息: "+body);
                out.println("這裡是伺服器端傳送的訊息,Hello, "+body+",count:"+ai.getAndIncrement());
            }
        } catch (IOException e) {
            if(in !=null){
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if(out !=null){
                out.close();
            }
        }
    }
}
客戶端(消費者):
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * 客戶端 消費者
 * socket客戶端連線
 * Created by yz on 2018/03/31.
 */
public class ZkServerClient {

    // 用來存放所有伺服器端IP地址
    public static List<String> listServer = new ArrayList<String>();

    public static void main(String[] args) {
        initServer();
        ZkServerClient client = new ZkServerClient();
        // 讀取鍵盤輸入資料
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String name;
            try {
                name = console.readLine();
                if("exit".equals(name)){
                    System.exit(0);
                }
                // 給伺服器端傳送訊息
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 註冊所有server
     */
    public static void initServer(){
        listServer.clear();
        listServer.add("127.0.0.1:8080");
    }

    /**
     * 獲取當前server資訊
     * @return
     */
    public static String getServer(){
        return listServer.get(0);
    }
    
    public void send(String name){
        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;

        try {
            // 引數1:伺服器端IP  引數2:埠號
            socket = new Socket(cfg[0],Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println(name);
            while (true){
                String resp = in.readLine();
                if(resp == null){
                    break;
                }else if(resp.length()>0){
                    System.out.println("接收到伺服器端傳回來的訊息: "+resp);
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(in !=null){
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if(out != null){
                out.close();
            }
        }
    }
    
}

啟動兩個伺服器端,埠號分別為:8080、8081 效果如下:




七、消費者使用Zookeeper實現動態負載均衡 

消費者使用Zookeeper實現動態負載均衡,即所有伺服器地址都不是寫死的,而是實時去Zookeeper註冊中心上去獲取到的。

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * 客戶端 消費者
 * socket客戶端連線
 * Created by yz on 2018/03/31.
 */
public class ZkServerClient {

    // 用來存放所有伺服器端IP地址
    public static List<String> listServer = new ArrayList<String>();

    public static void main(String[] args) {
        initServer();
        ZkServerClient client = new ZkServerClient();
        // 讀取鍵盤輸入資料
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String name;
            try {
                name = console.readLine();
                if("exit".equals(name)){
                    System.exit(0);
                }
                // 給伺服器端傳送訊息
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 註冊所有server
     */
    public static void initServer(){
        final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
        // 讀取叢集伺服器下節點資訊
        final String path = "/server";
        // 獲取/server 節點下所有子節點
        final List<String> children = zkClient.getChildren(path);
        for (String chipath : children) {
            // 根據子節點完整路徑讀取value,新增到集合
            listServer.add((String) zkClient.readData(path+"/"+chipath));
        }
        System.out.println("###listServer:"+listServer.toString());

        // 重要的一步,當節點發生變化的時候 重新讀取,比如服務端有一臺伺服器掛掉,客戶端也要做更新,通過zk事件監聽
        // 引數1表示訂閱的監聽地址 引數2 監聽
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            // 引數1父節點 引數2所有子節點
            public void handleChildChange(String parentPath, List<String> parentPathchilist) throws Exception {
                // 監聽到節點發生變化,更新節點資訊
                listServer.clear();
                for (String chipath : parentPathchilist) {
                    // 根據子節點完整路徑讀取value,新增到集合
                    listServer.add((String) zkClient.readData(parentPath+"/"+chipath));
                }
                System.out.println("###事件通知,listServer:"+listServer.toString());
            }
        });

//        listServer.clear();
//        listServer.add("127.0.0.1:8080");
    }

    /**
     * 獲取當前server資訊
     * @return
     */
    public static String getServer(){
        // 還沒有使用負載均衡
        return listServer.get(0);
    }
    
    public void send(String name){
        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;

        try {
            // 引數1:伺服器端IP  引數2:埠號
            socket = new Socket(cfg[0],Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println(name);
            while (true){
                String resp = in.readLine();
                if(resp == null){
                    break;
                }else if(resp.length()>0){
                    System.out.println("接收到伺服器端傳回來的訊息: "+resp);
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(in !=null){
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if(out != null){
                out.close();
            }
        }
    }
}
啟動消費者端,檢視通過zk註冊中心訂閱效果:

現在手動強制停掉8081服務端(會有延遲)


現在消費者再次傳送訊息,只會發給8080埠伺服器端




八、消費者加入輪訓機制演算法 

負載均衡輪詢機制演算法---請求次數%伺服器數量=機器位置
第一臺伺服器,位置0 8080
第二臺伺服器,位置1 8081
第一次請求
1%2=1  找8081
第二次請求
2%2=0  找8080
第三次請求
3%2=1  找8081
......

程式碼實現:

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 客戶端 消費者
 * socket客戶端連線
 * Created by yz on 2018/03/31.
 */
public class ZkServerClient {

    // 用來存放所有伺服器端IP地址
    public static List<String> listServer = new ArrayList<String>();

    public static void main(String[] args) {
        initServer();
        ZkServerClient client = new ZkServerClient();
        // 讀取鍵盤輸入資料
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String name;
            try {
                name = console.readLine();
                if("exit".equals(name)){
                    System.exit(0);
                }
                // 給伺服器端傳送訊息
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 註冊所有server
     */
    public static void initServer(){
        final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
        // 讀取叢集伺服器下節點資訊
        final String path = "/server";
        // 獲取/server 節點下所有子節點
        final List<String> children = zkClient.getChildren(path);
        for (String chipath : children) {
            // 根據子節點完整路徑讀取value,新增到集合
            listServer.add((String) zkClient.readData(path+"/"+chipath));
        }
        System.out.println("###listServer:"+listServer.toString());

        // 重要的一步,當節點發生變化的時候 重新讀取,比如服務端有一臺伺服器掛掉,客戶端也要做更新,通過zk事件監聽
        // 引數1表示訂閱的監聽地址 引數2 監聽
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            // 引數1父節點 引數2所有子節點
            public void handleChildChange(String parentPath, List<String> parentPathchilist) throws Exception {
                // 監聽到節點發生變化,更新節點資訊
                listServer.clear();
                for (String chipath : parentPathchilist) {
                    // 根據子節點完整路徑讀取value,新增到集合
                    listServer.add((String) zkClient.readData(parentPath+"/"+chipath));
                }
                System.out.println("###事件通知,listServer:"+listServer.toString());
            }
        });

//        listServer.clear();
//        listServer.add("127.0.0.1:8080");
    }

    // 定義請求次數 int count = 0;--> 多執行緒時需要加同步
    // 多執行緒執行緒安全計數器
    static AtomicInteger ai = new AtomicInteger(0);
    /**
     * 獲取當前server資訊
     * @return
     */
    public static String getServer(){
        // 獲取叢集伺服器數量
        int listServerCount = listServer.size();
        // 負載均衡輪詢演算法,算出要呼叫的機器. 請求數%伺服器數量=機器位置
        // getAndIncrement 表示count++操作
        String serverhost = listServer.get(ai.getAndIncrement() % listServerCount);
        return serverhost;
    }
    
    public void send(String name){
        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;

        try {
            // 引數1:伺服器端IP  引數2:埠號
            socket = new Socket(cfg[0],Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(),true);
            out.println(name);
            while (true){
                String resp = in.readLine();
                if(resp == null){
                    break;
                }else if(resp.length()>0){
                    System.out.println("接收到伺服器端傳回來的訊息: "+resp);
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(in !=null){
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if(out != null){
                out.close();
            }
        }
    }
}

將8081伺服器啟起來


重啟消費者端,檢視消費者端使用負載均衡輪詢機制演算法呼叫效果 ,見證奇蹟的時刻: