
Importing Kafka topic data into Hive with Flume

1. First, create the table in Hive according to the structure of the incoming data.


create table AREA1 (
  unid string,
  area_punid string,
  area_no string,
  area_name string,
  area_dept_unid string,
  area_longitude string,
  area_latitude string,
  area_sortid string,
  create_time string
)
clustered by (unid) into 2 buckets
stored as orc;

Note: the `clustered by (unid) into 2 buckets` and `stored as orc` clauses are required; without them the Hive sink will throw an error. I left them out the first time and hit exactly that error, and this was the fix I found online.
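The bucketed-ORC requirement exists because Flume's Hive sink writes through Hive's streaming/transaction API. Depending on your Hive version, the metastore side may also need ACID support switched on (and the table may need `'transactional'='true'` in its TBLPROPERTIES). A sketch of the hive-site.xml settings commonly needed for streaming ingest — these are standard Hive property names, but verify them against your Hive version's documentation:

```xml
<!-- hive-site.xml: settings commonly required by Hive streaming ingest.
     Check these against your Hive version before relying on them. -->
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```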

2. With the table created, write the Flume configuration file. This is the key step.

Create a configuration file named kafkatohive.conf in Flume's conf directory with the following contents:

flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = 192.168.72.129:2181,192.168.72.130:2181,192.168.72.131:2181
flumeagent1.sources.source_from_kafka.topic = oracle-kafka
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000
# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://192.168.72.129:9083
flumeagent1.sinks.hive_sink.hive.database = test
flumeagent1.sinks.hive_sink.hive.table = AREA1
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = unid,area_punid,area_no,area_name,area_dept_unid,area_longitude,area_latitude,area_sortid,create_time 
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel
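The `zookeeperConnect` and `topic` properties above belong to the older Kafka source (Flume 1.6 and earlier). From Flume 1.7 onward, the Kafka source is configured with the broker list instead. A sketch of the equivalent source section for a newer Flume — the broker port 9092 is an assumption based on the Kafka default, not taken from this setup:

```properties
# Flume 1.7+ Kafka source: configured via brokers, not ZooKeeper
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.kafka.bootstrap.servers = 192.168.72.129:9092,192.168.72.130:9092,192.168.72.131:9092
flumeagent1.sources.source_from_kafka.kafka.topics = oracle-kafka
flumeagent1.sources.source_from_kafka.channels = mem_channel
```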


3. Run the Flume agent from Flume's bin directory:

flume-ng agent -n flumeagent1 -f ../conf/kafkatohive.conf
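To sanity-check the message format before starting the agent, it helps to remember what the `DELIMITED` serializer does: it splits each Kafka message body on the configured delimiter (`,`) and maps the pieces, in order, onto the columns listed in `serializer.fieldnames`. A small sketch of that mapping (the sample record values are made up for illustration):

```python
# Sketch of the Hive sink's DELIMITED serializer mapping: split each
# Kafka message body on the delimiter and pair the values with the
# columns from serializer.fieldnames, in order.
FIELDNAMES = [
    "unid", "area_punid", "area_no", "area_name", "area_dept_unid",
    "area_longitude", "area_latitude", "area_sortid", "create_time",
]

def parse_record(body: str, delimiter: str = ",") -> dict:
    """Map one delimited message body to AREA1 column values."""
    values = body.split(delimiter)
    if len(values) != len(FIELDNAMES):
        # A mismatched field count is the most common cause of
        # malformed rows landing in Hive.
        raise ValueError(f"expected {len(FIELDNAMES)} fields, got {len(values)}")
    return dict(zip(FIELDNAMES, values))

record = parse_record("u001,p001,01,North,dep01,116.40,39.90,1,2020-01-01")
```

If a message body does not carry exactly nine comma-separated fields, the row will not line up with the table's columns, so validating the producer side against this layout saves debugging later.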


At this point data flows into Hive. To make the pipeline run continuously, you can set up another agent that feeds data from the original source into the Kafka topic; with a suitable polling delay, this should give you an approximately real-time data flow.
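As a sketch of what that upstream agent could look like — the agent name, file path, and single broker address here are hypothetical, and it uses an exec source tailing a delimited file, which is only one of several possible sources:

```properties
# Hypothetical upstream agent: tail a delimited export file into Kafka
agent2.sources = tail_source
agent2.channels = mem_channel
agent2.sinks = kafka_sink

# Exec source re-reads new lines appended to the file
agent2.sources.tail_source.type = exec
agent2.sources.tail_source.command = tail -F /data/area1.csv
agent2.sources.tail_source.channels = mem_channel

# Kafka sink (Flume 1.7+ property names) publishing to the same topic
agent2.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.kafka_sink.kafka.bootstrap.servers = 192.168.72.129:9092
agent2.sinks.kafka_sink.kafka.topic = oracle-kafka
agent2.sinks.kafka_sink.channel = mem_channel

agent2.channels.mem_channel.type = memory
agent2.channels.mem_channel.capacity = 10000
agent2.channels.mem_channel.transactionCapacity = 100
```

Note that an exec source offers no delivery guarantees if the agent dies; for production, a spooling-directory or Taildir source is the safer choice.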