1. 程式人生 > >curator原始碼(一) 初始化、啟動和關閉。

curator原始碼(一) 初始化、啟動和關閉。

Curator框架是zookeeper客戶端框架,官網有句話說的很簡潔:curator對於zookeeper就像Guava對於java。
重複策略,例項化,眾多實用的食譜選單(分散式鎖,計數器,佇列,柵欄,訊號量,路徑快取)。

初始化

1.直接呼叫CuratorFrameworkFactory的newClient方法

   /**
     * 建立客戶端
     * @param connectString       zk地址
     * @param sessionTimeoutMs    session超時
     * @param connectionTimeoutMs 路徑超時
     * @param
retryPolicy 重複策略 * @return client */
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { return builder(). connectString(connectString). sessionTimeoutMs(sessionTimeoutMs). connectionTimeoutMs(connectionTimeoutMs). retryPolicy(retryPolicy). build(); }
    /**
     * 返回一個用來建立CuratorFramework新的builder
     * @return new builder
     */
    public static Builder builder()
    {
        return new Builder();
    }

返回的Builder是CuratorFrameworkFactory的內部類,主要用於流式的建立CuratorFramework,裡面包含所需引數

        private EnsembleProvider ensembleProvider;
        private
int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS; private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS; private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS; private RetryPolicy retryPolicy; private ThreadFactory threadFactory = null; private String namespace; private List<AuthInfo> authInfos = null; private byte[] defaultData = LOCAL_ADDRESS; private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER; private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY; private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; private boolean useContainerParentsIfAvailable = true;

ensembleProvider(配置提供者) 可以通過在Builder裡的以下2個方法構造。設定伺服器的連結地址,
格式: host:port,host2:port2…..
主要由此提供連結地址,供後期zookeeper裡使用。

       public Builder connectString(String connectString)
        {
            ensembleProvider = new FixedEnsembleProvider(connectString);
            return this;
        }

       public Builder ensembleProvider(EnsembleProvider ensembleProvider)
        {
            this.ensembleProvider = ensembleProvider;
            return this;
        }

2.通過Builder的build函式建立客戶端。

       /**
         * 根據builder裡的值建立新的CuratorFramework
         * @return new CuratorFramework
         */
        public CuratorFramework build()
        {
            return new CuratorFrameworkImpl(this);
        }

3.CuratorFrameworkImpl定義

CuratorFrameworkImpl為CuratorFramework介面的一個實現,平時主要用到的就是此client。
該建構函式主要還是使用Builder裡的預設配置的一些引數,這些引數可以通過CuratorFrameworkFactory李的Builder去流式建立。

設定如下引數,如ZookeeperFactory 工廠,CuratorZookeeperClient【重點,客戶端的工作主要靠它】,listeners 監聽,backgroundOperations 後臺執行行為,namespace 名稱空間(用於放置在路徑的字首),
threadFactory CuratorFrameworkImpl的執行緒工廠,connectionStateManager 連結狀態管理器, compressionProvider 壓縮器等等。

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()),
                 null, null, null, null, null, watchedEvent, null);
                processEvent(event);
            }
        }, builder.getRetryPolicy(), builder.canBeReadOnly());

        listeners = new ListenerContainer<CuratorListener>();
        unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
        backgroundOperations = new DelayQueue<OperationAndData<?>>();
        namespace = new NamespaceImpl(this, builder.getNamespace());
        threadFactory = getThreadFactory(builder);
        maxCloseWaitMs = builder.getMaxCloseWaitMs();
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
        compressionProvider = builder.getCompressionProvider();
        aclProvider = builder.getAclProvider();
        state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
        useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();

        byte[] builderDefaultData = builder.getDefaultData();
        defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
        authInfos = buildAuths(builder);

        failedDeleteManager = new FailedDeleteManager(this);
        namespaceFacadeCache = new NamespaceFacadeCache(this);
    }

3.1 ZookeeperFactory的構建

//1.從builder裡拿出ZookeeperFactory
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());

//2.builder裡的預設ZookeeperFactory
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();

//3.ZookeeperFactory的預設實現類,預設zookeeper工廠
//實際上就是new一個org.apache.zookeeper.ZooKeeper,原生態的Zookeeper。
public class DefaultZookeeperFactory implements ZookeeperFactory
{
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
    {
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}

//4.包裝一層,加上鑑權資訊
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
    {
        return new ZookeeperFactory()
        {
            @Override
            public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, 
            Watcher watcher, boolean canBeReadOnly) throws Exception
            {
                ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
                for ( AuthInfo auth : authInfos )
                {
                    zooKeeper.addAuthInfo(auth.getScheme(), auth.getAuth());
                }

                return zooKeeper;
            }
        };
    }

3.2 ConnectionStateManager的構建

設定其service為單例ExecutorService。主要用在監控連結狀態。

public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory)
    {
        this.client = client;
        if ( threadFactory == null )
        {
            threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
        }
        service = Executors.newSingleThreadExecutor(threadFactory);
    }

4.CuratorZookeeperClient定義

在CuratorFrameworkImpl初始化的時候構建

1.初始化,需要新建Watcher

this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
                processEvent(event);//呼叫本類的方法
            }
        }, builder.getRetryPolicy(), builder.canBeReadOnly());

2.listeners採用函數語言程式設計,此處的監聽執行就是在ConnectionState裡的process呼叫的。

private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }

        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    TimeTrace trace = client.startTracer("EventListener");
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    } 

3.主要有2個欄位ConnectionState連線狀態,retryPolicy 重複策略就在如下定義。

    private final ConnectionState                   state;
    private final AtomicReference<RetryPolicy>      retryPolicy = new AtomicReference<RetryPolicy>();

初始化主要做了如下工作,校驗重複策略不能為空,校驗配置連結地址提供者不能為空,初始化ConnectionState,設定重複策略。

public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
 int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
    {
        if ( sessionTimeoutMs < connectionTimeoutMs )
        {
            log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
        }

        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

        this.connectionTimeoutMs = connectionTimeoutMs;
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
        setRetryPolicy(retryPolicy);
    }

5.ConnectionState定義

ConnectionState實現了Watcher和Closeable介面。
設定了連結地址的配置策略,session過期時間,連結超時時間,設定日誌追蹤驅動器(使用的預設的DefaultTracerDriver),
將傳遞進來的watcher放入parentWatchers中,最後定義HandleHolder。

private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, 
int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
    {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        if ( parentWatcher != null )
        {
            parentWatchers.offer(parentWatcher);
        }

        //這個地方的this就是ConnectionState實現Watcher介面的原因。
        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }

    //主要用來執行CuratorFrameworkImpl裡的監聽。
    //第一、根據當前ConnectionState的isConnected與事件的狀態,
    //去檢查當前的事件的KeeperState,在checkState過程中,
    //若為SyncConnected和ConnectedReadOnly表示為連結狀態,其他則為斷鏈狀態;
    //同時若過期Expired,會重新連結zookeeper,
    //不過期的會去判斷當前連結地址是否發生變化,若發生也會重新連結zookeeper。
    //同時若連結狀態與之前的不同再修改狀態。(詳細的在以後講去了)
    //第二、將parentWatchers裡的所有watcher全部呼叫一次。
    @Override
    public void process(WatchedEvent event)
    {
        if ( LOG_EVENTS )
        {
            log.debug("ConnectState watcher: " + event);
        }

        if ( event.getType() == Watcher.Event.EventType.None )
        {
            boolean wasConnected = isConnected.get();
            boolean newIsConnected = checkState(event.getState(), wasConnected);
            if ( newIsConnected != wasConnected )
            {
                isConnected.set(newIsConnected);
                connectionStartMs = System.currentTimeMillis();
            }
        }

        for ( Watcher parentWatcher : parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }
    }

private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
        default:
        case Disconnected:
        {
            isConnected = false;
            break;
        }

        case SyncConnected:
        case ConnectedReadOnly:
        {
            isConnected = true;
            break;
        }

        case AuthFailed:
        {
            isConnected = false;
            log.error("Authentication failed");
            break;
        }
        //若過期Expired,會重新連結zookeeper
        case Expired:
        {
            isConnected = false;
            checkNewConnectionString = false;
            handleExpiredSession();
            break;
        }

        case SaslAuthenticated:
        {
            // NOP
            break;
        }
        }

        if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
        {
            handleNewConnectionString();
        }

        return isConnected;
    }

6.HandleHolder定義

簡單的設定5個欄位

HandleHolder(ZookeeperFactory zookeeperFactory, 
Watcher watcher, EnsembleProvider ensembleProvider, 
int sessionTimeout, boolean canBeReadOnly)
    {
        this.zookeeperFactory = zookeeperFactory;
        this.watcher = watcher;
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeout = sessionTimeout;
        this.canBeReadOnly = canBeReadOnly;
    }

總結

  • 主要使用到CuratorFrameworkImpl,CuratorZookeeperClient,ConnectionState,HandleHolder4個類,一步一步往後定義;
  • 同時使用ConnectionStateManager進行連結狀態監控;

啟動

CuratorFrameworkImpl啟動

先校驗狀態是否為LATENT並設定為STARTED。
啟動connectionStateManager。
連結狀態管理器中增加一個監聽器,用於在連結狀態時將client的logAsErrorConnectionErrors設為true。
再啟動CuratorZookeeperClient。
executorService定義成2個執行緒的執行器,1個為監聽,1個為後臺。並執行後臺的迴圈操作。

public void start()
    {
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
        {
            throw new IllegalStateException("Cannot be started more than once");
        }

        try
        {
            connectionStateManager.start(); // ordering dependency - must be called before client.start()

            final ConnectionStateListener listener = new ConnectionStateListener()
            {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState)
                {
                    if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
                    {
                        logAsErrorConnectionErrors.set(true);
                    }
                }
            };

            this.getConnectionStateListenable().addListener(listener);

            client.start();

            executorService = Executors.newFixedThreadPool(2, threadFactory);  // 1 for listeners, 1 for background ops

            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    backgroundOperationsLoop();
                    return null;
                }
            });
        }
        catch ( Exception e )
        {
            handleBackgroundOperationException(null, e);
        }
    }

ConnectionStateManager啟動

啟動一個ConnectionStateManager的執行緒執行者,通過listeners的函數語言程式設計去執行監聽狀態變化。

   /**
     * Start the manager
     */
    public void start()
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }

private void processEvents()
    {
        try
        {
            while ( !Thread.currentThread().isInterrupted() )
            {   //take方法可以阻塞
                final ConnectionState newState = eventQueue.take();

                if ( listeners.size() == 0 )
                {
                    log.warn("There are no ConnectionStateListeners registered.");
                }

                listeners.forEach
                    (
                        new Function<ConnectionStateListener, Void>()
                        {
                            @Override
                            public Void apply(ConnectionStateListener listener)
                            {
                                listener.stateChanged(client, newState);
                                return null;
                            }
                        }
                    );
            }
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
        }
    }

CuratorZookeeperClient啟動

需要校驗CuratorZookeeperClient是否已啟動,若啟動拋異常。
再去啟動ConnectionState。

 public void start() throws Exception
    {
        log.debug("Starting");

        if ( !started.compareAndSet(false, true) )
        {
            IllegalStateException ise = new IllegalStateException("Already started");
            throw ise;
        }

        state.start();
    }

ConnectionState啟動

先啟動連結地址配置器。
再重設。
在重設中,原子性的增加例項次數1次;
同時將連結狀態設為false,
再去啟動HandleHolder,
同時最重要一步zooKeeper.getZooKeeper()初始化原生態的連結;

void start() throws Exception
    {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

    private synchronized void reset() throws Exception
    {
        log.debug("reset");

        instanceIndex.incrementAndGet();

        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        zooKeeper.closeAndReset();
        zooKeeper.getZooKeeper();   // initiate connection
    }

HandleHolder啟動

可以看出借用內部介面Helper來完成操作的。
Helper介面有2方法,一個獲取原生的zookeeper,一個是獲取連結地址路徑。

private interface Helper
    {
        ZooKeeper getZooKeeper() throws Exception;

        String getConnectionString();
    }

重點還是在closeAndReset方法上。
主要還是在helper上的初始化,
當第一次呼叫getZooKeeper 時用synchronized 包裹,並會對連結地址和zookeeper定義,同時再次定義helper物件,並將此次定義好的兩個值作為返回值去實現此前的2個方法。此刻以後每次呼叫getZooKeeper 時均從子helper裡的方法。避免同時有客戶端new zookeeper。

void closeAndReset() throws Exception
    {
        internalClose();
        // first helper is synchronized when getZooKeeper is called. Subsequent calls
        // are not synchronized.
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;

            @Override
            public ZooKeeper getZooKeeper() throws Exception
            {
                synchronized(this)
                {
                    if ( zooKeeperHandle == null )
                    {
                        connectionString = ensembleProvider.getConnectionString();
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }

                    helper = new Helper()
                    {
                        @Override
                        public ZooKeeper getZooKeeper() throws Exception
                        {
                            return zooKeeperHandle;
                        }

                        @Override
                        public String getConnectionString()
                        {
                            return connectionString;
                        }
                    };

                    return zooKeeperHandle;
                }
            }

            @Override
            public String getConnectionString()
            {
                return connectionString;
            }
        };
    }

ZooKeeper getZooKeeper() throws Exception
    {
        return (helper != null) ? helper.getZooKeeper() : null;
    }