Zookeeper機制詳解與實現
為什麼要有Zookeeper?
電視裡經常會有一些狗血的設定,隊長和副隊長一起出去執行任務,執行完任務後副隊長回來報到了,但是隊長可能因為天氣原因導致航班延期了,暫時回不來,這個時候副隊長左等右等還等不到隊長回來,而且副隊長擔心隊長如果出事了,下面的隊員沒有人約束,大家可能就會鬆懈下來,副隊長等了一個星期後,自己當隊長了。
結果過了兩個星期後,隊長回來了,這個時候就產生了連個隊長,這個時候組員就麻煩了,他們不知道聽誰的,假設也有可能寫一份報告時,都給兩位隊長。
這裡面如果還有一個支援保障小隊,發現隊長還沒回來啊,那麼這個支援保障小隊就去查隊長是不是遇到什麼事了,結果一查,隊長沒出事,平安大吉,那麼支援保障小隊就告訴副隊長,你不用擔心了,他沒事,我們會告知委員會,委員會會選舉出一個隊長,選到你了會正式通知你的。
什麼是Zookeeper?
Zookeeper是一個Apache下的開源分散式協調框架,就是為分散式應用提供協調服務的。
Zookeeper從設計模式的角度來理解,是一個基於觀察者模式設計的分散式服務管理框架,負責儲存和管理都關心的資料(狀態),然後接受觀察者的註冊,一旦這些資料的狀態發生了變化,Zookeeper就通知已經在Zookeeper上註冊的那些觀察者這做出相應的反應,從而實現叢集中類似Master/Slave管理模式。
特點
1、Zookeeper是由一個Leader和多個Follower組成的叢集。
2、Leader負責進行投票發起和發起決議,更新系統狀態
3、Follower用於接收客戶端的請求並返回結果,在選舉Leader過程中參與投票。
4、Follower負責讀請求,寫請求則要提交給Leader決定。
5、叢集中只要有半數以上節點存活,Zookeeper叢集
6、更新請求順序執行,來自同一個客戶端的更新請求時按照發送順序依次執行。
7、資料更新的原子性,要麼更新資料成功,要麼失敗。
8、實時性,在一定時間的範圍內客戶端可以讀到最新的資料
選舉機制(Leader)
配置檔案中沒有指定Leader和Follower的話,Zookeeper會內部進行選舉。
假設:
伺服器1、2、3。
伺服器1啟動,但是隻有自己一個人,選舉狀態為Looking。
伺服器2啟動,首先和已經啟動的伺服器1進行通訊,交換選舉資訊,但是伺服器1和2都沒有歷史資料,那麼他們都投票給了自己,但是id值大的伺服器2勝出。
伺服器3啟動,和伺服器1、2進行通訊,發現已經選舉出了2作為Leader,那麼就成為Follower了,但是如果伺服器2發生網路問題了,一直沒有傳送出去互動資訊,到把訊息傳送出去的時候,發現別人的投票已經處於第二輪,而它發出去的是第一輪,這裡就涉及到邏輯時鐘(時間同步)了,所以則伺服器3成為Leader。
應用場景
Zookeeper提供的服務資料釋出訂閱、負載均衡、命中服務、分散式協調/通知、分散式鎖、分散式佇列等功能。
Zookeeper就是一個監控、通知、輔助(其他).
如:分散式鎖,其他使用者在操作,通知其他伺服器有人在操作,等使用者操作完了就告訴其他伺服器,他操作完了。
案例
配置檔案
tickTime:伺服器和伺服器之間,或伺服器和客戶端之間傳送心跳訊息的時間間隔,session的最小超時時間為2*tickTime
initLimit:Leader和Follower的初始連線時心跳數(時間=initLimit * tickTime);Follower在啟動的時候,會從Leader同步所有最新的資料,以確定自己對外服務的狀態;投票選舉新Leader的初始化時間;
syncLimit:Leader和Follower的之間的最大等待響應時間(syncLimit * tickTime),超過這個時間,Leader就可以認為這個Follower掛了。
dataDir:資料檔案存放路徑
clientPort:客戶端連線ZK服務端的埠
資料結構
Zookeeper的資料結構是一個樹狀化的結構,每個節點稱為Znode,每個ZNode預設儲存1MB的資料可以通過路徑進行唯一標識。(/apps/app1)
節點型別
短暫(ephemeral):客戶端和伺服器端斷開後,這種型別的節點就會刪除。
短暫序號:建立這種節點的時候,會自動的加上一個遞增序號。
持久(persistent):客戶端和伺服器端斷開連線,這種型別的節點不會被刪除。
持久序號:建立這種節點的時候,會自動的加上一個遞增序號。
常用命令
應用程式命令
啟動伺服器端:/bin/zkServer.sh start
關閉伺服器端:/bin/zkServer.sh stop
啟動客戶端:bin/zkCli.sh
客戶端命令
退出客戶端:quit
檢視節點下的子節點:ls /
節點下的子節點變化監聽:ls /test watch
檢視節點下的子節點詳細資訊:ls2 /
建立普通節點:create /test “data”
建立普通節點(序號):create -s /test “data”
建立短暫節點:create -e /testE “data”
建立短暫節點(序號):create -e -s /testES "data"
修改節點資料值:set /test “new data”
獲取節點值:get /test
節點值變化監聽(監聽一次,註冊一次):get /test watch
刪除節點:delete /test
遞迴刪除節點:rmr /test
檢視節點詳情:stat /test
czxid:引起這個znode建立的zxid,建立節點的事務zxid
ctime:znode被建立的毫秒數(從1970開始)
mzxid:znode最後更新的zxid
mtime:znode最後修改的毫秒數(從1970開始)
pZxid:znode最後更新的子節點zxid
cversion:znode子節點的變化號,子節點修改次數
dataversion:znode資料變化號
aclVersion:znode訪問控制列表的變化號
ephemeralOwner:如果是臨時節點,這個znode擁有者的sesion id,如果不是臨時節點就為0
dataLength:znod的資料長度
numChildren:znode子節點數量
Java – API(模擬Zookeeper監控)
ZkClient
public class ZkClient { //連線地址 private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
//會話超時時間 private int sessionTimeout = 2000;
private ZooKeeper zooKeeper;
/** * 初始化 * @author AimSpeed * @Title init * @throws IOException void * @date 2018年12月8日 下午12:29:13 */ public void init() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("伺服器下線 - Listener:" + event.getType() + " --- " + event.getPath()); //持續監聽伺服器 try { zooKeeper.getChildren("/servers", true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }); }
/** * 監聽伺服器 * @author AimSpeed * @Title watcherServer * @throws KeeperException * @throws InterruptedException void * @date 2018年12月8日 下午12:30:54 */ public void watcherServer() throws KeeperException, InterruptedException { List<String> children = zooKeeper.getChildren("/servers", true); for (String string : children) { System.out.println("Registered Server:" + string); } Thread.sleep(Long.MAX_VALUE); }
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { ZkClient zkClient = new ZkClient(); zkClient.init(); zkClient.watcherServer(); } } |
ZkServer
public class ZkServer {
//連線 private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
//會話超時時間 private int sessionTimeout = 2000;
//客戶端 private ZooKeeper zooKeeper;
/** * 初始化 * @author AimSpeed * @Title init * @throws IOException void * @date 2018年12月8日 下午12:21:22 */ public void init() throws IOException { zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("Listene:" + event.getType() + "---" + event.getPath()); } }); }
/** * 註冊伺服器 * @author AimSpeed * @throws InterruptedException * @throws KeeperException * @Title register void * @date 2018年12月8日 下午12:22:33 */ public void register(String num) throws KeeperException, InterruptedException { String create = zooKeeper.create("/servers/server", ("server" + num).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(create); }
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { ZkServer zkServer = new ZkServer(); zkServer.init(); zkServer.register(args[0]); Thread.sleep(Long.MAX_VALUE); }
} |