Kafka (Part 2): Kafka Connector and Debezium
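1. Introduction
Kafka Connector (the Kafka Connect framework) links a Kafka cluster with other systems such as databases and clusters. It can connect many kinds of systems to Kafka, and its two main jobs are reading from Kafka (sink) and writing to Kafka (source), so connectors fall into two kinds: Source Connector and Sink Connector.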
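Out of the box, the Kafka distribution used here comes with only three connectors: the file connector, the SQLite connector, and the HDFS connector. Connecting to databases such as MySQL or MongoDB requires a matching third-party library registered with Kafka Connector as a plugin. Here I use Debezium, which provides the jars for the MySQL connector and the MongoDB connector.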
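2. Quickstart
When launching a connector, we need to specify not only the connector's configuration properties but also Kafka's schema properties (the worker properties file, which sets the converters), to indicate the format in which records are stored or read.
Command format:
path/to/kafka/bin/[connect-standalone|connect-distributed] [schema-properties] [connector-properties]
For example: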
./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties \
./etc/kafka/connect-file-source.properties
REST API command format:
curl -X POST [data] http://[IP]:[PORT]
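By default, the REST port of Kafka Connector (not of the Kafka broker) is 8083. Through the REST API we can start, stop, and inspect a connector. See the official documentation for this part.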
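As a rough illustration of the REST calls mentioned above, here is a minimal Java sketch that only builds the requests for creating, inspecting, and deleting a connector. The endpoints POST /connectors, GET /connectors/{name}/status, and DELETE /connectors/{name} belong to the Kafka Connect REST API. The class name ConnectRestSketch, the helper methods, the host localhost, and the file path are assumptions made for this example, and the connector settings mirror the stock connect-file-source.properties file.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper class: builds (but does not send) Kafka Connect REST requests.
final class ConnectRestSketch {

    // Minimal JSON string quoting; enough for the simple values used here.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Renders {"name": ..., "config": {...}}, the body shape that POST /connectors expects.
    static String connectorJson(String name, Map<String, String> config) {
        String entries = config.entrySet().stream()
                .map(e -> quote(e.getKey()) + ":" + quote(e.getValue()))
                .collect(Collectors.joining(","));
        return "{\"name\":" + quote(name) + ",\"config\":{" + entries + "}}";
    }

    // Request that creates (starts) a connector.
    static HttpRequest createConnector(String baseUrl, String name, Map<String, String> config) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson(name, config)))
                .build();
    }

    // Request that reads the status of a connector.
    static HttpRequest connectorStatus(String baseUrl, String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name + "/status"))
                .GET()
                .build();
    }

    // Request that removes (stops) a connector.
    static HttpRequest deleteConnector(String baseUrl, String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + name))
                .DELETE()
                .build();
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "FileStreamSource");
        config.put("tasks.max", "1");
        config.put("file", "/tmp/test.txt");   // assumed path, for illustration only
        config.put("topic", "connect-test");

        HttpRequest create = createConnector("http://localhost:8083", "local-file-source", config);
        System.out.println(create.method() + " " + create.uri());
        System.out.println(connectorJson("local-file-source", config));
        // To actually send it against a running worker:
        // HttpClient.newHttpClient().send(create, HttpResponse.BodyHandlers.ofString());
    }
}
```

The sketch stops short of sending anything so that it runs without a Connect worker. Sending is one more call on java.net.http.HttpClient, as the final comment shows.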
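3. Installing Debezium
In Kafka, Debezium is registered with Kafka Connector as a plugin. Concretely, you only need to register Debezium's location on the classpath and move its jars into a custom folder under /path/to/kafka/share/java/ whose name begins with kafka-connect.
4. How Kafka Connector works, with a partial source walkthrough
Following the principle that "in front of the source code, there are no secrets," let's look at how Kafka Connector starts up, and use that as the entry point for an overview of the whole picture.
First, a request through the REST API leads to Kafka's startConnector being called to create the connector: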
private boolean startConnector(String connectorName) {
    log.info("Starting connector {}", connectorName);
    final Map<String, String> configProps = configState.connectorConfig(connectorName);
    final ConnectorContext ctx = new HerderConnectorContext(this, connectorName);
    final TargetState initialState = configState.targetState(connectorName);
    boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);
    // Immediately request configuration since this could be a brand new connector. However, also only update those
    // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
    // just restoring an existing connector.
    if (started && initialState == TargetState.STARTED)
        reconfigureConnectorTasksWithRetry(connectorName);
    return started;
}
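In this code we can clearly see the steps it takes: it looks up the connector's configuration properties, builds a context (handle) from the connector name, reads the target state, starts the connector on the Worker, and, once the start succeeds, checks the connector's state and then configures its tasks.
Most of this is simple assignment of properties, so let's expand on the connector state instead. A connector's target state is either STARTED or PAUSED. Internally, Kafka Connector controls each connector with a state machine, and the same goes for tasks.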
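The state-machine idea can be sketched in a few lines of Java. This is a simplified illustration, not Kafka's WorkerConnector: the TargetState values STARTED and PAUSED come from the text above, while the class ManagedConnector, the internal State enum, and the recorded action names are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a connector driven by a target state.
final class TargetStateSketch {

    // The two target states named in the text.
    enum TargetState { STARTED, PAUSED }

    // Assumed internal lifecycle states for this sketch.
    enum State { INIT, STARTED, PAUSED }

    static final class ManagedConnector {
        private State state = State.INIT;
        // Records which lifecycle action each transition triggered.
        final List<String> actions = new ArrayList<>();

        // Moves toward the requested target; a request for the current state is a no-op.
        void transitionTo(TargetState target) {
            if (target == TargetState.STARTED && state != State.STARTED) {
                actions.add(state == State.PAUSED ? "resume" : "start");
                state = State.STARTED;
            } else if (target == TargetState.PAUSED && state != State.PAUSED) {
                actions.add("pause");
                state = State.PAUSED;
            }
        }

        State state() {
            return state;
        }
    }

    public static void main(String[] args) {
        ManagedConnector connector = new ManagedConnector();
        connector.transitionTo(TargetState.STARTED); // first start
        connector.transitionTo(TargetState.STARTED); // already started, nothing happens
        connector.transitionTo(TargetState.PAUSED);
        connector.transitionTo(TargetState.STARTED); // resume after pause
        System.out.println(connector.state() + " " + connector.actions);
    }
}
```

Transitions are idempotent, so a caller can request the target state again (for example after a restart) without triggering duplicate start or pause actions.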
The startConnector in the worker is more involved:
public boolean startConnector(
        String connName,
        Map<String, String> connProps,
        ConnectorContext ctx,
        ConnectorStatus.Listener statusListener,
        TargetState initialState
) {
    if (connectors.containsKey(connName))
        throw new ConnectException("Connector with name " + connName + " already exists");
    final WorkerConnector workerConnector;
    ClassLoader savedLoader = plugins.currentThreadLoader();
    try {
        final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
        final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
        log.info("Creating connector {} of type {}", connName, connClass);
        final Connector connector = plugins.newConnector(connClass);
        workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
        log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
        savedLoader = plugins.compareAndSwapLoaders(connector);
        workerConnector.initialize(connConfig);
        workerConnector.transitionTo(initialState);
        Plugins.compareAndSwapLoaders(savedLoader);
    } catch (Throwable t) {
        log.error("Failed to start connector {}", connName, t);
        // Can't be put in a finally block because it needs to be swapped before the call on
        // statusListener
        Plugins.compareAndSwapLoaders(savedLoader);
        statusListener.onFailure(connName, t);
        return false;
    }
    WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
    if (existing != null)
        throw new ConnectException("Connector with name " + connName + " already exists");
    log.info("Finished creating connector {}", connName);
    return true;
}
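In the worker's startConnector we can see that it does the following: it rejects a duplicate connector name, looks up the connector class among the registered plugins and instantiates it, wraps that instance together with the status listener in a WorkerConnector, switches to the plugin's class loader, initializes the WorkerConnector, and transitions it to the initial target state (which runs it when that state is STARTED). On any failure it restores the saved class loader and reports the error through the status listener.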
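The class-loader handling is the least obvious part of that method, so here is a minimal sketch of the save, swap, and restore pattern using only the JDK. It is not Kafka's Plugins class: the names LoaderSwapSketch, compareAndSwap, and withLoader are assumptions for this example, and an empty URLClassLoader stands in for a plugin's isolated class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Hypothetical sketch of swapping the thread context class loader around plugin code.
final class LoaderSwapSketch {

    // Installs `next` as the context class loader and returns the one that was active before.
    static ClassLoader compareAndSwap(ClassLoader next) {
        Thread thread = Thread.currentThread();
        ClassLoader current = thread.getContextClassLoader();
        if (current != next) {
            thread.setContextClassLoader(next);
        }
        return current;
    }

    // Runs `action` with `pluginLoader` active, then restores the previous loader.
    static <T> T withLoader(ClassLoader pluginLoader, Supplier<T> action) {
        ClassLoader saved = compareAndSwap(pluginLoader);
        try {
            return action.get();
        } finally {
            compareAndSwap(saved);
        }
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Stand-in for a plugin's isolated class loader (no jars attached).
        ClassLoader pluginLoader = new URLClassLoader(new URL[0], before);

        boolean activeInside = withLoader(pluginLoader,
                () -> Thread.currentThread().getContextClassLoader() == pluginLoader);
        boolean restoredAfter = Thread.currentThread().getContextClassLoader() == before;

        System.out.println("plugin loader active inside: " + activeInside);
        System.out.println("original loader restored: " + restoredAfter);
    }
}
```

This sketch restores the loader in a finally block for brevity. The Kafka code shown above deliberately avoids finally, because the loader has to be swapped back before statusListener.onFailure is called.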
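Going back to the first startConnector, we see that it also calls reconfigureConnectorTasksWithRetry. Its job is to work out the connector's task configurations and compare them with the ones stored earlier, so that a connector that is merely being restored keeps its existing task configs and only real changes are written. The work itself happens in reconfigureConnector, shown here: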
private void reconfigureConnector(final String connName, final Callback<Void> cb) {
    try {
        if (!worker.isRunning(connName)) {
            log.info("Skipping reconfiguration of connector {} since it is not running", connName);
            return;
        }
        Map<String, String> configs = configState.connectorConfig(connName);
        ConnectorConfig connConfig;
        List<String> sinkTopics = null;
        if (worker.isSinkConnector(connName)) {
            connConfig = new SinkConnectorConfig(plugins(), configs);
            sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
        } else {
            connConfig = new SourceConnectorConfig(plugins(), configs);
        }
        final List<Map<String, String>> taskProps
                = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
        boolean changed = false;
        int currentNumTasks = configState.taskCount(connName);
        if (taskProps.size() != currentNumTasks) {
            log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
            changed = true;
        } else {
            int index = 0;
            for (Map<String, String> taskConfig : taskProps) {
                if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) {
                    log.debug("Change in task configurations, writing updated task configurations");
                    changed = true;
                    break;
                }
                index++;
            }
        }
        if (changed) {
            if (isLeader()) {
                configBackingStore.putTaskConfigs(connName, taskProps);
                cb.onCompletion(null, null);
            } else {
                // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
                // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
                forwardRequestExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
                            RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
                            cb.onCompletion(null, null);
                        } catch (ConnectException e) {
                            log.error("Request to leader to reconfigure connector tasks failed", e);
                            cb.onCompletion(e, null);
                        }
                    }
                });
            }
        }
    } catch (Throwable t) {
        cb.onCompletion(t, null);
    }
}
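Besides checking whether the task configurations actually changed (so that restoring an existing connector causes no needless updates), the most important step here is writing the task configs to configBackingStore, so that the workers listening on that store can start the tasks promptly. If this worker (not a broker) is not the elected leader, it forwards the request to the leader instead, on a separate thread so that it does not block and get kicked out of the worker group.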
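The change check and the leader decision can be isolated into a small, self-contained sketch. It mirrors the comparison logic in the code above but is not Kafka code: the class TaskConfigDiffSketch, the Action enum, and the sample config keys are assumptions made for this example.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: decide whether task configs changed and who should write them.
final class TaskConfigDiffSketch {

    // Assumed outcome names for this sketch.
    enum Action { NONE, WRITE_TO_CONFIG_STORE, FORWARD_TO_LEADER }

    // True if the task count differs or any task config at the same index differs.
    static boolean changed(List<Map<String, String>> proposed, List<Map<String, String>> stored) {
        if (proposed.size() != stored.size()) {
            return true;
        }
        for (int i = 0; i < proposed.size(); i++) {
            if (!proposed.get(i).equals(stored.get(i))) {
                return true;
            }
        }
        return false;
    }

    // Unchanged configs need no action; otherwise the leader writes and followers forward.
    static Action decide(List<Map<String, String>> proposed,
                         List<Map<String, String>> stored,
                         boolean isLeader) {
        if (!changed(proposed, stored)) {
            return Action.NONE;
        }
        return isLeader ? Action.WRITE_TO_CONFIG_STORE : Action.FORWARD_TO_LEADER;
    }

    public static void main(String[] args) {
        List<Map<String, String>> stored = List.of(Map.of("topic", "connect-test", "file", "a.txt"));
        List<Map<String, String>> same = List.of(Map.of("file", "a.txt", "topic", "connect-test"));
        List<Map<String, String>> edited = List.of(Map.of("topic", "connect-test", "file", "b.txt"));

        System.out.println(decide(same, stored, true));    // restoring an existing connector
        System.out.println(decide(edited, stored, true));  // this worker is the leader
        System.out.println(decide(edited, stored, false)); // this worker is a follower
    }
}
```

Comparing by index and by map equality means key order inside a task config does not matter, but task order does, which matches how the original loop looks tasks up by ConnectorTaskId index.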
That is all for this brief look at the source code for now. I will add more when I have time.