1. 程式人生 > >zookeeper實現系統註冊和系統發現

zookeeper實現系統註冊和系統發現

  需求:多系統整合,需要登陸任意系統後都可以顯示業務系統名稱,並得到對應地址資訊等。

  實現思路:使用zookeeper作為系統註冊,每個系統啟動的時候進行註冊系統資訊,臨時有序為註冊型別,並且註冊事件監聽,並獲取所    有子節點的系統資訊,新增至靜態變數,考慮到部分系統可能會進行叢集部署需要進行系統資訊的去重.

環境以及工具包

zookeeper3.4.10,系統後端均使用spring框架,zkclient工具包

1.引入pom

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
			<dependency>
			    <groupId>com.101tec</groupId>
			    <artifactId>zkclient</artifactId>
			    <version>0.10</version>
			</dependency>

2.編寫java程式碼

RegisterServer啟動註冊類和SystemRegisterInfo資訊儲存類以及ZKChildListener事件監聽實現

package fsl.lcp.utils.zk;





import java.util.ArrayList;
import java.util.List;

import java.util.stream.Collectors;

import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import fsl.lcp.constant.RentConstant;


/**
 * 初始化
 * @author Jun
 *
 */
public class RegisterServer {
	
	private Logger logger = LoggerFactory.getLogger(fsl.lcp.utils.zk.RegisterServer.class);
	

	public  static volatile List<SystemRegisterInfo> systemList = null;  
    public ZkClient zkClient=null;
 
    @Value("${zookeeper.connectString}")
	private  String connectString;//zookeeper地址	
    
    @Value("${zookeeper.nodeName}")
   	private  String nodeName;//節點
    @Value("${zookeeper.localIp}")
    private String localIp;//本系統ip
    @Value("${zookeeper.localPort}")
    private int localPort;//本系統埠
    
 
	/**
	 * 初始化
	 */
	public void init(){
		    try {
				zkClient = new ZkClient(connectString, 5000);
				logger.debug("====zookeeper====connect===is==ok==========");
				
				SystemRegisterInfo systemRegisterInfo = new SystemRegisterInfo();
				
				systemRegisterInfo.setIp(localIp);
				systemRegisterInfo.setPort(localPort);
				systemRegisterInfo.setName(RentConstant.SYSTEMNAME_LOCAL);
				systemRegisterInfo.setNextNode(RentConstant.SYSTEMCODE_LOCAL);
				
					zkClient.createPersistent(nodeName, true);
					//建立節點-臨時有序
					String path = zkClient.create(nodeName +"/"+systemRegisterInfo.getNextNode(), systemRegisterInfo,
							CreateMode.EPHEMERAL_SEQUENTIAL);
					//輸出建立節點的路徑  
					logger.debug("=========zookeeper==========created path:" + path);
					//註冊事件監聽
					zkClient.subscribeChildChanges(nodeName, new ZKChildListener(zkClient));
					systemList=new ArrayList<SystemRegisterInfo>();
					List<String> children = zkClient.getChildren(nodeName);//獲取子節點
					for (String string : children) {
						Object readData2 = zkClient.readData(nodeName+"/"+string);
						SystemRegisterInfo readData = (SystemRegisterInfo) readData2;
						systemList.add(readData);
					}
					List<SystemRegisterInfo> collect = systemList.stream().distinct().collect(Collectors.toList());
					systemList=collect;
			} catch (Exception e) {
				logger.debug("==========zookeeper==========connect==============" + e.getMessage());
			}
		    
	}
   
	
  


}
package fsl.lcp.utils.zk;

import java.io.Serializable;

import com.fasterxml.jackson.databind.annotation.JsonSerialize;

/**
 * 系統註冊資訊
 * @author jun
 *
 */
@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
public class SystemRegisterInfo implements Serializable {
	
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private String ip;//本系統ip
	private int port;//本系統埠
	private String name;//系統名稱
	private String nextNode;//本系統節點地址
	private String isLogin="N";//是否本系統登陸獲取
	private String attribute1;//擴充套件欄位
	private String attribute2;//擴充套件欄位
	private String attribute3;//擴充套件欄位
	private String attribute4;//擴充套件欄位
	
	public String getAttribute1() {
		return attribute1;
	}
	public void setAttribute1(String attribute1) {
		this.attribute1 = attribute1;
	}
	public String getAttribute2() {
		return attribute2;
	}
	public void setAttribute2(String attribute2) {
		this.attribute2 = attribute2;
	}
	public String getAttribute3() {
		return attribute3;
	}
	public void setAttribute3(String attribute3) {
		this.attribute3 = attribute3;
	}
	public String getAttribute4() {
		return attribute4;
	}
	public void setAttribute4(String attribute4) {
		this.attribute4 = attribute4;
	}
	public String getIsLogin() {
		return isLogin;
	}
	public void setIsLogin(String isLogin) {
		this.isLogin = isLogin;
	}
	public String getIp() {
		return ip;
	}
	public void setIp(String ip) {
		this.ip = ip;
	}
	public int getPort() {
		return port;
	}
	public void setPort(int port) {
		this.port = port;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getNextNode() {
		return nextNode;
	}
	public void setNextNode(String nextNode) {
		this.nextNode = nextNode;
	}
	@Override
	public int hashCode() {
		String str = ip + isLogin+name+nextNode+port;
		return str.hashCode();
	}
	@Override
	public boolean equals(Object obj) {
		SystemRegisterInfo p = (SystemRegisterInfo) obj;
		return name.equals(p.getName()) && isLogin.equals(p.getIsLogin())&& ip.equals(p.getIp())&& nextNode.equals(p.getNextNode())&&port==p.getPort();
	}
	
	
}
package fsl.lcp.utils.zk;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * 節點監聽
 * @author Jun
 * 
 */
public class ZKChildListener implements IZkChildListener{

	
	private Logger logger = LoggerFactory.getLogger(fsl.lcp.utils.zk.ZKChildListener.class);
	
	public ZkClient zkClient=null;
	
	public ZKChildListener(ZkClient zkClient) {
		super();
		this.zkClient = zkClient;
	}
	
	@Override
	public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
		List<SystemRegisterInfo> list=new ArrayList<>();
		for (String string : currentChilds) {
			Object readData2=null;
			try {
				readData2 = zkClient.readData(parentPath+"/"+string);
			} catch (Exception e) {
				e.printStackTrace();
				logger.debug("=======zookeeper====node="+parentPath+"/"+string+"==========wrong=========="+e.getMessage());
			}
			if(null!=readData2){
				SystemRegisterInfo readData = (SystemRegisterInfo) readData2;
				list.add(readData);
			}
		}
		RegisterServer.systemList=list.stream().distinct().collect(Collectors.toList());
	}

	
	
	
	public ZKChildListener() {
		super();
	}
	
	
}

3.注入Bean,並在啟動的時候進行初始化

<bean id="registerServer" class="fsl.lcp.utils.zk.RegisterServer" scope="singleton"  init-method="init"></bean>

4.編寫Resit註冊資訊測試類

package fsl.lcp.utils.zk;


import org.I0Itec.zkclient.ZkClient;

import org.apache.zookeeper.CreateMode;


public class Resit {
	
	public ZkClient zkClient=null;
	
	public static void main(String[] args) {
		Resit re=new Resit();
		re.createNode();
	}
	
  
	
public void createNode(){
	//zk叢集的地址  
    String ZKServers = "127.0.0.1:2181";  
     zkClient = new ZkClient(ZKServers, 5000);
	SystemRegisterInfo systemRegisterInfo = new SystemRegisterInfo();  
	systemRegisterInfo.setIp("189.23.23.65");
	systemRegisterInfo.setPort(80);
	systemRegisterInfo.setName("資產系統");
	systemRegisterInfo.setNextNode("capital");  
	String path = zkClient.create("/lcp-all/capital", systemRegisterInfo, CreateMode.EPHEMERAL_SEQUENTIAL);  
	try {
		this.handle(path);
	} catch (Exception e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
}



//伺服器的具體業務處理功能
	private void handle(String serverName) throws Exception {
		System.out.println("server " + serverName
				+ " is waiting for task process......");
		Thread.sleep(Long.MAX_VALUE);

	}
	
		 
}

5.測試

先啟動專案,註冊本系統資訊,這時先進行獲取系統資訊列表,然後在執行一次Resit類,再獲取系統列表資訊

 

補充:logback為日誌框架,zookeeper預設會列印心跳日誌,如下更改,取消日誌

<logger name="org.apache.zookeeper.ClientCnxn" level="info" />  

參考 

https://www.2cto.com/kf/201707/661220.html

https://blog.csdn.net/sun_wangdong/article/details/77461108

考慮到zookeeper伺服器宕機情況,RegisterServer.java程式碼修改如下

package com.fsl.lcp.utils.zk;





import java.util.ArrayList;
import java.util.List;

import java.util.stream.Collectors;

import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;




/**
 * 初始化
 * @author Jun
 * 2018年11月1日
 */
public class RegisterServer {
	
	private Logger logger = LoggerFactory.getLogger(com.fsl.lcp.utils.zk.RegisterServer.class);
	

	public  static volatile List<SystemRegisterInfo> systemList = null;  
	
	public  static volatile  SystemRegisterInfo systemRegisterInfo= null;  
	
    public ZkClient zkClient=null;
 
    @Value("${zookeeper.connectString}")
	private  String connectString;//zookeeper地址	
    
    @Value("${zookeeper.nodeName}")
   	private  String nodeName;//節點
    @Value("${zookeeper.localIp}")
    private String localIp;//本系統ip
    @Value("${zookeeper.localPort}")
    private int localPort;//本系統埠
    @Value("${zookeeper.localSystemName}")
    private String localSystemName;//本系統描述
    @Value("${zookeeper.localSystemCode}")
    private String localSystemCode;//本系統編碼
 
	/**
	 * 初始化
	 */
	public void init(){
		    try {
		    	
		    	systemRegisterInfo = new SystemRegisterInfo();
				systemRegisterInfo.setIp(localIp);
				systemRegisterInfo.setPort(localPort);
				systemRegisterInfo.setName(localSystemName);
				systemRegisterInfo.setNextNode(localSystemCode);
				zkClient = new ZkClient(connectString, 5000);
				zkClient.subscribeStateChanges(new IZkStateListener() {
					
					@Override
					public void handleStateChanged(KeeperState state) throws Exception {
						if("Disconnected".equals(state.name())){
							//斷開了
							systemList=new ArrayList<>();
							logger.debug("====zookeeper====connect===is==Disconnected==========");
						}else if("SyncConnected".equals(state.name())){
							//又連線上了
							getInfo(zkClient);
							logger.debug("====zookeeper====connect===is==SyncConnected==========");
						}else{
							logger.debug("====zookeeper====connect===is============"+state.name());
						}
					}
					
					@Override
					public void handleSessionEstablishmentError(Throwable error) throws Exception {
						// TODO Auto-generated method stub
						logger.debug("====zookeeper====connect===is============"+error.getMessage());
					}
					
					@Override
					public void handleNewSession() throws Exception {
						logger.debug("====zookeeper====connect===is=======new session=====");
					}
				});
				
				logger.debug("====zookeeper====connect===is==ok==========");
				logger.debug("==========zookeeper=====local=====info=======is======="+systemRegisterInfo );
				getInfo(zkClient);
			} catch (Exception e) {
				logger.debug("==========zookeeper==========connect==============" + e.getMessage());
			}
		    
	}
   
	
	/**
	 * 獲取資訊
	 * @param zkClient
	 * @param systemRegisterInfo
	 */
	public  void getInfo(ZkClient zkClient){
		zkClient.createPersistent(nodeName, true);
		//建立節點-臨時有序
		String path = zkClient.create(nodeName +"/"+systemRegisterInfo.getNextNode(), systemRegisterInfo,
				CreateMode.EPHEMERAL_SEQUENTIAL);
		//輸出建立節點的路徑  
		logger.debug("=========zookeeper==========created path:" + path);
		//註冊事件監聽
		zkClient.subscribeChildChanges(nodeName, new ZKChildListener(zkClient));
		systemList=new ArrayList<SystemRegisterInfo>();
		List<String> children = zkClient.getChildren(nodeName);//獲取子節點
		for (String string : children) {
			Object readData2 = zkClient.readData(nodeName+"/"+string);
			SystemRegisterInfo readData = (SystemRegisterInfo) readData2;
			logger.debug("==========zookeeper=====startNow=====allInfo=======is======="+readData );
			systemList.add(readData);
		}
		List<SystemRegisterInfo> collect = systemList.stream().distinct().collect(Collectors.toList());
		systemList=collect;
	}
  


}