使用Zookeeper實現動態負載均衡
二、Dubbo原理
生產者:
<!-- 提供方應用資訊,用於計算依賴關係 -->
<dubbo:application name="provider"/>
<!-- 使用zookeeper註冊中心暴露服務地址 -->
<dubbo:registry address="${zookeeper.register.address}" protocol="zookeeper" client="zkclient"/>
<!-- 用dubbo協議在20880埠暴露服務 -->
<dubbo:protocol name="dubbo" port="${dubbo.provider.port}"/>
<!-- 測試介面-->
<dubbo:service interface="com.dubbo.TestService" ref="testService" version="1.0.1"/>
消費者:
<!-- 提供方應用資訊,用於計算依賴關係 -->
<dubbo:application name="consumer"/>
<dubbo:consumer timeout="300000" retries="0"/>
<!-- 使用zookeeper註冊中心暴露服務地址 -->
<dubbo:registry address="${zookeeper.register.address}" protocol="zookeeper" client="zkclient"/>
<!-- 測試介面-->
<dubbo:reference id="testService" interface="com.dubbo.TestService" version="1.0.1"/>
三、Zookeeper概述
四、Java操作Zookeeper
啟動zookeeper: 將zoo_sample.cfg重新命名zoo.cfg 執行zkServer.cmd
Zookeeper資料檢視工具ZooInspector
1、下載https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
2、解壓,進入目錄ZooInspector\build,執行zookeeper-dev-ZooInspector.jar
引入pom.xml依賴
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
建立永久節點:PERSISTENT
import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; /** * Created by yz on 2018/03/31. */ public classTestZookeeper { public static void main(String[] args) { // 60000 session超時時間;1000 連線超時時間 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 節點(路徑);值;節點型別 PERSISTENT永久節點 zkClient.create("/user","xiaoming", CreateMode.PERSISTENT); zkClient.close(); System.out.println("###註冊成功###"); } }
節點不允許有重複,再次建立/user節點,會報錯
建立子節點:
public static void main(String[] args) { // 60000 session超時時間;1000 連線超時時間 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 節點(路徑);值;節點型別 PERSISTENT永久節點 zkClient.create("/user/zk01","xiaohong", CreateMode.PERSISTENT); zkClient.close(); System.out.println("###註冊成功###"); }
建立臨時節點:EPHEMERAL
/** * Created by yz on 2018/03/31. */ public class TestZookeeper { public static void main(String[] args) { // 60000 session超時時間;1000 連線超時時間 ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 節點(路徑);值;節點型別 PERSISTENT永久節點 zkClient.create("/user_temp","xiaoming", CreateMode.EPHEMERAL); zkClient.close(); System.out.println("###註冊成功###"); } }
臨時節點可以建立成功,但是zk會話關閉之後,會刪除。
Thread.sleep(10000); zkClient.close();五、Dubbo負載均衡原理
啟動兩個provider 埠號20880,埠號20881
啟動consumer呼叫provider,輪詢機制呼叫,dubbo預設有負載均衡演算法。
dubbo怎麼實現負載均衡的?
生產者將服務介面地址註冊到註冊中心,在註冊中心會有兩個叢集地址
消費者只需要通過“com.alibaba.dubbo.demo.DemoService” 這個節點名稱,先找下面的子節點,訂單服務能夠獲取到會員服務整個叢集地址,通過本地負載均衡演算法,算出來去呼叫哪一臺會員服務。
六、服務註冊到Zookeeper節點上
使用程式碼去實現在Zookeeper上註冊多節點(模擬dubbo註冊到Zookeeper) 步驟:1.建兩個伺服器端,把會員服務資訊地址註冊到註冊中心上去
2.訂單服務訂閱註冊中心節點地址,當有新的地址之後,及時更新
3.本地通過負載均衡演算法去呼叫哪一臺伺服器
伺服器端(生產者):
import org.I0Itec.zkclient.ZkClient; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.atomic.AtomicInteger; /** * 服務端 生產者 每啟一個服務都註冊到zookeeper中心上去 * socket伺服器端 * Created by yz on 2018/03/31. */ public class ZkServerSocket implements Runnable{ private static int port = 8081; public ZkServerSocket(int port) { this.port = port; } // 多執行緒執行緒安全計數器 AtomicInteger ai=new AtomicInteger(0); public static void main(String[] args) { ZkServerSocket server = new ZkServerSocket(port); Thread thread = new Thread(server); thread.start(); } /** * 將服務註冊到zookeeper,子節點不要用持久節點,用臨時節點 */ public void regZkServer(){ ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 1.建立父節點/server 首先執行TestZookeeper 建立/server父節點 持久節點 String path = "/server/server"+port; // 判斷節點是否在zookeeper上存在,如果存在刪除 if(zkClient.exists(path)){ zkClient.delete(path); } // 2.將每臺服務啟動,將所有子節點全部註冊到/server節點下 // key:表示當前節點(path) value:表示伺服器IP地址和埠號 zkClient.createEphemeral(path,"127.0.0.1:"+port); } public void run() { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port); System.out.println("Server start port:"+port); // 當前服務啟起來後,把當前所有的啟動資訊註冊到zookeeper上,建立節點 regZkServer(); Socket socket = null; while (true){ socket = serverSocket.accept(); // 客戶端傳送的訊息全部放在ServerHandler中,它開啟一個執行緒,提高整個程式的一個執行效率 new Thread(new ServerHandler(socket,ai)).start(); } } catch (IOException e) { e.printStackTrace(); }finally { try { if(serverSocket !=null){ serverSocket.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.concurrent.atomic.AtomicInteger; /** * Created by yz on 2018/03/31. */ public class ServerHandler implements Runnable{ private Socket socket; private AtomicInteger ai; public ServerHandler(Socket socket,AtomicInteger ai) { this.socket = socket; this.ai = ai; } public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(),true); String body = null; while (true){ body = in.readLine(); if(body == null){ break; } System.out.println("接收到客戶端傳送來的訊息: "+body); out.println("這裡是伺服器端傳送的訊息,Hello, "+body+",count:"+ai.getAndIncrement()); } } catch (IOException e) { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out !=null){ out.close(); } } } }客戶端(消費者):
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; /** * 客戶端 消費者 * socket客戶端連線 * Created by yz on 2018/03/31. */ public class ZkServerClient { // 用來存放所有伺服器端IP地址 public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); // 讀取鍵盤輸入資料 BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true){ String name; try { name = console.readLine(); if("exit".equals(name)){ System.exit(0); } // 給伺服器端傳送訊息 client.send(name); } catch (IOException e) { e.printStackTrace(); } } } /** * 註冊所有server */ public static void initServer(){ listServer.clear(); listServer.add("127.0.0.1:8080"); } /** * 獲取當前server資訊 * @return */ public static String getServer(){ return listServer.get(0); } public void send(String name){ String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 引數1:伺服器端IP 引數2:埠號 socket = new Socket(cfg[0],Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(name); while (true){ String resp = in.readLine(); if(resp == null){ break; }else if(resp.length()>0){ System.out.println("接收到伺服器端傳回來的訊息: "+resp); break; } } } catch (IOException e) { e.printStackTrace(); }finally { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out != null){ out.close(); } } } }
啟動兩個伺服器端,埠號分別為:8080、8081 效果如下:
七、消費者使用Zookeeper實現動態負載均衡
消費者使用Zookeeper實現動態負載均衡,即所有伺服器地址都不是寫死的,而是實時去Zookeeper註冊中心上去獲取到的。
import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; /** * 客戶端 消費者 * socket客戶端連線 * Created by yz on 2018/03/31. */ public class ZkServerClient { // 用來存放所有伺服器端IP地址 public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); // 讀取鍵盤輸入資料 BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true){ String name; try { name = console.readLine(); if("exit".equals(name)){ System.exit(0); } // 給伺服器端傳送訊息 client.send(name); } catch (IOException e) { e.printStackTrace(); } } } /** * 註冊所有server */ public static void initServer(){ final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 讀取叢集伺服器下節點資訊 final String path = "/server"; // 獲取/server 節點下所有子節點 final List<String> children = zkClient.getChildren(path); for (String chipath : children) { // 根據子節點完整路徑讀取value,新增到集合 listServer.add((String) zkClient.readData(path+"/"+chipath)); } System.out.println("###listServer:"+listServer.toString()); // 重要的一步,當節點發生變化的時候 重新讀取,比如服務端有一臺伺服器掛掉,客戶端也要做更新,通過zk事件監聽 // 引數1表示訂閱的監聽地址 引數2 監聽 zkClient.subscribeChildChanges(path, new IZkChildListener() { // 引數1父節點 引數2所有子節點 public void handleChildChange(String parentPath, List<String> parentPathchilist) throws Exception { // 監聽到節點發生變化,更新節點資訊 listServer.clear(); for (String chipath : parentPathchilist) { // 根據子節點完整路徑讀取value,新增到集合 listServer.add((String) zkClient.readData(parentPath+"/"+chipath)); } System.out.println("###事件通知,listServer:"+listServer.toString()); } }); // listServer.clear(); // listServer.add("127.0.0.1:8080"); } /** * 獲取當前server資訊 * @return */ public static String getServer(){ // 還沒有使用負載均衡 return listServer.get(0); } public void send(String name){ String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 引數1:伺服器端IP 引數2:埠號 socket = new Socket(cfg[0],Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(name); while (true){ String resp = in.readLine(); if(resp == null){ break; }else if(resp.length()>0){ System.out.println("接收到伺服器端傳回來的訊息: "+resp); break; } } } catch (IOException e) { e.printStackTrace(); }finally { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out != null){ out.close(); } } } }啟動消費者端,檢視通過zk註冊中心訂閱效果:
現在手動強制停掉8081服務端(會有延遲)
現在消費者再次傳送訊息,只會發給8080埠伺服器端
八、消費者加入輪訓機制演算法
負載均衡輪詢機制演算法---請求次數%伺服器數量=機器位置
第一臺伺服器,位置0 8080
第二臺伺服器,位置1 8081
第一次請求
1%2=1 找8081
第二次請求
2%2=0 找8080
第三次請求
3%2=1 找8081
......
程式碼實現:
import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** * 客戶端 消費者 * socket客戶端連線 * Created by yz on 2018/03/31. */ public class ZkServerClient { // 用來存放所有伺服器端IP地址 public static List<String> listServer = new ArrayList<String>(); public static void main(String[] args) { initServer(); ZkServerClient client = new ZkServerClient(); // 讀取鍵盤輸入資料 BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true){ String name; try { name = console.readLine(); if("exit".equals(name)){ System.exit(0); } // 給伺服器端傳送訊息 client.send(name); } catch (IOException e) { e.printStackTrace(); } } } /** * 註冊所有server */ public static void initServer(){ final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000); // 讀取叢集伺服器下節點資訊 final String path = "/server"; // 獲取/server 節點下所有子節點 final List<String> children = zkClient.getChildren(path); for (String chipath : children) { // 根據子節點完整路徑讀取value,新增到集合 listServer.add((String) zkClient.readData(path+"/"+chipath)); } System.out.println("###listServer:"+listServer.toString()); // 重要的一步,當節點發生變化的時候 重新讀取,比如服務端有一臺伺服器掛掉,客戶端也要做更新,通過zk事件監聽 // 引數1表示訂閱的監聽地址 引數2 監聽 zkClient.subscribeChildChanges(path, new IZkChildListener() { // 引數1父節點 引數2所有子節點 public void handleChildChange(String parentPath, List<String> parentPathchilist) throws Exception { // 監聽到節點發生變化,更新節點資訊 listServer.clear(); for (String chipath : parentPathchilist) { // 根據子節點完整路徑讀取value,新增到集合 listServer.add((String) zkClient.readData(parentPath+"/"+chipath)); } System.out.println("###事件通知,listServer:"+listServer.toString()); } }); // listServer.clear(); // listServer.add("127.0.0.1:8080"); } // 定義請求次數 int count = 0;--> 多執行緒時需要加同步 // 多執行緒執行緒安全計數器 static AtomicInteger ai = new AtomicInteger(0); /** * 獲取當前server資訊 * @return */ public static String getServer(){ // 獲取叢集伺服器數量 int listServerCount = listServer.size(); // 負載均衡輪詢演算法,算出要呼叫的機器. 請求數%伺服器數量=機器位置 // getAndIncrement 表示count++操作 String serverhost = listServer.get(ai.getAndIncrement() % listServerCount); return serverhost; } public void send(String name){ String server = ZkServerClient.getServer(); String[] cfg = server.split(":"); Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { // 引數1:伺服器端IP 引數2:埠號 socket = new Socket(cfg[0],Integer.parseInt(cfg[1])); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println(name); while (true){ String resp = in.readLine(); if(resp == null){ break; }else if(resp.length()>0){ System.out.println("接收到伺服器端傳回來的訊息: "+resp); break; } } } catch (IOException e) { e.printStackTrace(); }finally { if(in !=null){ try { in.close(); } catch (IOException e1) { e1.printStackTrace(); } } if(out != null){ out.close(); } } } }
將8081伺服器啟起來
重啟消費者端,檢視消費者端使用負載均衡輪詢機制演算法呼叫效果 ,見證奇蹟的時刻: