1. 程式人生 > >2018-07-15期 ZK編程案例-分布式鎖【本人親自反復驗證通過分享】

2018-07-15期 ZK編程案例-分布式鎖【本人親自反復驗證通過分享】

bst path 動向 author throw app strong () vat

一、 實現原理:

1、編寫服務端Socket監聽程序,運行與某臺服務器上作為所有客戶端競爭資源

2、客戶端啟動後,都會自動向ZK註冊自己的身份信息,並將自己的身份ID根據統一的生成規則臨時寫入ZK

3、客戶端實時監聽ZK中自身註冊到ZK集群中身份ID變化,若發現自身ID為ZK集群最小的身份ID,則獲得鎖,然後向服務端Socket建立連接發送消息,其它客戶端處於監聽等待狀態

4、自己處理完自己業務後,即不發消息後,自己將自己舊的身份ID從ZK集群刪除,刪除成功後,重新註冊新的身份ID,目的是釋放鎖,讓其它客戶端獲得鎖。

5、若自己異常退出,則ZK集群會將該客戶端身份信息清除,防止客戶端身份ID不變一直獲得鎖,導致其它客戶端無法獲得鎖。

二、實現代碼


1、服務端監聽程序

package cn.itcast.zk.lock;


import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.ServerSocket;

import java.net.Socket;


/**

* 模擬客戶端競爭的服務資源,這裏開啟一個Socket監聽程序,並實時接收客戶端發送的消息。

* @author songjq

*

*/

public class TcpServerSocket {

public static void getConn() throws IOException {


ServerSocket server = new ServerSocket(9091);

try {

System.out.println("服務端已經在9091端口監聽......");

Socket client = server.accept();

try {

BufferedReader input =

new BufferedReader(new InputStreamReader(client.getInputStream()));

boolean flag = true;

while (flag) {

String line = input.readLine();

if (line.equals("exit")) {

flag = false;

} else {

System.out.println("客戶端說:" + line);

}

}

} finally {

client.close();

server.close();

TcpServerSocket.getConn();

}

} finally {

//server.close();

}

}

public static void main(String[] args) throws Exception {

TcpServerSocket.getConn();

}

}


2、客戶端訪問程序


package cn.itcast.zk.lock;


import java.io.IOException;

import java.io.PrintWriter;

import java.net.Socket;

import java.net.UnknownHostException;

import java.util.Collections;

import java.util.Date;

import java.util.List;


import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.Watcher.Event.EventType;


/**

* zk實現分布式鎖

* 實現原理:

* 1、編寫服務端Socket監聽程序,運行與某臺服務器上作為所有客戶端競爭資源

* 2、客戶端啟動後,都會自動向ZK註冊自己的身份信息,並將自己的身份ID根據統一的生成規則臨時寫入ZK

* 3、客戶端實時監聽ZK中自身註冊到ZK集群中身份ID變化,若發現自身ID為ZK集群最小的身份ID,則獲得鎖,然後向服務端Socket建立連接發送消息,其它客戶端處於監聽等待狀態

* 4、自己處理完自己業務後,即不發消息後,自己將自己舊的身份ID從ZK集群刪除,刪除成功後,重新註冊新的身份ID,目的是釋放鎖,讓其它客戶端獲得鎖。

* 5、若自己異常退出,則ZK集群會將該客戶端身份信息清除,防止客戶端身份ID不變一直獲得鎖,導致其它客戶端無法獲得鎖。

* @author songjq

*/

public class ZKDistributeLockService {


private final static String rootNode = "/hosts";


private static String myZnodePath = "";


private static ZooKeeper zkCli = null;


private static boolean havelock = false;


private static String hostname = "";


/**

* 獲取zk連接

*

* @throws IOException

*/

public static ZooKeeper getZkconnection() throws IOException {

return new ZooKeeper("192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181,", 2000, getWather());

}


/**

* 監聽器處理

*/

public static Watcher getWather() {

return new Watcher() {

@Override

public void process(WatchedEvent event) {

//如果沒有子節點變更,則終止下面程序執行

if(event.getType() != EventType.NodeChildrenChanged) return ;

// 獲取鎖

try {

havelock = gainLock();

if (havelock) {

System.out.println(new Date() + ":" + hostname + " get lock....");

// 處理業務

doSomethings(hostname);

// 處理完業務後刪除鎖,並重新註冊znode節點鎖

deleteLock();

// 重新註冊znode節點鎖

registerZnodeLock();

}

} catch (Exception e) {

// TODO: handle exception

e.printStackTrace();

}


}

};

}


/**

* 註冊鎖節點

*

* @throws KeeperException

* @throws InterruptedException

*/

public static void registerZnodeLock() throws KeeperException, InterruptedException {


if (zkCli.exists("/hosts", null) == null) {

zkCli.create("/hosts", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

myZnodePath = zkCli.create(rootNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// 註冊完鎖節點後睡眠500-1000ms後在獲取鎖

Thread.sleep((long) (Math.random() * 500 + 500));


}


/**

* 獲取鎖,判斷當前註冊myZnodePath是否和zk集群中最小的節點一致,若一致就獲得鎖

*

* @return

* @throws KeeperException

* @throws InterruptedException

*/

public static boolean gainLock() throws KeeperException, InterruptedException {


List<String> children = zkCli.getChildren(rootNode, true);


// 若當前集群內znode節點數為1,說明是自己剛註冊的節點,可以直接獲得鎖

/*if (children.size() == 1) {

return true;

}*/

if (children.size() > 0) {

// 對znode節點進行排序

Collections.sort(children);

String tmpZnode = children.get(0);

System.out.println("tmpZnode:"+tmpZnode+",myZnodePath:"+myZnodePath);

if (tmpZnode.equals(myZnodePath.substring(rootNode.length()+1))) {

return true;

}else {

return false;

}

}else {

return false;

}

}


/**

* 模擬處理業務

* 這裏即為獲得鎖後向同服務端建立socket連接,並向服務端發送消息。

* @param hostname

* @throws InterruptedException

* @throws IOException

* @throws UnknownHostException

*/

public static void doSomethings(String hostname) throws InterruptedException, UnknownHostException, IOException {

System.out.println("------------------>" + hostname + " Begin connect to Server and send msg to Server...");

// 模擬業務處理2-3秒

Thread.sleep((long) (Math.random() * 5000 + 1000));

/**

* 在這裏實現發送消息代碼

*/


Socket client = new Socket("127.0.0.1", 9091);

try {

PrintWriter output = new PrintWriter(client.getOutputStream(), true);

/*

* 向服務端發送消息

*/

String words = "Client MSG-> This is from " + hostname +" message.";

output.println(words);

} finally {

client.close();

}


System.out.println("------------------>" + hostname + " Msg Send completed!!!");

}


/**

* 刪除myZnodePath節點

*

* @throws InterruptedException

* @throws KeeperException

*/

public static void deleteLock() throws InterruptedException, KeeperException {


zkCli.delete(myZnodePath, -1);


}


/**

* 主類調用

* @param args

* @throws IOException

* @throws KeeperException

* @throws InterruptedException

*/

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {

hostname = args[0];

// 獲取zk連接

zkCli = ZKDistributeLockService.getZkconnection();

// 註冊znode節點鎖

ZKDistributeLockService.registerZnodeLock();

// 獲取鎖

havelock = ZKDistributeLockService.gainLock();

if (havelock) {

System.out.println(new Date() + ":" + hostname + " get lock....");

// 處理業務

ZKDistributeLockService.doSomethings(hostname);

// 處理完業務後刪除鎖,並重新註冊znode節點鎖

ZKDistributeLockService.deleteLock();

// 重新註冊znode節點鎖

ZKDistributeLockService.registerZnodeLock();

}

//主程序睡眠

Thread.sleep(Long.MAX_VALUE);

}

}


2018-07-15期 ZK編程案例-分布式鎖【本人親自反復驗證通過分享】