
Kafka (Part 2): Kafka Connector and Debezium


1. Introduction

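A Kafka connector links a Kafka cluster to other systems such as databases or other clusters. Connectors exist for many kinds of systems, and each one moves data in one of two directions: reading from Kafka (sink) or writing to Kafka (source). Connectors are therefore divided into two kinds: Source Connectors and Sink Connectors.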
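To make the two directions concrete, here is a dependency-free sketch. The real Kafka Connect API (package `org.apache.kafka.connect`) exposes `SourceTask.poll()` and `SinkTask.put(...)`; the `Sketch*` classes and the list standing in for a topic are hypothetical stand-ins that only model which way data flows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of connector directions; NOT the Kafka Connect API.
class ConnectorDirectionDemo {

    // Source side: pulls rows from an external system (here, a list).
    static class SketchSourceTask {
        private final List<String> externalRows;

        SketchSourceTask(List<String> externalRows) {
            this.externalRows = new ArrayList<>(externalRows);
        }

        // Returns everything not yet emitted, then forgets it (each row is emitted once).
        List<String> poll() {
            List<String> batch = new ArrayList<>(externalRows);
            externalRows.clear();
            return batch;
        }
    }

    // Sink side: writes the records it receives into an external system (here, a list).
    static class SketchSinkTask {
        final List<String> externalTable = new ArrayList<>();

        void put(List<String> records) {
            externalTable.addAll(records);
        }
    }

    // external rows -> (source) -> topic -> (sink) -> external table
    static List<String> pipeline(List<String> rows) {
        List<String> topic = new ArrayList<>();   // stand-in for a Kafka topic
        SketchSourceTask source = new SketchSourceTask(rows);
        SketchSinkTask sink = new SketchSinkTask();
        topic.addAll(source.poll());              // Source connector: writes INTO Kafka
        sink.put(topic);                          // Sink connector: reads FROM Kafka
        return sink.externalTable;
    }

    public static void main(String[] args) {
        System.out.println(pipeline(List.of("row-1", "row-2"))); // [row-1, row-2]
    }
}
```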

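Only a few connectors come out of the box: a file connector, a SQLite connector (the JDBC connector's quickstart) and an HDFS connector. Strictly speaking, Apache Kafka itself ships only the file connectors; the JDBC and HDFS connectors are bundled with the Confluent Platform used in the examples below. To connect to databases such as MySQL or MongoDB, a third-party library has to be registered with Kafka Connect as a plugin. Here I used Debezium, which provides the jars for the MySQL connector and the MongoDB connector.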

2. Quickstart

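When launching a connector, we have to supply not only the connector's own configuration but also the worker configuration. Among other things, the worker configuration chooses the converters (and therefore the schema format) that determine how records are stored in and read from Kafka.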
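As a minimal sketch, assuming JSON converters, the worker configuration can be built with `java.util.Properties`. The property names are standard Kafka Connect worker settings; the broker address, offsets file and plugin directory are placeholders. The Avro example below instead uses Confluent's `AvroConverter` together with a Schema Registry URL.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.util.Properties;

// Sketch of a standalone worker config (the first file passed to connect-standalone).
class WorkerConfigSketch {

    static Properties standaloneWorker(String bootstrapServers, String pluginPath) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        // Converters decide how record keys/values are serialized into Kafka.
        p.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        p.setProperty("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        // With JsonConverter, schemas.enable=true embeds the schema in every message.
        p.setProperty("key.converter.schemas.enable", "true");
        p.setProperty("value.converter.schemas.enable", "true");
        // Standalone mode keeps source offsets in a local file (placeholder path).
        p.setProperty("offset.storage.file.filename", "/tmp/connect.offsets");
        // Directories scanned for connector plugins such as Debezium.
        p.setProperty("plugin.path", pluginPath);
        return p;
    }

    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        standaloneWorker("localhost:9092", "/path/to/kafka/share/java").store(out, "worker");
        System.out.print(out);
    }
}
```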

Command format:

path/to/kafka/bin/connect-standalone [worker-properties] [connector-properties ...]
path/to/kafka/bin/connect-distributed [worker-properties]


For example:

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
      ./etc/kafka/connect-file-source.properties
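In this standalone mode the Connect worker runs as a single local process, so we can start the connector directly by running the script. What goes into each properties file depends on the specific connector implementation. The same thing can be done through the Kafka Connect REST API, and in production that is what we do: Connect runs in distributed mode on remote hosts, where connect-distributed takes only the worker configuration and connectors are created by sending requests to a worker rather than by running a local script.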
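The connector half of the command can be sketched the same way. The keys below mirror the stock `connect-file-source.properties` (read a local file line by line and publish each line to a topic); the connector name, file and topic are placeholders, and the helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Sketch: build a file-source connector config and the standalone command line.
class StandaloneLaunchSketch {

    static Properties fileSourceConnector(String name, String file, String topic) {
        Properties p = new Properties();
        p.setProperty("name", name);                          // unique connector name
        p.setProperty("connector.class", "FileStreamSource"); // alias of FileStreamSourceConnector
        p.setProperty("tasks.max", "1");
        p.setProperty("file", file);                          // file to read
        p.setProperty("topic", topic);                        // topic to write to
        return p;
    }

    // Standalone mode: one worker config followed by one or more connector configs.
    static List<String> standaloneCommand(String script, String workerProps, String... connectorProps) {
        List<String> cmd = new ArrayList<>();
        cmd.add(script);
        cmd.add(workerProps);
        cmd.addAll(Arrays.asList(connectorProps));
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = standaloneCommand(
                "./bin/connect-standalone",
                "./etc/schema-registry/connect-avro-standalone.properties",
                "./etc/kafka/connect-file-source.properties");
        // new ProcessBuilder(cmd).inheritIO().start() would launch the worker;
        // this sketch only prints the command.
        System.out.println(String.join(" ", cmd));
    }
}
```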

REST API command format:

curl -X POST -H "Content-Type: application/json" --data '[data]' http://[IP]:[PORT]/connectors

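By default, the Kafka Connect REST API listens on port 8083 (not to be confused with the broker, which uses 9092 by default). Through the REST API we can create, pause, resume, delete and inspect connectors; see the official documentation for details.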
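The same calls can be made from Java. This is a sketch using the JDK 11+ `java.net.http` client; the endpoints (`POST /connectors`, `GET /connectors/{name}/status`, `PUT /connectors/{name}/pause`, `DELETE /connectors/{name}`) are the standard Connect REST API, while the helper names and the naive JSON encoder are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of Kafka Connect REST requests; sending them needs a running worker.
class ConnectRestSketch {

    // Naive JSON for a flat string map; assumes keys/values need no escaping.
    static String toJson(Map<String, String> m) {
        return m.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
    }

    // POST /connectors with {"name": ..., "config": {...}} creates a connector.
    static HttpRequest createConnector(String baseUrl, String name, Map<String, String> config) {
        String body = "{\"name\":\"" + name + "\",\"config\":" + toJson(config) + "}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // GET /connectors/{name}/status reports connector and task states.
    static HttpRequest status(String baseUrl, String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name + "/status")).GET().build();
    }

    // PUT /connectors/{name}/pause pauses the connector and its tasks.
    static HttpRequest pause(String baseUrl, String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name + "/pause"))
                .PUT(HttpRequest.BodyPublishers.noBody()).build();
    }

    // DELETE /connectors/{name} stops and removes the connector.
    static HttpRequest delete(String baseUrl, String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name)).DELETE().build();
    }

    static String send(HttpRequest req) throws IOException, InterruptedException {
        return HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) {
        HttpRequest req = createConnector("http://localhost:8083", "local-file-source",
                Map.of("connector.class", "FileStreamSource", "tasks.max", "1",
                       "file", "test.txt", "topic", "connect-test"));
        System.out.println(req.method() + " " + req.uri()); // POST http://localhost:8083/connectors
    }
}
```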

3. Installing Debezium

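Debezium is registered with Kafka Connect as a plugin. All that is needed is to add Debezium's location to the CLASSPATH and move its jars into a custom folder under /path/to/kafka/share/java/ whose name starts with kafka-connect. On newer versions, the recommended alternative is to list the directory that contains the plugin folders in the worker's plugin.path setting.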
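Once the plugin is installed, a MySQL connector is registered with a configuration like the one below (as a connector properties file in standalone mode, or as the "config" object of a REST request). This is a sketch assuming Debezium 1.x property names; Debezium 2.x renamed `database.server.name` to `topic.prefix` and `database.history.*` to `schema.history.internal.*`. The host, credentials, server id and logical name are placeholders.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of a Debezium MySQL source connector config (Debezium 1.x property names).
class DebeziumMySqlConfigSketch {

    static Map<String, String> mysqlSource(String host, int port, String user, String password,
                                           long serverId, String logicalName, String bootstrap) {
        Map<String, String> c = new TreeMap<>();  // sorted keys for stable output
        c.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        c.put("tasks.max", "1");                  // the MySQL connector runs a single task
        c.put("database.hostname", host);
        c.put("database.port", Integer.toString(port));
        c.put("database.user", user);
        c.put("database.password", password);
        // Debezium reads the binlog as a replica, so this id must be unique in the MySQL cluster.
        c.put("database.server.id", Long.toString(serverId));
        // Logical server name; used as the prefix of the change-event topics.
        c.put("database.server.name", logicalName);
        // Debezium keeps the database schema history in its own Kafka topic.
        c.put("database.history.kafka.bootstrap.servers", bootstrap);
        c.put("database.history.kafka.topic", "schema-changes." + logicalName);
        return c;
    }

    public static void main(String[] args) {
        mysqlSource("mysql.example.internal", 3306, "debezium", "secret", 184054L, "dbserver1", "localhost:9092")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```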

4. How Kafka Connect Works, with a Look at Some of the Source Code

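In the spirit of "read the source and there are no secrets", let's look at how Kafka Connect starts a connector and use that as an entry point to get an overview of the whole system.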

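A connector created through the REST API is eventually started by the herder's startConnector method (the version below is from DistributedHerder):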

private boolean startConnector(String connectorName) {
        log.info("Starting connector {}", connectorName);
        final Map<String, String> configProps = configState.connectorConfig(connectorName);
        final ConnectorContext ctx = new HerderConnectorContext(this, connectorName);
        final TargetState initialState = configState.targetState(connectorName);
        boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);

        // Immediately request configuration since this could be a brand new connector. However, also only update those
        // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
        // just restoring an existing connector.
        if (started && initialState == TargetState.STARTED)
            reconfigureConnectorTasksWithRetry(connectorName);

        return started;
    }

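This code does the following: it reads the connector's configuration, creates a context (handle) for the connector, looks up its target state, and asks the Worker to start the connector. If the connector started successfully and its target state is STARTED, it then configures the connector's tasks.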

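Reading the configuration is simple assignment, so let's look more closely at the connector state. A connector's target state is either STARTED or PAUSED. Internally, Kafka Connect drives each connector, and likewise each task, as a state machine.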
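The state-machine idea can be sketched as follows. This is a simplified, hypothetical model: `TargetState` with STARTED and PAUSED matches the version quoted here, but the internal `State` values (INIT, FAILED) and the rule that a failure is sticky are illustrative assumptions, not the exact fields or logic of `WorkerConnector`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a connector following its target state.
class ConnectorStateSketch {
    enum TargetState { STARTED, PAUSED }
    enum State { INIT, STARTED, PAUSED, FAILED }

    private State state = State.INIT;
    final List<String> log = new ArrayList<>();

    State state() { return state; }

    // Move toward the requested target state; a failed connector ignores requests.
    void transitionTo(TargetState target) {
        if (state == State.FAILED) {
            log.add("ignored " + target + ": connector failed");
            return;
        }
        State next = (target == TargetState.STARTED) ? State.STARTED : State.PAUSED;
        if (next == state) return;            // already there, nothing to do
        log.add(state + " -> " + next);       // a real worker would start/stop work here
        state = next;
    }

    void fail() {
        log.add(state + " -> FAILED");
        state = State.FAILED;
    }

    public static void main(String[] args) {
        ConnectorStateSketch c = new ConnectorStateSketch();
        c.transitionTo(TargetState.STARTED);
        c.transitionTo(TargetState.PAUSED);
        c.transitionTo(TargetState.STARTED);
        System.out.println(c.log); // [INIT -> STARTED, STARTED -> PAUSED, PAUSED -> STARTED]
    }
}
```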

The Worker's startConnector is more involved:

public boolean startConnector(
            String connName,
            Map<String, String> connProps,
            ConnectorContext ctx,
            ConnectorStatus.Listener statusListener,
            TargetState initialState
    ) {
        if (connectors.containsKey(connName))
            throw new ConnectException("Connector with name " + connName + " already exists");

        final WorkerConnector workerConnector;
        ClassLoader savedLoader = plugins.currentThreadLoader();
        try {
            final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
            final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
            log.info("Creating connector {} of type {}", connName, connClass);
            final Connector connector = plugins.newConnector(connClass);
            workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
            log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
            savedLoader = plugins.compareAndSwapLoaders(connector);
            workerConnector.initialize(connConfig);
            workerConnector.transitionTo(initialState);
            Plugins.compareAndSwapLoaders(savedLoader);
        } catch (Throwable t) {
            log.error("Failed to start connector {}", connName, t);
            // Can't be put in a finally block because it needs to be swapped before the call on
            // statusListener
            Plugins.compareAndSwapLoaders(savedLoader);
            statusListener.onFailure(connName, t);
            return false;
        }

        WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
        if (existing != null)
            throw new ConnectException("Connector with name " + connName + " already exists");

        log.info("Finished creating connector {}", connName);
        return true;
    }

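Worker.startConnector does the following: it rejects a name that is already registered; reads the connector class from the configuration and uses the registered plugins to instantiate that Connector; wraps it in a WorkerConnector, which reports status changes to statusListener; switches to the plugin's class loader, initializes the WorkerConnector and transitions it to its initial target state, then restores the original class loader. If any step fails, it reports the failure through statusListener.onFailure and returns false. Otherwise it registers the WorkerConnector with putIfAbsent, which guards against a concurrent start under the same name.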
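Two patterns from Worker.startConnector are worth isolating: swapping the thread's context class loader while plugin code runs, and an atomic `putIfAbsent` registration. The sketch below is a hypothetical `MiniWorker`, not Kafka's class; a `Supplier` stands in for `plugins.newConnector(...)` plus initialization. It restores the loader in `finally` because nothing runs after the failure here, whereas the original restores it explicitly in `catch` so that the swap happens before `statusListener.onFailure` is called.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical mini worker illustrating loader swapping and atomic registration.
class MiniWorker {
    private final ConcurrentMap<String, Object> connectors = new ConcurrentHashMap<>();

    boolean startConnector(String name, ClassLoader pluginLoader, Supplier<Object> factory) {
        if (connectors.containsKey(name))
            throw new IllegalStateException("Connector with name " + name + " already exists");

        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        Object connector;
        try {
            current.setContextClassLoader(pluginLoader); // plugin classes resolve via its own loader
            connector = factory.get();                   // stands in for newConnector + initialize
        } catch (RuntimeException e) {
            return false;                                // the real code reports via statusListener.onFailure
        } finally {
            current.setContextClassLoader(saved);        // always restore the caller's loader
        }

        // Atomic check-and-insert closes the race left open by the containsKey check above.
        if (connectors.putIfAbsent(name, connector) != null)
            throw new IllegalStateException("Connector with name " + name + " already exists");
        return true;
    }

    Object connector(String name) { return connectors.get(name); }

    int size() { return connectors.size(); }

    public static void main(String[] args) {
        MiniWorker w = new MiniWorker();
        ClassLoader pluginLoader = new URLClassLoader(new URL[0]); // stand-in for a plugin's isolated loader
        System.out.println(w.startConnector("mysql-source", pluginLoader, Object::new)); // true
        System.out.println(w.startConnector("bad", pluginLoader, () -> { throw new RuntimeException("boom"); })); // false
    }
}
```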

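Back in the herder's startConnector, it also calls reconfigureConnectorTasksWithRetry. This (through reconfigureConnector, below) asks the connector for its current task configurations and compares them with the ones already stored. Only if they differ, as they do for a brand-new connector, are the new task configurations written; when an existing connector is merely being restored, nothing is rewritten: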

    private void reconfigureConnector(final String connName, final Callback<Void> cb) {
        try {
            if (!worker.isRunning(connName)) {
                log.info("Skipping reconfiguration of connector {} since it is not running", connName);
                return;
            }

            Map<String, String> configs = configState.connectorConfig(connName);

            ConnectorConfig connConfig;
            List<String> sinkTopics = null;
            if (worker.isSinkConnector(connName)) {
                connConfig = new SinkConnectorConfig(plugins(), configs);
                sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
            } else {
                connConfig = new SourceConnectorConfig(plugins(), configs);
            }

            final List<Map<String, String>> taskProps
                    = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
            boolean changed = false;
            int currentNumTasks = configState.taskCount(connName);
            if (taskProps.size() != currentNumTasks) {
                log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
                changed = true;
            } else {
                int index = 0;
                for (Map<String, String> taskConfig : taskProps) {
                    if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) {
                        log.debug("Change in task configurations, writing updated task configurations");
                        changed = true;
                        break;
                    }
                    index++;
                }
            }
            if (changed) {
                if (isLeader()) {
                    configBackingStore.putTaskConfigs(connName, taskProps);
                    cb.onCompletion(null, null);
                } else {
                    // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
                    // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
                    forwardRequestExecutor.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
                                RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
                                cb.onCompletion(null, null);
                            } catch (ConnectException e) {
                                log.error("Request to leader to reconfigure connector tasks failed", e);
                                cb.onCompletion(e, null);
                            }
                        }
                    });
                }
            }
        } catch (Throwable t) {
            cb.onCompletion(t, null);
        }
    }
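
Apart from deciding whether anything changed, the key step is writing the new task configurations to configBackingStore. Workers listen for updates to that store, so the tasks are started promptly once the new configurations are picked up. If this worker is not the elected leader, it does not write the configurations itself; it forwards them to the leader over the REST API (POST /connectors/{name}/tasks), on a separate executor so that waiting for the leader does not block the worker and get it kicked out of the group.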
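The decision made in reconfigureConnector can be reduced to a small pure function. This is a hypothetical sketch: it applies the same change rule as the quoted code (different task count, or a differing config at the same index) and returns which action would follow, whereas the real code calls `configBackingStore.putTaskConfigs` or `RestServer.httpRequest`. The `task.id` key in `main` is illustrative only.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the task-reconfiguration decision.
class TaskReconfigSketch {
    enum Action { NONE, WRITE_TO_CONFIG_STORE, FORWARD_TO_LEADER }

    // Changed if the task count differs or any task config differs at the same index.
    // (Equivalent to !proposed.equals(current) for lists of maps.)
    static boolean tasksChanged(List<Map<String, String>> proposed, List<Map<String, String>> current) {
        if (proposed.size() != current.size()) return true;
        for (int i = 0; i < proposed.size(); i++) {
            if (!proposed.get(i).equals(current.get(i))) return true;
        }
        return false;
    }

    // Only the leader writes task configs; other workers forward them to the leader.
    static Action decide(List<Map<String, String>> proposed, List<Map<String, String>> current, boolean isLeader) {
        if (!tasksChanged(proposed, current)) return Action.NONE;
        return isLeader ? Action.WRITE_TO_CONFIG_STORE : Action.FORWARD_TO_LEADER;
    }

    public static void main(String[] args) {
        List<Map<String, String>> stored = List.of(Map.of("task.id", "0"));
        List<Map<String, String>> proposed = List.of(Map.of("task.id", "0"), Map.of("task.id", "1"));
        System.out.println(decide(stored, stored, true));    // NONE
        System.out.println(decide(proposed, stored, false)); // FORWARD_TO_LEADER
    }
}
```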

That is a brief introduction to the source code for now; I will add more when I have time.