1. 程式人生 > >2018-07-14期 ZK編程案例-分布式協調【本人親自反復驗證通過分享】

2018-07-14期 ZK編程案例-分布式協調【本人親自反復驗證通過分享】

ont clas connected spa keep oid smo ins 子節點


利用ZK監聽器實現分布式協調服務,即實現服務端服務健康狀態的實時監測。

1、編寫一個服務端程序,實現原理:

(1)服務端程序啟動後,開啟Socket監聽

(2)開啟Socket監聽後,將自己監聽Socket身份信息臨時寫入Zookeeper集群

(3)服務關閉後,Zookeeper集群自動將該服務身份信息從ZK集群清除

實現代碼


package cn.itcast.zk.distributeserver;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.InetAddress;

import java.net.NetworkInterface;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.Enumeration;


import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;


public class DistributeServer {


/**

* 連接zk,服務啟動後往zk註冊服務器信息,並啟動監聽端口9091

*

* @throws Exception

*/

public void connectionZk(String zNodeVal) throws Exception {

/*

* 這裏連接ZK,無需註冊監聽器,所有監聽器watcher為null

*/

ZooKeeper zkCli = new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181", 2000, null);

if (zkCli.exists("/server", null) == null) {

zkCli.create("/server", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

/*

* 服務啟動後,註冊自己的身份信息到ZK,註冊方式采用臨時註冊EPHEMERAL_SEQUENTIAL,同時生成的Znode為在/server/host+

* 自增序列號SEQUENTIAL 如/server/host00000000000001

*/

zkCli.create("/server/host", zNodeVal.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

}


/**

* 主線程 模擬服務端socket監聽,並註冊ZK集群

*

* @param args

* @throws Exception

*/

public static void main(String[] args) throws Exception {

DistributeServer distributeServer = new DistributeServer();

InetAddress localAddress = new DistributeServer().getLocalHostLANAddress();

String zNodeVal = localAddress.getHostName() + ":" + localAddress.getHostAddress() + ":9091";

// 服務啟動後,將服務信息註冊到zk

distributeServer.connectionZk(zNodeVal);

// 將服務端口監聽起來,實時準備處理客戶端發送過來消息

distributeServer.handleBusiness(localAddress.getHostAddress());

}


/**

* 模擬處理業務 這裏為開啟Socket監聽,並接收客戶端發送過來的消息。

*

* @param ipaddress

* @throws IOException

*/

public static void handleBusiness(String ipaddress) throws IOException {

ServerSocket server = new ServerSocket(9091);

try {

System.out.println("Server " + ipaddress + " has listener on 9091......");

Socket client = server.accept();

try {

BufferedReader input = new BufferedReader(new InputStreamReader(client.getInputStream()));

boolean flag = true;

while (flag) {

String line = input.readLine();

if (line.equals("exit")) {

flag = false;

System.out.println("Client exit!");

} else {

System.out.println("Client Msg:" + line);

}

}

} finally {

client.close();

server.close();

/**

* 防止客戶端連接斷開後,服務端端監聽異常,因此這裏沒每次處理完客戶端消息後,都會重新建立監聽

*/

DistributeServer.handleBusiness(ipaddress);

}


} finally {

server.close();

}

}


/**

* 自動獲取操作系統IP地址,目的是讓服務端socket在該地址上建立監聽。

*

* @return

* @throws Exception

*/

public InetAddress getLocalHostLANAddress() throws Exception {

try {

InetAddress candidateAddress = null;

// 遍歷所有的網絡接口

for (Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); ifaces

.hasMoreElements();) {

NetworkInterface iface = (NetworkInterface) ifaces.nextElement();

// 在所有的接口下再遍歷IP

for (Enumeration inetAddrs = iface.getInetAddresses(); inetAddrs.hasMoreElements();) {

InetAddress inetAddr = (InetAddress) inetAddrs.nextElement();

if (!inetAddr.isLoopbackAddress()) {// 排除loopback類型地址

if (inetAddr.isSiteLocalAddress()) {

// 如果是site-local地址,就是它了

return inetAddr;

} else if (candidateAddress == null) {

// site-local類型的地址未被發現,先記錄候選地址

candidateAddress = inetAddr;

}

}

}

}

if (candidateAddress != null) {

return candidateAddress;

}

// 如果沒有發現 non-loopback地址.只能用最次選的方案

InetAddress jdkSuppliedAddress = InetAddress.getLocalHost();

return jdkSuppliedAddress;

} catch (Exception e) {

e.printStackTrace();

}

return null;

}


}

2、客戶端程序,實現原理:

利用ZK監聽器實現分布式協調服務,即實現服務端服務健康狀態的實時監測。

(1)、服務端服務啟動後,會立即向ZK註冊自身服務標識信息

(2)、若服務端服務掉線,則會ZK會自動將該服務標識信息從ZK集群清除

(3)、客戶端實時監聽ZK集群中服務在線情況,若服務端服務掉線,會被客戶端監聽器立即監聽到

(4)、客戶端監聽到服務端服務掉線後,相關後續業務不會再提交給掉線服務處理


實現代碼:

package cn.itcast.zk.distributeserver;


import java.io.IOException;

import java.io.PrintWriter;

import java.net.Socket;

import java.util.ArrayList;

import java.util.List;

import java.util.Scanner;

import java.util.concurrent.CountDownLatch;


import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.Watcher.Event.KeeperState;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper.States;


/**

* 利用ZK監聽器實現分布式協調服務,即實現服務端服務健康狀態的實時監測。

* 實現原理:

* 1、服務端服務啟動後,會立即向ZK註冊自身服務標識信息

* 2、若服務端服務掉線,則會ZK會自動將該服務標識信息從ZK集群清除

* 3、客戶端實時監聽ZK集群中服務在線情況,若服務端服務掉線,會被客戶端監聽器立即監聽到

* 4、客戶端監聽到服務端服務掉線後,先關後續業務不會再提交給掉線服務處理

* 5、本代碼具體主要實現以下功能:

* (1)服務端程序啟動後,會立即啟動一個Socket監聽,並將Socket連接信息註冊到ZK集群,註冊內容為IP:PORT套接字

* (2)客戶端通過ZK一直監聽服務端在線情況

* (3)客戶端定時向各個服務端發送Socket消息,默認情況下如果所有服務端服務均正常,則所有服務端都會收到客戶端發送的Socket消息

* 若服務端服務異常,則客戶端會檢測到,將不會向掉線的服務發送socket消息。這樣就保障了客戶端發送的消息不會丟失,總有活動的

* 服務端能接收到。

*

* @author songjq

*

*/

public class DistributeClient {


// 為了線程安全,需要加volatile修飾符

volatile private static ArrayList<String> hlist = new ArrayList<String>();

private static ZooKeeper zk1_;


public static void waitUntilConnected(ZooKeeper zooKeeper, CountDownLatch connectedLatch) {

if (States.CONNECTING == zooKeeper.getState()) {

try {

connectedLatch.await();

} catch (InterruptedException e) {

throw new IllegalStateException(e);

}

}

}


static class ConnectedWatcher implements Watcher {


private CountDownLatch connectedLatch;


ConnectedWatcher(CountDownLatch connectedLatch) {

this.connectedLatch = connectedLatch;

}


/**

* 監聽器回調方法 如果需要對某個znode進行持續監聽,需要重新在回調方法中註冊監聽器

*/

@Override

public void process(WatchedEvent event) {

System.out.println("節點:" + event.getPath() + " 發生了事件:" + event.getType());

if (event.getState() == KeeperState.SyncConnected) {

connectedLatch.countDown();

}

try {

/**

* 這裏調用獲取葉子加點的方法,目的是在該監聽器中重新註冊監聽,防止監聽失效。

*/

getServerHostList(zk1_);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}


}

}


/**

* 獲取zk連接

* @param hostports

* @param times

* @return

* @throws Exception

*/

public ZooKeeper getConnection(String hostports, int times) throws Exception {

ZooKeeper zktmp = new ZooKeeper(hostports, 1000, new Watcher() {


@Override

public void process(WatchedEvent event) {

System.out.println("節點:" + event.getPath() + " 發生了事件:" + event.getType());

try {

getServerHostList(zk1_);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});


return zktmp;

}


/**

* 獲取類的實例

*/

static private DistributeClient static_;

static public DistributeClient Instance() {

if (static_ == null) {

static_ = new DistributeClient();

}

return static_;

}


public static void main(String[] args) throws Exception {

String hostports = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";

DistributeClient instance = DistributeClient.Instance();

zk1_ = instance.getConnection(hostports, 2000);

getServerHostList(zk1_);

/*

* 模擬客戶端永不間斷的定時向活動的服務端發送socket消息

*/

while (true) {

getServerHostList(zk1_);

for (String host : hlist) {

String[] hostAry = host.split(":");

String hostname = hostAry[0];

String ip = hostAry[1];

int port = Integer.parseInt(hostAry[2]);

new DistributeClient().handleBusiness(ip, port, hostname);

System.out.println(host);

}

Thread.sleep(20000);

System.out.println("---------------");

}

}


/**

* 獲取在線服務節點

* @param zkCli

* @throws KeeperException

* @throws InterruptedException

*/

public static void getServerHostList(ZooKeeper zkCli) throws KeeperException, InterruptedException {

hlist.clear();

/*

* 獲取/server/葉子節點,並同時在該葉子節點註冊監聽器

*/

List<String> hosts = zkCli.getChildren("/server", true);

for (String host : hosts) {

byte[] data = zkCli.getData("/server/" + host, null, null);

hlist.add(new String(data));

}

}


/**

* 模擬處理業務

* 向活動的服務端發送Socket消息

* @throws IOException

*/


public void handleBusiness(String IP, int port, String hostname) throws IOException {

Socket client = new Socket(IP, port);

try {

PrintWriter output = new PrintWriter(client.getOutputStream(), true);

/*

* 向服務端發送消息

*/

String words = "Client MSG->" + hostname + "," + IP;

output.println(words);

} finally {

client.close();

}


}


}





2018-07-14期 ZK編程案例-分布式協調【本人親自反復驗證通過分享】