
Elasticsearch Source Code Analysis: Cluster State

Our current project modifies Elasticsearch at the source-code level, and a while ago an issue left the cluster in a hung ("fake dead") state. That is what pushed me to dig into how the source code synchronizes cluster state.

Overview

  First, some context: Solr relies on ZooKeeper for cluster state synchronization, i.e. a third-party component maintains the cluster state. Elasticsearch, by contrast, uses neither ZooKeeper nor etcd to manage master election and node roles.
  So how does cluster state synchronization actually get done?
  I recommend reading the article ELASTICSEARCH 機制和架構; that site has a lot of Elasticsearch-related analysis and it inspired me quite a bit. I am only elaborating a little on the foundation laid by that article.

Node Types

  Without getting too complicated, we only need to focus on two node types.

Master node

  First, a node is only eligible to be elected master if node.master: true is configured in its elasticsearch.yml.

If you want to do your own source analysis, it is best to run separate master and data nodes, add extra log statements of your own if you can, or enable debug logging, so you can follow the flow. With a single node many of these steps are asynchronous, so the master-side and data-side paths are hard to tell apart.
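  For reference, here is a minimal sketch of how the two roles could be split in elasticsearch.yml (my own example; node.master and node.data are the standard settings in the 5.x/6.x line, everything else is left at its defaults):

# elasticsearch.yml on the dedicated master node
node.master: true
node.data: false

# elasticsearch.yml on the dedicated data node
node.master: false
node.data: true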

  Second, the master node's main job is to publish the cluster state. The class to look at is ClusterService.
  Below is the entry point for a state update; how execution reaches this entry point can be traced step by step later:

void runTasks(TaskInputs taskInputs) {
        ...
        TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, startTimeNS);  // First key point: the master computes the new cluster state (metadata), e.g. the state after an index has been created.
        taskOutputs.notifyFailedTasks();
        if (taskOutputs.clusterStateUnchanged()) {
            taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
            TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
            logger.debug("processing [{}]: took [{}] no change in cluster_state", taskInputs.summary, executionTime);
            warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
        } else {
            ClusterState newClusterState = taskOutputs.newClusterState;
            if (logger.isTraceEnabled()) {
                logger.trace("cluster state updated, source [{}]\n{}", taskInputs.summary, newClusterState);
            } else if (logger.isDebugEnabled()) {
                logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), taskInputs.summary);
            }
            try {
                publishAndApplyChanges(taskInputs, taskOutputs); // The name says it all: publish the cluster state to the other nodes.
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                logger.debug("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})",
                    taskInputs.summary, executionTime, newClusterState.version(), newClusterState.stateUUID());
                warnAboutSlowTaskIfNeeded(executionTime, taskInputs.summary);
            } catch (Exception e) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
                final long version = newClusterState.version();
                final String stateUUID = newClusterState.stateUUID();
                final String fullState = newClusterState.toString();
                logger.warn(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
                        executionTime, version, stateUUID, taskInputs.summary, fullState),
                    e);
                // TODO: do we want to call updateTask.onFailure here?
            }
        }
    }

  Next, let's look at how the state is published. This flow is also fairly long, so I will fill it in gradually.
  Each state update carries a version; that version is what determines which update is the most recent.
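  As a rough illustration before the real method below, here is a toy sketch (my own code, not the Elasticsearch implementation; if memory serves, the real check lives in ClusterState#supersedes and the ZenDiscovery validation) of how the version decides which state wins:

// Toy sketch only: given two states from the same master, the one with the higher
// version is the newer one; an equal or lower version is stale and can be ignored.
final class ClusterStateVersionCheck {
    static boolean supersedes(long incomingVersion, long currentVersion) {
        return incomingVersion > currentVersion;
    }

    public static void main(String[] args) {
        System.out.println(supersedes(42, 41)); // true  -> apply the incoming state
        System.out.println(supersedes(41, 42)); // false -> stale, ignore it
    }
}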

    private void publishAndApplyChanges(TaskInputs taskInputs, TaskOutputs taskOutputs) {
        ClusterState previousClusterState = taskOutputs.previousClusterState;
        ClusterState newClusterState = taskOutputs.newClusterState;

        ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(taskInputs.summary, newClusterState, previousClusterState);
        // new cluster state, notify all listeners
        final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
        if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
            String summary = nodesDelta.shortSummary();
            if (summary.length() > 0) {
                logger.info("{}, reason: {}", summary, taskInputs.summary);
            }
        }

        final Discovery.AckListener ackListener = newClusterState.nodes().isLocalNodeElectedMaster() ?
            taskOutputs.createAckListener(threadPool, newClusterState) :
            null;

        nodeConnectionsService.connectToNodes(newClusterState.nodes());

        // if we are the master, publish the new state to all nodes
        // we publish here before we send a notification to all the listeners, since if it fails
        // we don't want to notify
        // This is the master node's logic for publishing the state to the other nodes
        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            logger.debug("publishing cluster state version [{}]", newClusterState.version());
            try { // Functional style again. Tracing it through, by default this ends up in ZenDiscovery's publish method; that flow is explained in detail later.
                clusterStatePublisher.accept(clusterChangedEvent, ackListener);
            } catch (Discovery.FailedToCommitClusterStateException t) {
                final long version = newClusterState.version();
                logger.warn(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "failing [{}]: failed to commit cluster state version [{}]", taskInputs.summary, version),
                    t);
                // ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state
                nodeConnectionsService.connectToNodes(previousClusterState.nodes());
                nodeConnectionsService.disconnectFromNodesExcept(previousClusterState.nodes());
                taskOutputs.publishingFailed(t);
                return;
            }
        }

        logger.debug("applying cluster state version {}", newClusterState.version());
        try {
            // nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
            if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
                final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
                clusterSettings.applySettings(incomingSettings);
            }
        } catch (Exception ex) {
            logger.warn("failed to apply cluster settings", ex);
        }

        logger.debug("set local cluster state to version {}", newClusterState.version());
        // Note: the master first sends the new state to the other nodes (default timeout 30s if a node does not respond) and only then applies the state update on the local node. "Local" here means the local data role, which is another reason separating master and data nodes makes the source easier to analyze.
        // That raises a question: suppose an index has three shards spread across three nodes and deleting each shard takes 1s. The remote deletions run in parallel (~1s), but since this call is effectively synchronous the local apply only starts afterwards, so the whole deletion takes roughly 2s.
        callClusterStateAppliers(newClusterState, clusterChangedEvent);

        nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());

        updateState(css -> newClusterState);

        Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
            try {
                logger.trace("calling [{}] with change to version [{}]", listener, newClusterState.version());
                listener.clusterChanged(clusterChangedEvent);
            } catch (Exception ex) {
                logger.warn("failed to notify ClusterStateListener", ex);
            }
        });

        //manual ack only from the master at the end of the publish
        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            try {
                ackListener.onNodeAck(newClusterState.nodes().getLocalNode(), null);
            } catch (Exception e) {
                final DiscoveryNode localNode = newClusterState.nodes().getLocalNode();
                logger.debug(
                    (Supplier<?>) () -> new ParameterizedMessage("error while processing ack for master node [{}]", localNode),
                    e);
            }
        }

        taskOutputs.processedDifferentClusterState(previousClusterState, newClusterState);

        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            try {
                taskOutputs.clusterStatePublished(clusterChangedEvent);
            } catch (Exception e) {
                logger.error(
                    (Supplier<?>) () -> new ParameterizedMessage(
                        "exception thrown while notifying executor of new cluster state publication [{}]",
                        taskInputs.summary),
                    e);
            }
        }
    }

State publication

  Publishing the state actually happens in two phases, send and commit, in order to keep the cluster state consistent. The master first sends the send request; once enough nodes have responded, the master sends the commit request, and only then do the other nodes start applying the state. Several points follow from this:
  1. After the send request arrives, the receiving node stores the state in a queue.
  2. When the commit request arrives, the queued state is marked as committed and then processed.
  3. The send request uses SEND_ACTION_NAME = "internal:discovery/zen/publish/send".
  4. The commit request uses COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit".
  Following these action names you can find both where the requests are sent and where they are handled; Elasticsearch wires up request sending and handling this way in many places.
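  To make that concrete, this is roughly how PublishClusterStateAction wires the two action names to their handlers (paraphrased from memory of the 5.x sources; exact signatures differ between versions, so treat it as a sketch):

transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new,
    ThreadPool.Names.SAME, new SendClusterStateRequestHandler());
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new,
    ThreadPool.Names.SAME, new CommitClusterStateRequestHandler());

private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
    @Override
    public void messageReceived(BytesTransportRequest request, TransportChannel channel) throws Exception {
        handleIncomingClusterStateRequest(request, channel); // the handler shown in the "send" section below
    }
}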

Processing logic

  Keep following the calls and you will see that index creation and deletion are executed in IndicesClusterStateService, which is invoked when the ClusterService above applies the state locally, namely from callClusterStateAppliers(newClusterState, clusterChangedEvent):

@Override
    public synchronized void applyClusterState(final ClusterChangedEvent event) {
        if (!lifecycle.started()) {
            return;
        }

        final ClusterState state = event.state();

        // we need to clean the shards and indices we have on this node, since we
        // are going to recover them again once state persistence is disabled (no master / not recovered)
        // TODO: feels hacky, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
        if (state.blocks().disableStatePersistence()) {
            for (AllocatedIndex<? extends Shard> indexService : indicesService) {
                indicesService.removeIndex(indexService.index(), NO_LONGER_ASSIGNED,
                    "cleaning index (disabled block persistence)"); // also cleans shards
            }
            return;
        }

        updateFailedShardsCache(state);

        deleteIndices(event); // also deletes shards of deleted indices

        removeUnallocatedIndices(event); // also removes shards of removed indices

        failMissingShards(state);

        removeShards(state);   // removes any local shards that doesn't match what the master expects

        updateIndices(event); // can also fail shards, but these are then guaranteed to be in failedShardsCache

        createIndices(state);

        createOrUpdateShards(state);
    }

Key points:

  1. This method is synchronized, so the next cluster state cannot enter until the previous one has finished applying (see the sketch after this list).
  2. That raises a question: if creating or deleting an index takes a long time, doesn't everything block? In practice the work done here is only metadata updating; deletions and the more expensive data recovery flows run on background threads, so logically the applier thread is not blocked. The recovery flow has real complexity of its own; I will cover it in a separate article.
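  To make point 1 concrete, here is a toy illustration (my own code, not from Elasticsearch) of why a synchronized applier serializes state application, so anything slow inside it gates the next cluster state:

// Toy illustration only: states are applied strictly one at a time, so whatever
// runs inside applyClusterState delays every state that arrives after it.
class ToyClusterStateApplier {
    private long appliedVersion = 0;

    synchronized void applyClusterState(long version, Runnable metadataUpdate) {
        metadataUpdate.run();     // expected to be cheap (metadata only); heavy work
                                  // such as recovery is handed off to background threads
        appliedVersion = version; // only after this returns can the next state enter
    }
}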

     After this rather involved flow, the cluster state has been updated.

data node

  A data node is mainly responsible for writing data; node.data defaults to true.
  The main question here is how the cluster state gets updated on a data node.

The send message

  As mentioned above, the send request uses the action name SEND_ACTION_NAME; following it leads to the handler logic:

protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
        Compressor compressor = CompressorFactory.compressor(request.bytes());
        StreamInput in = request.bytes().streamInput();
        try {
            if (compressor != null) {
                in = compressor.streamInput(in);
            }
            in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
            in.setVersion(request.version());
            synchronized (lastSeenClusterStateMutex) {
                final ClusterState incomingState;
                // If true we received full cluster state - otherwise diffs
                if (in.readBoolean()) {
                    incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
                    logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
                        request.bytes().length());
                } else if (lastSeenClusterState != null) {
                    Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
                    incomingState = diff.apply(lastSeenClusterState);
                    logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                        incomingState.version(), incomingState.stateUUID(), request.bytes().length());
                } else {
                    logger.debug("received diff for but don't have any local cluster state - requesting full state");
                    throw new IncompatibleClusterStateVersionException("have no local cluster state");
                }
                // sanity check incoming state
                validateIncomingState(incomingState, lastSeenClusterState);

                pendingStatesQueue.addPending(incomingState); // key point: the state is only added to the pending queue here
                lastSeenClusterState = incomingState;
            }
        } finally {
            IOUtils.close(in);
        }
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    }

  Here you can see that send only makes sure the data node has received the state; nothing is processed yet, the state is simply parked in pendingStatesQueue. The reply tells the master that this data node is able to receive messages, and the master will send the commit request afterwards.
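  For orientation, here is a heavily simplified, self-invented sketch of the master-side two-phase idea (the real logic lives in PublishClusterStateAction and additionally deals with diffs, timeouts and the minimum-master-nodes check):

import java.util.List;

// Toy model of the two-phase publish; NOT the actual Elasticsearch code.
class ToyTwoPhasePublisher {
    private final int requiredAcks;

    ToyTwoPhasePublisher(int requiredAcks) {
        this.requiredAcks = requiredAcks;
    }

    void publish(long version, String stateUUID, List<String> nodes) {
        int acks = 0;
        for (String node : nodes) {
            // phase 1: SEND - ship the state; the receiver only queues it and acks
            if (send(node, version)) {
                acks++;
            }
        }
        if (acks < requiredAcks) {
            // without enough acks the state is never committed
            throw new IllegalStateException("not enough nodes acked version " + version);
        }
        for (String node : nodes) {
            // phase 2: COMMIT - tell every node to apply the queued state
            commit(node, stateUUID);
        }
    }

    private boolean send(String node, long version)    { return true; } // stand-in for the transport call
    private void commit(String node, String stateUUID) { }              // stand-in for the transport call
}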

The commit request

  Same approach for COMMIT_ACTION_NAME: search for it (Ctrl+H) and you can see where this action is registered and what its handler does.

protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
        final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID,
            new PendingClusterStatesQueue.StateProcessedListener() {
            @Override
            public void onNewClusterStateProcessed() {  // a pattern you see a lot in async frameworks: once processing finishes, sendResponse is called
                try {
                    // send a response to the master to indicate that this cluster state has been processed post committing it.
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (Exception e) {
                    logger.debug("failed to send response on cluster state processed", e);
                    onNewClusterStateFailed(e);
                }
            }

            @Override
            public void onNewClusterStateFailed(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.debug("failed to send response on cluster state processed", inner);
                }
            }
        });
        if (state != null) {
            newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().getMasterNode() +
                " committed version [" + state.version() + "]");  // 具體處理邏輯
        }
    }

  From there the flow ends up back in ZenDiscovery's processing logic.

private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener {

        @Override
        public void onNewClusterState(String reason) {
            processNextPendingClusterState(reason);
        }
    }

  processNextPendingClusterState eventually submits a BatchedTask, so the concrete processing goes back into ClusterService and ties back into the flow described above. A couple of points here deserve very close attention:
  1. threadExecutor: trace its initialization and you will see the thread pool size is 1. One single thread, which means that if the task at the head of the priority queue has not finished processing and been removed, the executor buffers all subsequent requests in its work queue.
  2. Remember that every incoming state update first lands in pendingStatesQueue, so if this thread pool stays stuck, requests keep piling up there. pendingStatesQueue is capped at 25 entries; once it goes over that limit, the oldest pending state update is dropped. Our project modifies Elasticsearch and adds C++ logic, and this is exactly where we hit a pitfall: a deadlock in the C++ back end kept this thread pool stuck here, later requests could not get in at all, and pendingStatesQueue kept evicting states without ever processing any (see the sketch after this list).
  3. curl 127.0.0.1:9200/_cat/tasks?v shows the tasks currently running in the background.
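  To illustrate point 2, here is a toy model (my own sketch; the real class is PendingClusterStatesQueue, and the capacity of 25 is the value quoted above) of a bounded pending-states queue that evicts the oldest entry when it is full:

import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: if nothing ever drains the queue (e.g. the single applier thread
// is stuck), new states keep evicting the oldest ones and none are ever applied.
class ToyPendingStatesQueue {
    private static final int MAX_QUEUE_SIZE = 25;
    private final Deque<Long> pendingVersions = new ArrayDeque<>();

    synchronized void addPending(long version) {
        pendingVersions.addLast(version);
        while (pendingVersions.size() > MAX_QUEUE_SIZE) {
            long dropped = pendingVersions.removeFirst(); // the oldest state is simply thrown away
            System.out.println("dropping pending cluster state version " + dropped);
        }
    }
}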
  

public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
        if (tasks.isEmpty()) {
            return;
        }
        final BatchedTask firstTask = tasks.get(0);
        assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
            "tasks submitted in a batch should share the same batching key: " + tasks;
        // convert to an identity map to check for dups based on task identity
        final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
            BatchedTask::getTask,
            Function.identity(),
            (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
            IdentityHashMap::new));

        synchronized (tasksPerBatchingKey) {
            LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
                k -> new LinkedHashSet<>(tasks.size()));
            for (BatchedTask existing : existingTasks) {
                // check that there won't be two tasks with the same identity for the same batching key
                BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
                if (duplicateTask != null) {
                    throw new IllegalStateException("task [" + duplicateTask.describeTasks(
                        Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
                }
            }
            existingTasks.addAll(tasks);
        }

        if (timeout != null) {
            threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
        } else {
            threadExecutor.execute(firstTask);
        }
    }