1. 程式人生 > >使用flume將kafka資料sink到HBase

使用flume將kafka資料sink到HBase

1. hbase sink介紹

如果還不瞭解flume請檢視我寫的其他flume下的部落格。

接下來的內容主要來自flume官方文件的學習。

hbase的sink主要有以下兩種。兩種方式都提供和HBASE一樣的一致性保證,即行級原子性

1.1 HbaseSink

agent的配置時提供兩種序列化模式:

  1. SimpleHbaseEventSerializer: 將整個事件body部分當做完整的一列寫入hbase
  2. RegexHbaseEventSerializer: 根據正則表示式將event body拆分到不同的列當中

優點:
安全性較高:支援往secure hbase寫資料(hbase可以開啟kerberos校驗)

缺點:
效能沒有後面的那種AsyncHBaseSink高

1.2 AsyncHbaseSink

非同步的Sink,可見速度是比前者快的,但是不支援往Secure Hbase寫資料。

採用的序列化器是:SimpleAsyncHbaseEventSerializer,也支援將event body分割成多個列,插入到對應KEY的ROW裡

2. 配置flume

我們這裡hbase沒有開啟安全相關選項,一般這叢集也主要在內網環境。所以我們這裡採用AsyncHbaseSink來進行本次操作。source則為kafka。

channel我們也選用kafka channel。之所以選擇kafka channel的依據可以參考

flume中各類channel分析對比

配置檔案如下:

# ------------------- 定義資料流----------------------
# source的名字
agent.sources = kafkaSource
# channels的名字,建議按照type來命名
agent.channels = kafkaChannel
# sink的名字,建議按照目標來命名
agent.sinks = hbaseSink


# ---------------------定義source和sink的繫結關係----------------

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = kafkaChannel
# 指定sink需要使用的channel的名字,注意這裡是channel
agent.sinks.hbaseSink.channel = kafkaChannel

#-------- kafkaSource相關配置-----------------
# 定義訊息源型別
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定義kafka所在zk的地址
agent.sources.kafkaSource.zookeeperConnect = 10.45.9.139:2181
# 配置消費的kafka topic
agent.sources.kafkaSource.topic = my-topic-test
# 配置消費者組的id
agent.sources.kafkaSource.groupId = flume
# 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100



#------- kafkaChannel相關配置-------------------------
# channel型別
agent.channels.kafkaChannel.type = org.aprache.flume.channel.kafka.KafkaChannel
# channel儲存的事件容量,即佇列長度
agent.channels.kafkaChannel.capacity=10000
# 事務容量
agent.channels.kafkaChannel.transactionCapacity=1000
# kafka broker list
agent.channels.kafkaChannel.brokerList=mysql1:9092,mysql4:9092
# 指定topic
agent.channels.topic=flume
# 指定zk地址
agent.channels.kafkaChannel.zookeeperConnect=10.45.9.139:2181
# 指定producer的選項,關鍵是指定acks的值,保證訊息傳送的可靠性,retries採用預設的3
agent.channels.kafkaChannel.kafka.producer.acks=all


#---------hbaseSink 相關配置------------------
# 指定sink型別。PS:如果使用RegexHbaseEventSerializer只能使用hbase型別
# agent.sinks.hbaseSink.type = hbase
agent.sinks.hbaseSink.type = asynchbase
# 指定hbase中的表名
agent.sinks.hbaseSink.table = student
# 指明column family
agent.sinks.hbaseSink.columnFamily= info
# 使用的serializer
agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
# 如果需要使用正則處理value可以使用以下的serializer
#agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 指定正則表示式,這裡用的正則是匹配逗號分隔的字串
#agent.sinks.hbaseSink.serializer.regex= ^([^,]+),([^,]+),([^,]+),([^,]+)$
# 指定在列族中對應的的colName
# agent.sinks.hbaseSink.serializer.colNames=c1,c2,c3


# 指定hbase所用的zk集合
agent.sinks.hbaseSink.zookeeperQuorum = mysql3:2181,mysql4:2181,mysql5:2181

3. 執行測試flume

在$FLUME_HOME/bin下執行以下命令執行。後臺會開啟一個Application 的Java程序

nohup sh flume-ng agent --conf-file ../conf/flume-conf.properties --name agent -Dflume.root.logger=INFO,console &

然後寫個kafka的producer往我們前面定義的my-topic-test中寫訊息。
producer的程式碼如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author Wan Kaiming on 2016/8/1
 * @version 1.0
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        //broker地址 這裡用域名,記得修改本地的hosts檔案
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //訊息可靠性語義
        props.put("acks", "all");
        //請求broker失敗進行重試的次數,避免由於請求broker失敗造成的訊息重複
        props.put("retries", 0);
        //按批發送,每批的訊息數量
        props.put("batch.size", 16384);
        //防止來不及傳送,延遲一點點時間,使得能夠批量傳送訊息
        props.put("linger.ms", 1);
        //緩衝大小,bytes
        props.put("buffer.memory", 33554432);
        //key的序列化類
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //value的序列化類
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //建立一個Producer物件,載入配置上下文資訊
        Producer<String, String> producer = new KafkaProducer<String,String>(props);

        for(int i=0;i<6;i++){
            producer.send(new ProducerRecord<String, String>("my-topic-test", "hello", "world"));
        }

//        while(true){
//            //呼叫send方法進行傳送。send方法將訊息加到快取,非同步傳送
//            producer.send(new ProducerRecord<String, String>("my-topic", "hello", "world"));
//        }

        producer.close();
    }
}

執行完畢後可以去hbase shell中檢查下:

總結:可見key全部是由flume自動生成的。傳送給kafka的 value值"world"全部成功儲存到HBASE

PS:

  1. 最後一行多出來的incRow是Flume的SimpleAsyncHbaseEventSerializer中使用的。用來統計行數的,每次都在最後一行,效果就是一個計數的count。
  2. 這裡產生的行的名字是pCol和iCol都是SimpleAsyncHbaseEventSerializer的預設值,其實可以自定義指定

總結:可見,如果需要更加自由的對寫入HBASE的資料做自定義,建議需要了解下這個Event序列化類的原始碼,然後可以自定義序列化類

4. 使用RegexHbaseEventSerializer來處理些HBASE的值

  1. 修改flume的配置檔案,改用RegexHbaseEventSerializer,我使用的配置檔案如下
# ------------------- 定義資料流----------------------
# source的名字
agent.sources = kafkaSource
# channels的名字,建議按照type來命名
agent.channels = kafkaChannel
# sink的名字,建議按照目標來命名
agent.sinks = hbaseSink


# ---------------------定義source和sink的繫結關係----------------

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = kafkaChannel
# 指定sink需要使用的channel的名字,注意這裡是channel
agent.sinks.hbaseSink.channel = kafkaChannel

#-------- kafkaSource相關配置-----------------
# 定義訊息源型別
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定義kafka所在zk的地址
agent.sources.kafkaSource.zookeeperConnect = 10.45.9.139:2181
# 配置消費的kafka topic
agent.sources.kafkaSource.topic = my-topic-regex
# 配置消費者組的id
agent.sources.kafkaSource.groupId = flume
# 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100
#------- kafkaChannel相關配置-------------------------
# channel型別
agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
# channel儲存的事件容量,即佇列長度
agent.channels.kafkaChannel.capacity=10000
# 事務容量
agent.channels.kafkaChannel.transactionCapacity=1000
# kafka broker list
agent.channels.kafkaChannel.brokerList=mysql1:9092,mysql4:9092
# 指定topic
agent.channels.kafkaChannel.topic=flume-regex-channel
# 指定zk地址
agent.channels.kafkaChannel.zookeeperConnect=10.45.9.139:2181
# 指定producer的選項,關鍵是指定acks的值,保證訊息傳送的可靠性,retries採用預設的3
# agent.channels.kafkaChannel.kafka.producer.acks=all

#---------hbaseSink 相關配置------------------
# 指定sink型別
# agent.sinks.hbaseSink.type = asynchbase
agent.sinks.hbaseSink.type = hbase
# 指定hbase中的表名
agent.sinks.hbaseSink.table = student
# 指明column family
agent.sinks.hbaseSink.columnFamily = info
# 使用的serializer
# agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

# 如果需要使用正則處理value可以使用以下的serializer
agent.sinks.hbaseSink.serializer= org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 指定某一列來當主鍵,而不是用隨機生成的key
# agent.sinks.hbaseSink.serializer.rowKeyIndex = 0
# 指定正則表示式,這裡用的正則是匹配逗號分隔的字串
agent.sinks.hbaseSink.serializer.regex=^([^,]+),([^,]+),([^,]+),([^,]+)$
# 指定在列族中對應的的colName
agent.sinks.hbaseSink.serializer.colNames=c1,c2,c3,c4


# 指定hbase所用的zk集合
agent.sinks.hbaseSink.zookeeperQuorum = mysql3:2181,mysql4:2181,mysql5:2181
  1. 修改producer檔案,將value值傳送"one,two,three,four"可以匹配正則。執行結果如下圖:

PS:建議把lfume預設的JVM大小改大點,並且開啟JMX方便監控JVM

vi $FLUME_HOME/bin/flume-ng
# set default params
# 若干內容...
JAVA_OPTS="-Xmx1500m -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
LD_LIBRARY_PATH=""
# 若干內容...

5. 效率測試

測試按照第四節的配置來進行。生產者程式碼如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;

/**
 * @author Wan Kaiming on 2016/8/1
 * @version 1.0
 */
public class MyProducer {
    public static void main(String[] args) {

        //統計時間
        System.out.println("程式開始時間戳資訊:"+new Date());
        final long startTime=System.currentTimeMillis();

        Properties props = new Properties();
        //broker地址 這裡用域名,記得修改本地的hosts檔案
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //訊息可靠性語義
        props.put("acks", "all");
        //請求broker失敗進行重試的次數,避免由於請求broker失敗造成的訊息重複
        props.put("retries", 3);
        //按批發送,每批的訊息數量
        //props.put("batch.size", 16384);
        props.put("batch.size", 16384);
        //防止來不及傳送,延遲一點點時間,使得能夠批量傳送訊息
        props.put("linger.ms", 1);
        //緩衝大小,bytes
        props.put("buffer.memory", 33554432);
        //key的序列化類
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //value的序列化類
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //建立一個Producer物件,載入配置上下文資訊
        Producer<String, String> producer = new KafkaProducer<String,String>(props);

        for(int i=0;i<1000000;i++){
            producer.send(new ProducerRecord<String, String>("my-topic-regex", Integer.toString(i), "one,two,three,four"));
        }

        producer.close();

        final long endTime=System.currentTimeMillis();
        float excTime=(float)(endTime-startTime)/1000;
        System.out.println("執行時間:"+excTime+"s");
        System.out.println("當前時間為:"+ new Date());
    }
}

測試基本資訊:

名稱 資訊
PC硬體資訊 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz,記憶體4G
JAVA JDK1.8
KAFKA 2臺broker(heap size =1200M),1個topic,5個分割槽,2個複製分割槽,acks=all
flume 1.60版本,heap size=1.5G,kafkachannel,kafka source,hbase sink。
負載資訊 100萬條訊息,每條訊息20byte的樣子
hbase 1臺master,3臺slave,其中1臺slave和master在一臺機器,版本0.98-hadoop2

測試結果1(kafkachannel=0):

  1. kafka傳送訊息時間:6.912s
  2. hbase接受完全部訊息:4分33s
  3. 延遲時間:4分26s

測試結果2(kafkachannel=all):

  1. kafka傳送訊息時間:8.25s
  2. hbase接受完全部訊息:4分59s
  3. 延遲時間:4分51s

PS: 測試會存在一定誤差。因為讀取hbase的時候是按照1000條的批大小批量讀取的,count完整個HBASE的記錄本身也會花很多時間。也就是意味著,實際的延遲時間肯定比我測試的要小。測試1的時候,32.8萬條訊息,花費時間約為120s,得到吞吐量TPS=2733。該值基本比較準確。

綜上,在保證訊息可靠性前提下,kafka訊息通過flume寫hbase的吞吐量TPS基本在3K左右這個數量級。相信經過更多的配置優化、硬體效能提升、增大JVM堆等方式,提升TPS不是問題。