1. 程式人生 > >Springboot2(29)整合zookeeper的增刪改查、節點監聽、分散式讀寫鎖、分散式計數器

Springboot2(29)整合zookeeper的增刪改查、節點監聽、分散式讀寫鎖、分散式計數器

原始碼地址

springboot2教程系列

實現zookeeper節點的增刪改查、節點監聽、分散式讀寫鎖、分散式計數器

新增依賴

   <properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<zookeeper.version>3.4.8</zookeeper.version>
		<curator.version>2.11.1</curator.version>
	</properties>
	
	<dependencies>

		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>${zookeeper.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-api</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>${curator.version}</version>
		</dependency>

    </dependencies>

ZkClient(curator)

這裡使用的是curator,curator是對zookeeper的簡單封裝,提供了一些整合的方法,或者是提供了更優雅的api

/**
 * zookeeper客戶端
 */
@Data
@Slf4j
public class ZkClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private CuratorFramework client;
    public TreeCache cache;
    private
ZookeeperProperties zookeeperProperties; public ZkClient(ZookeeperProperties zookeeperProperties){ this.zookeeperProperties = zookeeperProperties; } /** * 初始化zookeeper客戶端 */ public void init() { try{ RetryPolicy retryPolicy = new ExponentialBackoffRetry(
zookeeperProperties.getBaseSleepTimeMs(), zookeeperProperties.getMaxRetries()); Builder builder = CuratorFrameworkFactory.builder() .connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy) .sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs()) .connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs()) .namespace( zookeeperProperties.getNamespace()); if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){ builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8")); builder.aclProvider(new ACLProvider() { @Override public List<ACL> getDefaultAcl() { return ZooDefs.Ids.CREATOR_ALL_ACL; } @Override public List<ACL> getAclForPath(final String path) { return ZooDefs.Ids.CREATOR_ALL_ACL; } }); } client = builder.build(); client.start(); initLocalCache("/test"); // addConnectionStateListener(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { //連線丟失 logger.info("lost session with zookeeper"); } else if (state == ConnectionState.CONNECTED) { //連線新建 logger.info("connected with zookeeper"); } else if (state == ConnectionState.RECONNECTED) { logger.info("reconnected with zookeeper"); } } }); }catch(Exception e){ e.printStackTrace(); } } /** * 初始化本地快取 * @param watchRootPath * @throws Exception */ private void initLocalCache(String watchRootPath) throws Exception { cache = new TreeCache(client, watchRootPath); TreeCacheListener listener = (client1, event) ->{ log.info("event:" + event.getType() + " |path:" + (null != event.getData() ? event.getData().getPath() : null)); if(event.getData()!=null && event.getData().getData()!=null){ log.info("發生變化的節點內容為:" + new String(event.getData().getData())); } // client1.getData(). }; cache.getListenable().addListener(listener); cache.start(); } public void stop() { client.close(); } public CuratorFramework getClient() { return client; } /** * 建立節點 * @param mode 節點型別 * 1、PERSISTENT 持久化目錄節點,儲存的資料不會丟失。 * 2、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,儲存的資料不會丟失 * 3、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除 *4、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名。 * @param path 節點名稱 * @param nodeData 節點資料 */ public void createNode(CreateMode mode, String path , String nodeData) { try { //使用creatingParentContainersIfNeeded()之後Curator能夠自動遞迴建立所有所需的父節點 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8")); } catch (Exception e) { logger.error("註冊出錯", e); } } /** * 建立節點 * @param mode 節點型別 * 1、PERSISTENT 持久化目錄節點,儲存的資料不會丟失。 * 2、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,儲存的資料不會丟失 * 3、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除 * 4、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名。 * @param path 節點名稱 */ public void createNode(CreateMode mode,String path ) { try { //使用creatingParentContainersIfNeeded()之後Curator能夠自動遞迴建立所有所需的父節點 client.create().creatingParentsIfNeeded().withMode(mode).forPath(path); } catch (Exception e) { logger.error("註冊出錯", e); } } /** * 刪除節點資料 * * @param path */ public void deleteNode(final String path) { try { deleteNode(path,true); } catch (Exception ex) { log.error("{}",ex); } } /** * 刪除節點資料 * @param path * @param deleteChildre 是否刪除子節點 */ public void deleteNode(final String path,Boolean deleteChildre){ try { if(deleteChildre){ //guaranteed()刪除一個節點,強制保證刪除, // 只要客戶端會話有效,那麼Curator會在後臺持續進行刪除操作,直到刪除節點成功 client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); }else{ client.delete().guaranteed().forPath(path); } } catch (Exception e) { e.printStackTrace(); } } /** * 設定指定節點的資料 * @param path * @param datas */ public void setNodeData(String path, byte[] datas){ try { client.setData().forPath(path, datas); }catch (Exception ex) { log.error("{}",ex); } } /** * 獲取指定節點的資料 * @param path * @return */ public byte[] getNodeData(String path){ Byte[] bytes = null; try { if(cache != null){ ChildData data = cache.getCurrentData(path); if(data != null){ return data.getData(); } } client.getData().forPath(path); return client.getData().forPath(path); }catch (Exception ex) { log.error("{}",ex); } return null; } /** * 獲取資料時先同步 * @param path * @return */ public byte[] synNodeData(String path){ client.sync(); return getNodeData( path); } /** * 判斷路徑是否存在 * * @param path * @return */ public boolean isExistNode(final String path) { client.sync(); try { return null != client.checkExists().forPath(path); } catch (Exception ex) { return false; } } /** * 獲取節點的子節點 * @param path * @return */ public List<String> getChildren(String path) { List<String> childrenList = new ArrayList<>(); try { childrenList = client.getChildren().forPath(path); } catch (Exception e) { logger.error("獲取子節點出錯", e); } return childrenList; } /** * 隨機讀取一個path子路徑, "/"為根節點對應該namespace * 先從cache中讀取,如果沒有,再從zookeeper中查詢 * @param path * @return * @throws Exception */ public String getRandomData(String path) { try{ Map<String,ChildData> cacheMap = cache.getCurrentChildren(path); if(cacheMap != null && cacheMap.size() > 0) { logger.debug("get random value from cache,path="+path); Collection<ChildData> values = cacheMap.values(); List<ChildData> list = new ArrayList<>(values); Random rand = new Random(); byte[] b = list.get(rand.nextInt(list.size())).getData(); return new String(b,"utf-8"); } if(isExistNode(path)) { logger.debug("path [{}] is not exists,return null",path); return null; } else { logger.debug("read random from zookeeper,path="+path); List<String> list = client.getChildren().forPath(path); if(list == null || list.size() == 0) { logger.debug("path [{}] has no children return null",path); return null; } Random rand = new Random(); String child = list.get(rand.nextInt(list.size())); path = path + "/" + child; byte[] b = client.getData().forPath(path); String value = new String(b,"utf-8"); return value; } }catch(Exception e){ log.error("{}",e); } return null; } /** * 可重入共享鎖 -- Shared Reentrant Lock * @param lockPath * @param time * @param dealWork 獲取 * @return */ public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){ InterProcessMutex lock = new InterProcessMutex(client, lockPath); try { if (!lock.acquire(time, TimeUnit.SECONDS)) { log.error("get lock fail:{}", " could not acquire the lock"); return null; } log.debug("{} get the lock",lockPath); Object b = dealWork.deal(); return b; }catch(Exception e){ log.error("{}", e); }finally{ try { lock.release(); } catch (Exception e) { //log.error("{}",e); } } return null; } /** * 獲取讀寫鎖 * @param path * @return */ public InterProcessReadWriteLock getReadWriteLock(String path){ InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path); return readWriteLock; } /** * 在註冊監聽器的時候,如果傳入此引數,當事件觸發時,邏輯由執行緒池處理 */ ExecutorService pool = Executors.newFixedThreadPool(2); /** * 監聽資料節點的變化情況 * @param watchPath * @param listener */ public void watchPath(String watchPath,TreeCacheListener listener){ // NodeCache nodeCache = new NodeCache(client, watchPath, false); TreeCache cache = new TreeCache(client, watchPath); cache.getListenable().addListener(listener,pool); try { cache.start(); } catch (Exception e) { e.printStackTrace(); } } }

配置檔案

zookeeper.enabled: true
#zookeeper.server: 47.106.106.53:9036,47.106.106.53:9037,47.106.106.53:9038
zookeeper.server: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181
zookeeper.namespace: demo
zookeeper.digest: rt:rt                     #zkCli.sh acl 命令 addauth digest mpush
zookeeper.sessionTimeoutMs: 1000            #會話超時時間,單位為毫秒,預設60000ms,連線斷開後,其它客戶端還能請到臨時節點的時間
zookeeper.connectionTimeoutMs: 6000         #連線建立超時時間,單位為毫秒
zookeeper.maxRetries: 3                     #最大重試次數
zookeeper.baseSleepTimeMs: 1000             #初始sleep時間 ,毫秒

程式會建立節點demo為namespace,之後所有增刪改查的操作都這節點下完成

Controller層方法

@Api(tags="zookeeper基本操作")
@RequestMapping("/zk")
@RestController
@Slf4j
public class ZookeeperController {

    @Autowired
    private ZkClient zkClient;

    @Autowired
    private ZkClient zkClientTest;

    /**
     * 建立節點
     * @param type
     * @param znode
     * @return
     */
    @ApiOperation(value = "建立節點",notes = "在名稱空間下建立節點")
    @ApiImplicitParams({
            @ApiImplicitParam(name ="type",value = "節點型別:<br> 0 持久化節點<br> 1 臨時節點<br>  2 持久順序節點<br> 3 臨時順序節點",
                    allowableValues = "0,1,2,3",defaultValue="3",paramType = "path",required = true,dataType = "Long"),
            @ApiImplicitParam(name ="znode",value = "節點名稱",paramType = "path",required = true,dataType = "String"),
            @ApiImplicitParam(name ="nodeData",value = "節點資料",paramType = "body",dataType = "String")
    })
    @RequestMapping(value = "/create/{type}/{znode}",method=RequestMethod.POST)
    private String create(@PathVariable Integer type,@PathVariable String znode,@RequestBody String nodeData){
        znode = "/" + znode;
        try {
            zkClient.createNode(CreateMode.fromFlag(type),znode,nodeData);
        } catch (Kee