1. 程式人生 > >zookeeper負載均衡和資料同步

zookeeper負載均衡和資料同步

如何利用zookeeper做負載均衡呢,並且能夠讓客戶端動態監控服務端的狀態,一旦有的伺服器掛掉,客戶端能夠迅速感知,從而做出調整。

先演示一遍:注意,本地要執行一個zookeeper,讓客戶端和服務端分別和zookeeper進行連線,能實時跟zookeeper保持聯絡。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Logger;

 //服務端
public class SimpleServer implements Runnable {
	
	private static Logger logger = Logger.getLogger(SimpleServer.class.getName());
 
	public static void main(String[] args) throws IOException {
		int port = 18081;
		SimpleServer server = new SimpleServer(port);
		Thread thread = new Thread(server);
		thread.start();
	}
 
	private int port;
 
	public SimpleServer(int port) {
		this.port = port;
	}
	
	private void regServer() {
		//向ZooKeeper註冊當前伺服器
		ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);
		
		String pathroot = "/test"; 
		if (!client.exists(pathroot)) {
			logger.info("建立根節點:" + pathroot);
			client.createPersistent(pathroot);
		}
		
		String path = "/test/server" + port;
		if(client.exists(path)) {
			client.delete(path);
		}
		client.createEphemeral(path, "127.0.0.1:" + port);
	}

	
	@Override
	public void run() {
		ServerSocket server = null;
		try {
			server = new ServerSocket(port);
			regServer();
			System.out.println("Server started at " + port);
			Socket socket = null;
			while (true) {
				socket = server.accept();
				new Thread(new SimpleServerHandler(socket)).start();
			}
		} catch(IOException ex) {
			ex.printStackTrace();
		} finally {
			if (server != null) {
				try {
					server.close();
				} catch (IOException e) {}
			}
		}
	}
}
 
class SimpleServerHandler implements Runnable {
 
	private Socket socket;
 
	public SimpleServerHandler(Socket socket) {
		this.socket = socket;
	}
	
	
	@Override
	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(
					this.socket.getInputStream()));
			out = new PrintWriter(this.socket.getOutputStream(), true);
			String body = null;
			while (true) {
				body = in.readLine();
				if (body == null)
					break;
				System.out.println("Receive : " + body);
				out.println("Hello, " + body);
			}
 
		} catch (Exception e) {
			if (in != null) {
				try {
					in.close();
				} catch (IOException e1) {
					e1.printStackTrace();
				}
			}
			if (out != null) {
				out.close();
			}
			if (this.socket != null) {
				try {
					this.socket.close();
				} catch (IOException e1) {
					e1.printStackTrace();
				}
				this.socket = null;
			}
		}
	}
}

 

package com.tecno.BoomPlayerLog.bigdata.zookeeper.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

//客戶端
public class SimpleClient {

	private static Integer pos = 0;

	private static List<String> servers = new ArrayList<>();

	public static void main(String[] args) {

		initServerList();

		SimpleClient client = new SimpleClient();
		BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
		while (true) {
			String name;
			try {
				name = console.readLine();
				if ("exit".equals(name)) {
					System.exit(0);
				}
				client.send(name);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	private static void initServerList() {
		// 啟動時從ZooKeeper讀取可用伺服器
		String path = "/test";
		ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
		List<String> childs = zkClient.getChildren(path);
		servers.clear();
		for (String p : childs) {

			servers.add(zkClient.readData(path + "/" + p));

		}
		// 訂閱節點變化事件
		zkClient.subscribeChildChanges("/test", new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
				System.out.println(String.format("[ZookeeperRegistry] service list change: path=%s, currentChilds=%s",
						parentPath, currentChilds.toString()));
				servers.clear();
				for (String p : currentChilds) {
					servers.add(zkClient.readData(path + "/" + p));
				}
				System.out.println("Servers: " + servers.toString());
			}
		});

	}

	public static String getServer() {

		// return servers.get(new Random().nextInt(servers.size())); 隨機演算法 
		// 輪詢演算法
		String server = null;
		if (pos >= servers.size()) {
			pos = 0;
			
		}
		
		server = servers.get(pos);
		
		pos++;
		return server;
	}

	public SimpleClient() {
		
	}

	public void send(String name) {

		String server = SimpleClient.getServer();
		String[] cfg = server.split(":");

		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(), true);

			out.println(name);
			while (true) {
				String resp = in.readLine();
				if (resp == null)
					break;
				else if (resp.length() > 0) {
					System.out.println("Receive : " + resp);
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (out != null) {
				out.close();
			}
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if (socket != null) {
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

 

首先,先執行服務端的main方法釋出服務,注意修改埠號,以免埠號衝突啟動不了,這裡我們執行三次服務端。執行完結果如下:

還有兩個服務端的控制檯也是類似;

然後執行客戶端的main方法,執行完後,在控制檯上可以輸入任意字串,回車之後(可以重複操作),服務端就會接收訊息,並且返回給客戶端,代表接收請求,並且處理請求。效果如下:

這裡我傳送三次訊息,也得到服務端的回覆,我們再看服務端的顯示:

可見每個服務端都請求一次,就是輪詢的效果。然後現在關閉一個服務端,結果如下(必須過一會,要進行心跳檢查):

客戶端就會感知,現在再來發送訊息,就是其他兩臺服務端輪詢處理訊息了,同理 一旦新增服務端,客戶端也可以有效感知。

 

資料同步呢?就是利用zookeeper的Watcher機制來監控zookeeper資料節點內容的變化,不過只能一次性,我們需要改良,進行實時監控。相關程式碼如下:

import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

public class MonitoringZookeeperNodesContinuous {
	
	private  ZooKeeper zooKeeper;

	{
			
		try {
			zooKeeper = new ZooKeeper("127.0.0.1:2181", 6000, null);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	@Test //我們讓客戶端訂閱節點,就能收到節點變化的內容了
	public void testProcessClient() throws KeeperException, InterruptedException {
		Stat stat = new Stat();
		// 客服端訂閱節點
		String path = "/server/address";
		
		//獲取zookeeper更新後的版本資訊
		String initResult = doProcess(path, stat);

		System.err.println("initResult ==> " + initResult);

		// 處理業務更新版本,載入最新版本的資訊到記憶體中 
		//為了讓程式不停止方便看效果
		while (true) {
			
		}
	}

	/**
	 * @Description: 以遞迴方式實現節點狀態的持續監控的方法
	 * @author yunyao.huang
	 * @throws InterruptedException
	 * @throws KeeperException
	 * @date 2018年8月7日
	 */
	public String doProcess(final String path, final Stat stat) throws KeeperException, InterruptedException {

		Watcher watcher = new Watcher() {

			@Override
			public void process(WatchedEvent event) {
				try {
					String innerResult = doProcess(path, stat);

					System.out.println(innerResult);
					

				} catch (KeeperException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		};

		byte[] data = zooKeeper.getData(path, watcher, stat);

		return new String(data);

	}

	@Test
	public void testWatcherOnce() throws KeeperException, InterruptedException {
		Stat stat = new Stat();
		// 客戶端訂閱的zookeeper的配置資訊所儲存的路徑節點
		String path = "/server/address";
		// 回撥函式
		Watcher watcher = new Watcher() {

			@Override
			public void process(WatchedEvent event) {

				System.err.println("zookeeper的配置資訊發生改變了");

			}
		};
		zooKeeper.getData(path, watcher, stat);
		// 不能讓程式停止,否則看不到效果
		while (true) {
			
		}
	}
	
	@Test
	public void testCMSSetDataServer() {
		
		String path = "/server/address";
		
		// CMS更新完的版本號
		byte[] data = "Version71".getBytes();
		
		
		
		try {
			// 讓zookeeper更新COM的版本號  ,一更新,訂閱的client就會感知
			zooKeeper.setData(path, data, -1); 
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

執行testProcessClient()方法,然後執行testCMSSetDataServer()方法。效果如下:

每一次客戶端只要修改節點中的值,zookeeper立刻通知所有的客戶端,客戶端自然就會得到最新的訊息了,就可以實現資料同步了。

關於zookeeper,可以看下:這篇文章說的挺不錯的    ,看完就會對上面的操作有更深的體會了,知道原理還是非常有必要的。