1. 程式人生 > >zookeeper HA 實現負載均衡

zookeeper HA 實現負載均衡

在網上看到太多千篇一律的zookeeper相關的文章,都是定義,沒有一個是有完整程式碼的,這對自己學習zk十分困難,其實要用zk實現主備切換、負載均衡其實沒有自己想象的那麼難,只需要瞭解zk的基本特性即可。在這裡貼上自己寫的程式碼與自己的理解,大家多多指教! 一、思路 軟負載說簡單點,就是將Client端的請求均勻的分配到不同 的server端。下面我們來說說實現的基本思路:
  1. 註冊:首先你需要確定一個父節點,在這裡父節點的名稱暫且就叫/parentNode;每個server端啟動時首先向zk的叢集的父節點/parentNode下去註冊一個臨時的子節點,這樣當有N臺server時,註冊的子節點就是/parentNode/server1、
    /parentNode/server2.........、/parentNode/serverN。節點的資料就存每臺server的ip 與port,這樣你服務端不管是用rmi協議還是http協議,都可以向這個伺服器傳送請求了。做完這些你就成功了一半,繼續加油哦~
  2. 獲取服務列表:在Client端實現輪詢分發的功能,實現Watcher介面,這會讓你實時的監控服務端的變化。首先去獲取父節點/parentNode下所有的子節點,得到之前存的ip與port,然後將這些列表快取到一map中,這裡就叫serverUrlCacheMap。由於實現了Watcher介面,當父節點發生變化時zk 的叢集會通知Client端,此時Client端只要重新獲取父節點下所有子節點的資料,重新快取即可
  3. 輪詢分發:定義一個全域性變數index ,每次發起請求時,直接去serverUrlCacheMap中獲取這個編號的URL,然後傳送給server,就可達到輪詢分發的功能。
二、程式碼示例 說了這麼多,相信大家有點思路了,不明白也沒關係,下面的程式碼會給大家解釋的很詳細,在這裡你需要安裝配置一個zk的叢集。 網上 隨便搜一搜都有好多資料 服務端:
package com.newcosoft.lsmp.bank.server;

import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.newcosoft.lsmp.bank.client.config.BankConfig;
import com.newcosoft.lsmp.bank.client.constant.BankConstant;
import com.newcosoft.lsmp.bank.client.zookeeper.BankZookHelper;
import com.newcosoft.lsmp.bank.server.biz.front.rmi.RmiService;
import com.newcosoft.lsmp.bank.server.config.BankDatabaseConfig;
import com.newcosoft.lsmp.bank.server.util.DefaultBeanFactory;
import com.newcosoft.util.StringUtils;

public class LsmpBankStart {

private static Logger logger = LoggerFactory.getLogger(LsmpBankStart.class);
private ZooKeeper zk;

public static void main(String[] args) {
try {
// 啟動時,先註冊zk節點,重點在這裡
<strong><span style="color:#ff0000;">new LsmpBankStart().registerZook(args[0], args[1]);</span></strong>
// 以下為你專案啟動的核心程式碼,這裡我專案的server端使用的是rmi協議,可以不用看了
DefaultBeanFactory bf = DefaultBeanFactory
.getInstance("/spring/springContext_lsmp_bank.xml");
RmiService rmiService = (RmiService) bf.getBean("business");
int rmiPort = Integer.valueOf(args[1]);

createRegistry(rmiPort);
System.setProperty("java.rmi.server.hostname", args[0]);
UnicastRemoteObject.exportObject(rmiService, rmiPort);
String rmiUrl = "rmi://" + args[0] + ":" + rmiPort + "/" + args[2];
Naming.rebind(rmiUrl, rmiService);
} catch (Exception e) {
logger.error("LsmpBankStart failed!", e);
System.exit(1);
}
}


// 初始化zk的連線
private void initZk() {
try {
if (zk == null || !zk.getState().isAlive()) {
synchronized (this) {
if (zk != null) {
zk.close();
}
// 重新建立連線
zk = new ZooKeeper(BankConfig.getInstance().getZookURL(),
BankConstant.HA_SESSION_TIMEOUT, this);
while (zk.getState() != ZooKeeper.States.CONNECTED) {
Thread.sleep(3000);
}
}
}
} catch (Exception e) {
logger.error("zk初始化連線異常:" + e.toString());
}
}

private void registerZook(String ip, String port) {
// 初始化zk
initZk();
// 父節點路徑
String parentNode = BankConfig.getInstance().getBankServerParentNode();
String[] nodeList = parentNode.split("/");
String nodePath = "";
// 迴圈建立持久父節點
for (String node : nodeList) {
if (!StringUtils.isEmpty(node)) {
nodePath = nodePath + "/" + node;
BankZookHelper.createNode(zk, nodePath, node,
CreateMode.PERSISTENT);
}
}
// 建立臨時子節點 
//BankZookHelper為封裝的zookeeper一些基礎API的類
BankZookHelper.createNode(zk, nodePath + "/" + ip, ip + ":" + port,
CreateMode.EPHEMERAL);
}


}<pre name="code" class="java">package com.newcosoft.lsmp.bank.client.zookeeper;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.newcosoft.lsmp.bank.client.config.BankConfig;
import com.newcosoft.lsmp.bank.client.constant.BankConstant;
import com.newcosoft.lsmp.bank.client.rmi.BankServerRMIClient;

public class LoadBalanceZookHelper implements Watcher {

private static Logger logger = LoggerFactory
.getLogger(LoadBalanceZookHelper.class);

private ZooKeeper zk;

// 可用的服務列表
private List serverUrlList;

// 獲取service 的索引
private AtomicInteger index = new AtomicInteger(0);

// 快取的BankServerRMIClient
private Map bankServerRMIClientCacheMap = new ConcurrentHashMap();

public LoadBalanceZookHelper() {
// 初始化zk
initZk();
cacheBankServerRMIClients();
}

private static class SingletonHolder {
static final LoadBalanceZookHelper INSTANCE = new LoadBalanceZookHelper();
}

public static LoadBalanceZookHelper getInstance() {
return SingletonHolder.INSTANCE;
}

// 輪詢分發服務
public BankServerRMIClient distributeServer() {
if (index.get() >= Integer.MAX_VALUE) {
index.set(0);
}
int a = index.get() % serverUrlList.size();
// 自增長
index.incrementAndGet();
String url = serverUrlList.get(a);
if (logger.isDebugEnabled()) {
logger.debug("請求的URL:" + url);
}
// TODO 刪掉syso
System.out.println("請求的URL:" + url);
return bankServerRMIClientCacheMap.get(url);

}

// 註冊rmi ,並快取
private void registerRmiAndCache(List serverUrlList) {
for (String url : serverUrlList) {
String host[] = url.split(":");
BankServerRMIClient bankServerRMIClient = new BankServerRMIClient(
host[0], Integer.parseInt(host[1]));
bankServerRMIClientCacheMap.put(url, bankServerRMIClient);
}
}

// 初始化,快取rmi客戶端服務
public void cacheBankServerRMIClients() {
// 獲取到所有的 serverURL
serverUrlList = getServerUrlList(BankConfig.getInstance()
.getBankServerParentNode());
if (logger.isDebugEnabled()) {
logger.debug("重新快取可用的服務列表,serverUrlList:" + serverUrlList);
}
if (serverUrlList.size() > 0 && serverUrlList != null) {
// 註冊並快取
registerRmiAndCache(serverUrlList);
} else {
logger.error("可用的服務列表為空");
throw new RuntimeException("可用的服務列表為空");
}
}

// 獲取zk連線
private void initZk() {
try {
if (zk == null || !zk.getState().isAlive()) {
synchronized (this) {
if (zk != null) {
zk.close();
}
// 重新建立連線
zk = new ZooKeeper(BankConfig.getInstance().getZookURL(),
BankConstant.HA_SESSION_TIMEOUT, this);
while (zk.getState() != ZooKeeper.States.CONNECTED) {
Thread.sleep(3000);
}
}
}
} catch (Exception e) {
logger.error("zk初始化連線異常:" + e.toString());
}
}

// 獲取可用的服務列表
public List getServerUrlList(String parentNodePath) {
List serverList = new ArrayList();
try {
// 獲取所有子節點
List nodePathList = zk.getChildren(parentNodePath, true);
if (nodePathList.size() > 0 && nodePathList != null) {
for (String hostPath : nodePathList) {
hostPath = parentNodePath + "/" + hostPath;
// 獲取子節點資料URL
String data = new String(zk.getData(hostPath, true, null));
serverList.add(data);
}
}
} catch (KeeperException e) {
logger.error(e.toString());
} catch (InterruptedException e) {
logger.error(e.toString());
}
return serverList;
}
// 本來有個很簡單的辦法,直接將原來的快取去掉,然後在重新註冊,但是由於註冊rmi底層也是用所                    tcp/ip協議,由於這個協議比較耗時, 所以使用以下方式。請大家根據實際情況來處理
// 節點發生變化時,重新快取rmi 客戶端服務 ,簡單來說就是更新快取
public void reCacheBankServerRMIClients() {
// 將原來的serverURL 賦值給
List oldServerUrlList = serverUrlList;
// 獲取最新的 serverURL,保留住
serverUrlList = getServerUrlList(BankConfig.getInstance()
.getBankServerParentNode());
if (logger.isDebugEnabled()) {
logger.debug("子節點發生變化,重新獲取的服務的URL:" + serverUrlList);
}
// 將最新的serverURL賦值給
List newServerUrlList = serverUrlList;

List reducedUrlList = new ArrayList();
reducedUrlList.addAll(oldServerUrlList);
List incrementUrlList = new ArrayList();
incrementUrlList.addAll(newServerUrlList);

// 獲取down掉服務的的URL
reducedUrlList.removeAll(newServerUrlList);
// 獲取新增服務的URL
incrementUrlList.removeAll(oldServerUrlList);

if (reducedUrlList.size() > 0 && reducedUrlList != null) {
for (String reducedUrl : reducedUrlList) {
// 將down掉的URL的服務從快取中減掉
bankServerRMIClientCacheMap.remove(reducedUrl);
}
if (logger.isDebugEnabled()) {
logger.debug("去除掉的服務的URL:" + reducedUrlList);
}
}
if (incrementUrlList.size() > 0 && incrementUrlList != null) {
// 將新增的URL重新註冊rmi 並快取住
registerRmiAndCache(incrementUrlList);
if (logger.isDebugEnabled()) {
logger.debug("新增的服務的URL:" + incrementUrlList);
}
}

}

@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Expired) {
logger.debug("觸發了回話過期事件");
// 重新連線zk
initZk();
// 重新快取
reCacheBankServerRMIClients();
}
// if (event.getState() == KeeperState.SyncConnected) {
// logger.debug("觸發了斷開重連事件");
// // 重新快取
// reCacheBankServerRMIClients();
// }
if (event.getType() == EventType.NodeChildrenChanged) {
logger.debug("觸發了子節點變化事件");
// 重新快取
reCacheBankServerRMIClients();
}
}

public static void main(String[] args) {
LoadBalanceZookHelper.getInstance();

}
}
<div style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; text-indent: 28px; line-height: 1.5;">
</div>
<pre name="code" class="java"><div style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; text-indent: 28px; line-height: 1.5;"><br style="line-height: 1.5;" /></div><div style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; text-indent: 28px; line-height: 1.5;"><strong>客戶端:</strong></div>
<span style="font-family: 'Microsoft YaHei', SimSun, Verdana, Arial, Helvetica, sans-serif; font-size: 14.285715103149414px; line-height: 20px; text-indent: 28px; white-space: pre; background-color: rgb(240, 240, 240);">package com.newcosoft.lsmp.bank.client.zookeeper;</span>
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.newcosoft.lsmp.bank.client.config.BankConfig;
import com.newcosoft.lsmp.bank.client.constant.BankConstant;
import com.newcosoft.lsmp.bank.client.rmi.BankServerRMIClient;

public class LoadBalanceZookHelper implements Watcher {

private static Logger logger = LoggerFactory
.getLogger(LoadBalanceZookHelper.class);

private ZooKeeper zk;

// 可用的服務列表
private List serverUrlList;

// 獲取service 的索引
private AtomicInteger index = new AtomicInteger(0);

// 快取的BankServerRMIClient
private Map bankServerRMIClientCacheMap = new ConcurrentHashMap();

public LoadBalanceZookHelper() {
// 初始化zk
initZk();
cacheBankServerRMIClients();
}

private static class SingletonHolder {
static final LoadBalanceZookHelper INSTANCE = new LoadBalanceZookHelper();
}

public static LoadBalanceZookHelper getInstance() {
return SingletonHolder.INSTANCE;
}

// 輪詢分發服務
public BankServerRMIClient distributeServer() {
if (index.get() >= Integer.MAX_VALUE) {
index.set(0);
}
int a = index.get() % serverUrlList.size();
// 自增長
index.incrementAndGet();
String url = serverUrlList.get(a);
if (logger.isDebugEnabled()) {
logger.debug("請求的URL:" + url);
}
// TODO 刪掉syso
System.out.println("請求的URL:" + url);
return bankServerRMIClientCacheMap.get(url);

}

// 註冊rmi ,並快取
private void registerRmiAndCache(List serverUrlList) {
for (String url : serverUrlList) {
String host[] = url.split(":");
BankServerRMIClient bankServerRMIClient = new BankServerRMIClient(
host[0], Integer.parseInt(host[1]));
bankServerRMIClientCacheMap.put(url, bankServerRMIClient);
}
}

// 初始化,快取rmi客戶端服務
public void cacheBankServerRMIClients() {
// 獲取到所有的 serverURL
serverUrlList = getServerUrlList(BankConfig.getInstance()
.getBankServerParentNode());
if (logger.isDebugEnabled()) {
logger.debug("重新快取可用的服務列表,serverUrlList:" + serverUrlList);
}
if (serverUrlList.size() > 0 && serverUrlList != null) {
// 註冊並快取
registerRmiAndCache(serverUrlList);
} else {
logger.error("可用的服務列表為空");
throw new RuntimeException("可用的服務列表為空");
}
}

// 獲取zk連線
private void initZk() {
try {
if (zk == null || !zk.getState().isAlive()) {
synchronized (this) {
if (zk != null) {
zk.close();
}
// 重新建立連線
zk = new ZooKeeper(BankConfig.getInstance().getZookURL(),
BankConstant.HA_SESSION_TIMEOUT, this);
while (zk.getState() != ZooKeeper.States.CONNECTED) {
Thread.sleep(3000);
}
}
}
} catch (Exception e) {
logger.error("zk初始化連線異常:" + e.toString());
}
}

// 獲取可用的服務列表
public List getServerUrlList(String parentNodePath) {
List serverList = new ArrayList();
try {
// 獲取所有子節點
List nodePathList = zk.getChildren(parentNodePath, true);
if (nodePathList.size() > 0 && nodePathList != null) {
for (String hostPath : nodePathList) {
hostPath = parentNodePath + "/" + hostPath;
// 獲取子節點資料URL
String data = new String(zk.getData(hostPath, true, null));
serverList.add(data);
}
}
} catch (KeeperException e) {
logger.error(e.toString());
} catch (InterruptedException e) {
logger.error(e.toString());
}
return serverList;
}
// 本來有個很簡單的辦法,直接將原來的快取去掉,然後在重新註冊,但是由於註冊rmi底層也是用所                    tcp/ip協議,由於這個協議比較耗時, 所以使用以下方式。請大家根據實際情況來處理
// 節點發生變化時,重新快取rmi 客戶端服務 ,簡單來說就是更新快取
public void reCacheBankServerRMIClients() {
// 將原來的serverURL 賦值給
List oldServerUrlList = serverUrlList;
// 獲取最新的 serverURL,保留住
serverUrlList = getServerUrlList(BankConfig.getInstance()
.getBankServerParentNode());
if (logger.isDebugEnabled()) {
logger.debug("子節點發生變化,重新獲取的服務的URL:" + serverUrlList);
}
// 將最新的serverURL賦值給
List newServerUrlList = serverUrlList;

List reducedUrlList = new ArrayList();
reducedUrlList.addAll(oldServerUrlList);
List incrementUrlList = new ArrayList();
incrementUrlList.addAll(newServerUrlList);

// 獲取down掉服務的的URL
reducedUrlList.removeAll(newServerUrlList);
// 獲取新增服務的URL
incrementUrlList.removeAll(oldServerUrlList);

if (reducedUrlList.size() > 0 && reducedUrlList != null) {
for (String reducedUrl : reducedUrlList) {
// 將down掉的URL的服務從快取中減掉
bankServerRMIClientCacheMap.remove(reducedUrl);
}
if (logger.isDebugEnabled()) {
logger.debug("去除掉的服務的URL:" + reducedUrlList);
}
}
if (incrementUrlList.size() > 0 && incrementUrlList != null) {
// 將新增的URL重新註冊rmi 並快取住
registerRmiAndCache(incrementUrlList);
if (logger.isDebugEnabled()) {
logger.debug("新增的服務的URL:" + incrementUrlList);
}
}

}

@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.Expired) {
logger.debug("觸發了回話過期事件");
// 重新連線zk
initZk();
// 重新快取
reCacheBankServerRMIClients();
}
// if (event.getState() == KeeperState.SyncConnected) {
// logger.debug("觸發了斷開重連事件");
// // 重新快取
// reCacheBankServerRMIClients();
// }
if (event.getType() == EventType.NodeChildrenChanged) {
logger.debug("觸發了子節點變化事件");
// 重新快取
reCacheBankServerRMIClients();
}
}

public static void main(String[] args) {
LoadBalanceZookHelper.getInstance();

}
}

 
    
   
  


客戶端: