1. 程式人生 > >記一次OGG數據寫入HBase的丟失數據原因分析

記一次OGG數據寫入HBase的丟失數據原因分析

hat xdg column 安裝 tint b- 主鍵 取余 bst

一、現象
二、原因排查
2.1 SparkStreaming程序排查
2.2 Kafka數據驗證
2.3 查看OGG源碼
2.3.1 生成Kafka消息類
2.3.2 Kafka配置類
2.3.3 Kafka 消息發送類
2.3.4 Kafka 分區獲取方式
三、結論


一、現象

目前我們的數據是通過OGG->Kafka->Spark Streaming->HBase。由於之前我們發現HBase的列表put無法保證順序,因此改了程序,如果是在同一個SparkStreaming的批次裏面對同一條數據進行操作,則寫入HBase的數據時間戳就非常相近,只會差幾毫秒,如果是不同批次則會差好幾秒。此為背景。

現在有一條數據,理應先刪除再插入,但是結果變成了先插入再刪除,結果如下

  1. hbase(main):002:0> get ‘XDGL_ACCT_PAYMENT_SCHEDULE‘,‘e5ad-***‘, {COLUMN=>‘cf1:SQLTYPE‘,VERSIONS=>10}
  2. COLUMN CELL
  3. cf1:SQLTYPE timestamp=1498445308420, value=D
  4. cf1:SQLTYPE timestamp=1498445301336, value=I

其中,兩條記錄的時間戳換算過來正好相差了7秒
2017-06-26 10:48:21 I
2017-06-26 10:48:28 D

很明顯這兩條數據並沒有在同一個批次得到處理,很明顯Spark獲取到數據的先後順序出了點問題。

二、原因排查

2.1 SparkStreaming程序排查

首先SparkStream接收到數據後根據數據的pos排序,然後再根據主鍵排序。從現象看,是SparkStreaming分了兩個批次才拿到,而SparkStreaming從Kafka拿數據也是順序拿的。那麽出現問題的可能性就只有兩個:
1、OGG發給Kafka的數據順序是錯誤的。
2、OGG發給Kafka的數據順序是正確的,但是發到了不同的Kafka Partition。

2.2 Kafka數據驗證

為了驗證上面的兩個猜想,我把kafka的數據再次獲取出來進行分析。重點分析數據的partition、key、value。
得到的結果如下:

技術分享

可以看到數據的同一個表數據寫到了不同的分區,可以看到OGG的同一分區下的數據順序是正確的。
正好說明2.1裏面的第二個猜想。看來是OGG寫入的時候並沒有按照數據的表名寫入不同的分區。

在OGG 文檔
http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-2561CA12-9BAC-454B-A2E3-2D36C5C60EE5.htm#GADBD449
中的 5.1.4 Kafka Handler Configuration 的屬性 gg.handler.kafkahandler.ProducerRecordClass 裏面提到了,默認使用的是oracle.goldengate.handler.kafka.DefaultProducerRecord這個類對表名進行分區的。如果要自定義的話需要實現CreateProducerRecord這個接口

原話是 The unit of data in Kafka - a ProducerRecord holds the key field with the value representing the payload. This key is used for partitioning a Kafka Producer record that holds change capture data. By default, the fully qualified table name is used to partition the records. In order to change this key or behavior, theCreateProducerRecord Kafka Handler Interface needs to be implemented and this property needs to be set to point to the fully qualified name of the custom ProducerRecord class.

然而寫入kafka的結果卻不是這樣子的。這點讓人費解。看來我們需要查看OGG的源代碼。

2.3 查看OGG源碼

在OGG的安裝包裏面有一個名叫ggjava/resources/lib/ggkafka-****.jar的文件,我們將其導入一個工程之後就可以直接看到它的源代碼了。

技術分享

2.3.1 生成Kafka消息類

我們直接查看oracle.goldengate.handler.kafka.DefaultProducerRecord這個類

  1. public class DefaultProducerRecord implements CreateProducerRecord {
  2. public DefaultProducerRecord() {
  3. }
  4. public ProducerRecord createProducerRecord(String topicName, Tx transaction, Op operation, byte[] data, TxOpMode handlerMode) {
  5. ProducerRecord pr;
  6. if(handlerMode.isOperationMode()) {
  7. pr = new ProducerRecord(topicName, operation.getTableName().getOriginalName().getBytes(), data);
  8. } else {
  9. pr = new ProducerRecord(topicName, (Object)null, data);
  10. }
  11. return pr;
  12. }
  13. }

這個類只返回一個ProducerRecord,這個是用於發送給Kafka的一條消息。我們先不管這個,繼續看他是如何寫給kafka的

2.3.2 Kafka配置類

首先是OGG與Kafka相關的配置類 oracle.goldengate.handler.kafka.impl.KafkaProperties 。這個類裏面定義了一堆參數,我們只需要關心partitioner.class這個參數,該參數用於定義寫入Kafka的時候獲取分區的類。很遺憾,這個類沒有該參數配置。

2.3.3 Kafka 消息發送類

這裏有一個抽象類oracle.goldengate.handler.kafka.impl.AbstractKafkaProducer,他有兩個子類,分別叫BlockingKafkaProducerNonBlockingKafkaProducer (默認是NonBlockingKafkaProducer)
這兩個類都是直接將通過producer對象將record發送給了kafka。因此想要指導Kafka的分區信息還需要看Kafka是怎麽獲取分區的。

2.3.4 Kafka 分區獲取方式

進入kafka的producer發送record的函數

  1. public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
  2. return send(record, null);
  3. }
  4. public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  5. // intercept the record, which can be potentially modified; this method does not throw exceptions
  6. ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
  7. return doSend(interceptedRecord, callback);
  8. }

發送的方法在doSend裏面,裏面內容很多,請看我勾出來的這兩段

技術分享

技術分享

由於寫入的時候都沒有對Record指定分區,因此這段代碼的partition都為空。所以代碼總會執行到 this.partitioner.partition(record.topic(), record.key(), serializedKey,record.value(), serializedValue,cluster)
該函數是kafka的Partitioner這個抽象類裏面的
技術分享

由於2.3.2 Kafka配置類中沒有指定分區的class,因此只會使用Kafka默認的分區類org.apache.kafka.clients.producer.internals.DefaultPartitioner
技術分享

private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
這裏先是獲取了一個隨機值,然後再獲取了Kafka中對應topic的可用分區列表,然後根據分區數和隨機值進行取余得到分區數的值。

流程走到這裏,我們基本可以得到一個結論。

  • Kafka的record指定了分區,則會使用指定的分區寫入;否則進行下一個判斷;
  • Kafka根據自己定義的partitioner接口進行分區,如果沒指定類,則使用默認的分區則進行下一個判斷;
  • Kafka獲取record中的key進行分區,如果key不為空,則使用Hash分區,如果為空,基本上就是隨機分配分區了。

三、結論

事情到了這裏,我們可以斷定,寫入分區錯亂的問題是因為gg.handler.kafkahandler.Mode是事務模式,導致多條消息一次發送了,無法使用表名作為key,OGG就用了null作為key發送給了Kafka,最終Kafka拿到空值之後只能隨機發送給某個partition,所以才會出現這樣的問題。

最終,修改了ogg的操作模式之後可以看到,寫入的分區正常了。
技術分享

記一次OGG數據寫入HBase的丟失數據原因分析