基於zk的分布式鎖(leader選舉)的實現
阿新 • • 發佈:2018-07-05
rest logger 接口 ets abstract 問題 .get single created 做了兩版實現,先是直接用zk的接口做的,後來又用curator做了個。主要是用來在集群環境中確定一個主節點來完成一些需要單獨執行的任務(我的需求是向es中補充數據)。
自己實現的版本,主要思路是搶占式創建一個EPHEMERAL型zk路徑,創建成功的就認為是leader,創建失敗則作為follower監聽此路徑,當路徑消失時再次搶占。創建ZookeeperElection時傳入一個監聽對象listener,在zk相關事件觸發時調用listener的相關方法進行處理(比如在ApplicationContext中發布一個事件),此版本主要邏輯如下:
@Deprecated abstract class ZookeeperOperations implements Watcher, AutoCloseable{ private final static Logger logger=Logger.getLogger(ZookeeperElection.class); private ZooKeeper zooKeeper; private String hosts; private String digest; private int timeoutInMs; public ZookeeperOperations( String hosts, int timeoutInMs) { this(hosts,null,timeoutInMs); } public ZookeeperOperations(String hosts, String digest, int timeoutInMs) { this.hosts = hosts; this.digest = digest; this.timeoutInMs = timeoutInMs; } protected void infoLog(String message){ if (logger.isInfoEnabled()){ logger.info(message); } } protected synchronized ZooKeeper zooKeeper(){ if (zooKeeper==null){ reconnect(); } return zooKeeper; } protected synchronized void reconnect(){ try{ if(zooKeeper!=null){ close(); } }catch (Exception e){} do{ infoLog("嘗試連接到Zookeeper : "+hosts); try { zooKeeper=new ZooKeeper(hosts,timeoutInMs,this); if (digest!=null){ zooKeeper.addAuthInfo("digest",digest.getBytes()); } return ; }catch (IOException e){ zooKeeper=null; logger.error("Zookeeper連接失敗,準備在20秒後重試",e); } try { Thread.sleep(20000l); } catch (InterruptedException e) {} }while (zooKeeper==null); } @Override public void close() throws InterruptedException { if (zooKeeper!=null){ zooKeeper.close(); } } private ArrayList<ACL> acls(){ if(digest==null){ return ZooDefs.Ids.OPEN_ACL_UNSAFE; } try { Id id1 = new Id("digest", DigestAuthenticationProvider.generateDigest(digest)); return new ArrayList<>(Collections.singletonList( new ACL(ZooDefs.Perms.ALL, id1))); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } return ZooDefs.Ids.OPEN_ACL_UNSAFE; } /** * 嘗試遞歸的創建持久目錄 * @param path * @throws KeeperException * @throws InterruptedException */ protected void createPathRecursive(ZooKeeper zooKeeper,String path) throws KeeperException, InterruptedException { Stack<String> tmp=new Stack<>(); if(path.charAt(0)!=‘/‘){ path="/"+path; } boolean needCheck=true; infoLog("準備檢查目錄 "+path); do{ String 
parent=path.substring(0,path.lastIndexOf(‘/‘)); if(zooKeeper.exists(path,false)==null){ if ((!needCheck)|| parent.equals("")|| zooKeeper.exists(parent,false)!=null){ try { infoLog("目錄 "+path+" 不存在,準備創建"); zooKeeper.create(path,new byte[0], acls(), CreateMode.PERSISTENT); infoLog("目錄 "+path+" 創建成功"); } catch (KeeperException e) { if(!(e instanceof KeeperException.NodeExistsException)){ throw e; }else { infoLog("目錄 "+path+" 已被其它服務創建成功"); } } needCheck=false; if(tmp.size()>0){ path=tmp.pop(); }else { break; } }else { tmp.push(path); path=parent; } }else if(tmp.size()>0){ infoLog("目錄 "+ path +" 已存在"); path=tmp.pop(); }else { infoLog("目錄 "+ path +" 已存在"); break; } }while (true); } } @Deprecated public class ZookeeperElection extends ZookeeperOperations implements Runnable { private String root; private ZkElectionListener listener; private CountDownLatch latch=new CountDownLatch(1); private final static Logger logger=Logger.getLogger(ZookeeperElection.class); public ZookeeperElection(String hosts, String root, int timeoutInMs, ZkElectionListener listener) { super(hosts,timeoutInMs); this.root=root; this.listener=listener; } public ZookeeperElection(String hosts, String root, String digest,int timeoutInMs, ZkElectionListener listener) { super(hosts,digest,timeoutInMs); this.root=root; this.listener=listener; } private String leaderPath(){ return root+"/leader"; } private String nodePath(){ return root+"/node"; } @Override public void run() { ZooKeeper zooKeeper=zooKeeper(); try { String zNode=leaderPath(); //循環嘗試獲取 do{ Stat stat=zooKeeper.exists(root,false); if (stat==null){ createPathRecursive(zooKeeper,root); } boolean wait=false; if(!findLeader(zNode)){ boolean success=tryLeader(zNode); if(success){ infoLog(Thread.currentThread().getName()+" 已選舉作為leader節點"); listener.onLeader(); wait=true; } }else { infoLog(Thread.currentThread().getName()+" 已選舉作為follower節點"); // tryFollow(zNode,stateListener); listener.onFollower(); wait=true; } if (wait){ if(latch.getCount()==0){ 
latch=new CountDownLatch(1); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } }else{ infoLog(Thread.currentThread().getName()+" 選舉失敗,3秒後重拾"); Thread.sleep(3000); } }while (true); } catch (KeeperException e) { logger.error("連接zookeeper失敗",e); } catch (InterruptedException e) { logger.error(e); } } /** * 生成當前服務器標識 * @return */ protected byte[] serverTag(){ try { return InetAddress.getLocalHost().getAddress(); } catch (Exception e) {} return new byte[0]; } private boolean findLeader(String zNode){ try { byte[] data=zooKeeper().getData(zNode,true,null); return data!=null; } catch (KeeperException e) { if(e instanceof KeeperException.NoNodeException){ return false; } } catch (Exception e) { e.printStackTrace(); } return false; } private boolean tryLeader(String zNode) throws KeeperException { ZooKeeper zooKeeper=zooKeeper(); String leader=null; try { leader=zooKeeper.create(zNode,serverTag(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); zooKeeper.exists(zNode,true); if(logger.isDebugEnabled()){ logger.debug("leader path : "+leader); } } catch (KeeperException e) { if (e instanceof KeeperException.NodeExistsException){ logger.error(Thread.currentThread().getName()+" 參與選舉Leader競爭失敗,已有其它node被選舉為Leader"); }else { throw e; } } catch (Exception e) { logger.error(Thread.currentThread().getName()+" 參與選舉Leader失敗",e); } if(leader!=null){ return true; } return false; } private boolean tryFollow(String zNode){ ZooKeeper zooKeeper=zooKeeper(); try { String node=zooKeeper.create(nodePath(),serverTag(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); infoLog(node+" 節點已註冊"); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public void process(WatchedEvent event) { Event.EventType type=event.getType(); switch (type){ case None:{ //連接事件處理 choseState(event.getState()); } break; case NodeDataChanged:{ infoLog(event.getPath()+" 數據變動"); } 
break; case NodeCreated: { infoLog(event.getPath()+" 數據新建"); } break; case NodeChildrenChanged:{ infoLog(event.getPath()+" 子數據變動"); } break; case NodeDeleted:{ infoLog(event.getPath()+" 數據刪除"); if(event.getPath().equals(leaderPath())){ listener.onLeaderOff(); latch.countDown(); } } break; } } private void choseState(Event.KeeperState state){ switch (state){ case AuthFailed:{ logger.error("Zookeeper連接驗證失敗"); listener.onDisconnect(); } break; case SyncConnected:{ infoLog("Zookeeper節點連接成功"); listener.onConnect(); } break; case Disconnected:{ infoLog("Zookeeper連接已斷開"); listener.onDisconnect(); } break; case Expired:{ infoLog("Zookeeper會話超時斷開,正在準備重新連接"); listener.onDisconnect(); reconnect(); latch.countDown(); } } } }
此方法逼格不高,所以後來又用curator的分布式同步鎖機制做了另外的實現。
雖然curator實現了一個選舉,但他的實現我覺得存在一點問題。比如當一個節點被選舉為leader時,並沒有對自身路徑進行監聽。當你主動刪除了zk上的leader節點路徑時(比如版本平滑升級時),follower節點已經進行下一輪選舉並選出新的leader節點了,但之前的leader節點依然認為自己是leader節點,還會繼續執行leader節點的操作。所以用了他的鎖機制自己又弄了下。
大概如下:
/** * 使用Curator做的leader選舉,類似LeaderSelector機制,但做了自我lockpath路徑監聽。 * <br></>LeaderSelector的缺點在於它沒有監聽自身lockpath的路徑變更。 * 因此當手工刪除lockpath時,它不會發現其它follower已經升級為leader,還會認為自己是leader。 * Created by lewis on 18-6-21. */ public class ZkCuratorElection extends ZkCurator implements AutoCloseable{ private final static Logger logger=Logger.getLogger(ZkCuratorElection.class); public static ZkCuratorElection create(String connectString, int retryIntervalInMs, int retryTimes){ return new ZkCuratorElection(connectString,retryIntervalInMs,retryTimes); } protected ZkCuratorElection(String connectString, int retryIntervalInMs, int retryTimes) { super(connectString, retryIntervalInMs, retryTimes); } public LeaderTask tryLeader(@NotNull String path, String id){ return new LeaderTask(this,(Void v)->{ start(); PathObserverMutex mutex=new PathObserverMutex(client,path); mutex.setId(id); return mutex; }); } @Override public void close() throws Exception { super.close(); } private AtomicBoolean hasRun=new AtomicBoolean(false); /** * 用於接受參數 */ public static class LeaderTask { private volatile boolean loopInLeaderShip=false; private volatile boolean autoQueueOnLost=false; private ZkCuratorElection election; private Function<Void,PathObserverMutex> prepareStart; /** * 獲得鎖後循環調用任務 * @return */ public LeaderTask loopInLeaderShip(){ this.loopInLeaderShip=true; return this; } /** * 失去鎖後重新加入請求鎖的隊列 * @return */ public LeaderTask autoQueueAfterLost(){ this.autoQueueOnLost=true; return this; } private LeaderTask(ZkCuratorElection election, Function<Void,PathObserverMutex> prepareStart) { this.election = election; this.prepareStart = prepareStart; } /** * 設置要相應的任務 * @param function * @return */ public ClosableLockerThread accept(Runnable function){ return new ClosableLockerThread(){ public void stopNextRun(){ loopInLeaderShip=false; autoQueueOnLost=false; } private PathObserverMutex mutex; private Function<Void,Void> triggerStart; private Function<Void,Void> triggerGet; private Function<Void,Void> 
triggerLost; private Function<Void,Void> triggerRelease; private Function<Void,Void> triggerExit; @Override public void run() { try { if(!election.hasRun.compareAndSet(false,true)){ throw new RuntimeException("任務不能多次啟動"); } this.mutex=prepareStart.apply(null); mutex.setTriggerGet(triggerGet); mutex.setTriggerLost(triggerLost); mutex.setTriggerRelease(triggerRelease); if(triggerStart!=null){ triggerStart.apply(null); } do{ try{ mutex.acquire(); if (logger.isDebugEnabled()){ logger.debug(Thread.currentThread().getName()+" 獲得zk鎖,開始執行分配的任務"); } do{ if(mutex.holdingLock()){ function.run(); } }while (mutex.holdingLock()&&loopInLeaderShip); if (logger.isDebugEnabled()){ logger.debug(Thread.currentThread().getName()+" 失去zk鎖,分配的任務不再執行"); } }finally { mutex.release(); } }while (autoQueueOnLost); }catch (Exception e){ logger.warn("job 運行出現錯誤,即將退出執行",e); }finally { if (logger.isDebugEnabled()){ logger.debug(Thread.currentThread().getName()+" 任務已完全退出,準備關閉zk連接"); } try { election.close(); } catch (Exception e) {} if(triggerExit!=null){ triggerExit.apply(null); } } } public boolean holdingLock() { return mutex!=null&&mutex.holdingLock(); } @Override public ClosableLockerThread onStart(Function<Void, Void> trigger) { this.triggerStart=trigger; return this; } @Override public ClosableLockerThread onGetLock(Function<Void, Void> trigger) { this.triggerGet=trigger; return this; } @Override public ClosableLockerThread onLostLock(Function<Void, Void> trigger) { this.triggerLost=trigger; return this; } @Override public ClosableLockerThread onReleaseLock(Function<Void, Void> trigger) { this.triggerRelease=trigger; return this; } @Override public ClosableLockerThread onExit(Function<Void, Void> trigger) { triggerExit=trigger; return this; } }; } } public static abstract class ClosableLockerThread extends Thread { /** * 下一輪任務將不再執行,退出線程 */ public abstract void stopNextRun(); /** * 當前線程是否拿到鎖了 * @return */ public abstract boolean holdingLock(); /** * 當線程準備完畢,開始嘗試競爭鎖前調用 * @param trigger * 
@return */ public abstract ClosableLockerThread onStart(Function<Void,Void> trigger); /** * 得到鎖時會回調 * @param trigger * @return */ public abstract ClosableLockerThread onGetLock(Function<Void,Void> trigger); /** * 失去鎖時會回調 * @param trigger * @return */ public abstract ClosableLockerThread onLostLock(Function<Void,Void> trigger); /** * * @param trigger * @return */ public abstract ClosableLockerThread onReleaseLock(Function<Void,Void> trigger); /** * 線程退出時會回調 * @param trigger * @return */ public abstract ClosableLockerThread onExit(Function<Void,Void> trigger); } /** * 帶路徑監控的鎖 */ public static class PathObserverMutex extends InterProcessMutex{ private CuratorFramework client; private volatile boolean holdingLock=false; private String id; private Function<Void,Void> triggerGet; private Function<Void,Void> triggerRelease; private Function<Void,Void> triggerLost; public PathObserverMutex(CuratorFramework client, String path) { super(client, path); this.client=client; } public PathObserverMutex(CuratorFramework client, String path, LockInternalsDriver driver) { super(client, path, driver); this.client=client; } @Override public void acquire() throws Exception { super.acquire(); holdingLock=true; this.client.checkExists().usingWatcher(new PathDeleteWatcher()).inBackground().forPath(getLockPath()); if(triggerGet!=null){ triggerGet.apply(null); } } @Override public boolean acquire(long time, TimeUnit unit) throws Exception { holdingLock=super.acquire(time, unit); if (holdingLock){ this.client.checkExists().usingWatcher(new PathDeleteWatcher()).inBackground().forPath(getLockPath()); if(triggerGet!=null){ triggerGet.apply(null); } } return holdingLock; } @Override protected byte[] getLockNodeBytes() { return (id!=null)?id.getBytes(Charset.forName("UTF-8")):super.getLockNodeBytes(); } public boolean holdingLock(){ return holdingLock; } public String getId() { return id; } public void setId(String id) { this.id = id; } public void setTriggerGet(Function<Void, Void> triggerGet) { 
this.triggerGet = triggerGet; } public void setTriggerLost(Function<Void, Void> triggerLost) { this.triggerLost = triggerLost; } public void setTriggerRelease(Function<Void, Void> triggerRelease) { this.triggerRelease = triggerRelease; } @Override public void release() throws Exception { super.release(); if(this.triggerRelease!=null){ triggerRelease.apply(null); } } private class PathDeleteWatcher implements Watcher{ private void rebuildWatcher(){ //對於其它事件相應,則因為監聽已失效,需要補一次監聽 PathDeleteWatcher watcher=new PathDeleteWatcher(); try { PathObserverMutex.this.client .checkExists() .usingWatcher(watcher) .inBackground() .forPath(getLockPath()); } catch (Exception e) { //如果監聽添加失敗,則認為已失去鎖 PathObserverMutex.this.holdingLock=false; logger.warn("構造監聽事件異常",e); } } private void resetLost(){ boolean beforeHold=PathObserverMutex.this.holdingLock; PathObserverMutex.this.holdingLock=false; if(beforeHold&&PathObserverMutex.this.triggerLost!=null){ PathObserverMutex.this.triggerLost.apply(null); } } @Override public void process(WatchedEvent event) { if (logger.isDebugEnabled()){ logger.debug("路徑事件觸發 "+event.toString()); } switch (event.getState()){ case SyncConnected: tellType(event.getType()); break; default: resetLost(); break; } } private void tellType(Event.EventType type){ switch (type){ case NodeDeleted:{ //如果是節點已被刪除,則認為已失去鎖 resetLost(); } break; default: rebuildWatcher(); } }
基於zk的分布式鎖(leader選舉)的實現