1. 程式人生 > >zookeeper 的斷線重連實現

zookeeper 的斷線重連實現

ZooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要元件。

我們可以使用zookeeper做程式的健康監測(EPHEMERAL 臨時節點)、公共配置檔案、叢集管理(leader 選舉)等等。 

但是zookeeper並沒有提供斷線重連的功能,必須我們手動實現,這裡使用 Curator來實現了zookeeper的斷線重連功能,程式碼如下:

import java.io.UnsupportedEncodingException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;


/**
 * @ClassName: ZookeeperExcutor
 * @Description: zookeeper連線處理器
 */
public class ZookeeperExcutor {
    
    private CuratorFramework client;
    
    public ZookeeperExcutor(String zklist,int sessionTimeout,int connectTimeout){
        client = CuratorFrameworkFactory.builder()
                .connectString(zklist).sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectTimeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
    }
    
    public CuratorFramework getClient() {
        return client;
    }

    /**
     * @Title: createNodeAddListener
     * @Description: 新增node節點
     * @param nodePath
     * @param nodeData 設定檔案
     * @return void    返回型別
     */
    public String createNode(String nodePath,String nodeData){
        if(client!=null){
            try {
                String nodeName=client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(nodePath, nodeData.getBytes("UTF-8"));
                return nodeName;
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return null;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
        return null;
    }
    /**
     * @Title: getListener
     * @Description: 為節點新增 connectState 監聽器,實現斷線重連,然後新增上節點
     * @param nodePath    節點路徑
     * @param nodeData 節點資料
     * @return void    返回型別
     */
    public ConnectionStateListener getListener(final String nodePath,final String nodeData){
        if(null!=client){
            ConnectionStateListener connectListener = new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    if (connectionState == ConnectionState.LOST) {
                        while (true) {
                            try {
                                //手動重連
                                boolean flag=curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut();
                                if (flag){
                                    //重新新增節點
                                    clearListener();
                                    createNode(nodePath, nodeData);
                                    client.getConnectionStateListenable().addListener(getListener(nodePath, nodeData));
                                    break;
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }else if(connectionState==ConnectionState.RECONNECTED){
                        //重新連線成功
                    }else if(connectionState==ConnectionState.SUSPENDED){
                        //自動重連,自動新建 schedular的臨時節點
                    }
                }
                
            };
            return connectListener;
        }
        return null;
    }
    
    public void clearListener(){
        ListenerContainer<ConnectionStateListener> list=(ListenerContainer<ConnectionStateListener>) client.getConnectionStateListenable();
        list.clear();
    }
    
    public void addListener(String nodePath,String nodeData){
        client.getConnectionStateListenable().addListener(getListener(nodePath, nodeData));
    }
    
    public static void main(String[] args) {
        ZookeeperExcutor zke=new ZookeeperExcutor("127.0.0.1:2181",10000, 10000);
        String nodeName=zke.createNode("/Test", "test");
        if(null!=nodeName){
            zke.addListener("/Test", "test");
        }
    }
}


Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量,原生的zookeeper實現起來稍微麻煩一點,下面是Curator的maven配置:
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-framework</artifactId>
	<version>2.4.2</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-client</artifactId>
	<version>2.4.2</version>
</dependency>
<dependency>
	<groupId>org.apache.curator</groupId>
	<artifactId>curator-recipes</artifactId>
	<version>2.4.2</version>
</dependency>