Importing Kafka topic data into Hive with Flume
Published: 2018-12-07
1. First, create the table in Hive according to the schema of the source data.
create table AREA1 (
  unid string,
  area_punid string,
  area_no string,
  area_name string,
  area_dept_unid string,
  area_longitude string,
  area_latitude string,
  area_sortid string,
  create_time string
)
clustered by (unid) into 2 buckets
stored as orc;
Note: the clustered by (unid) into 2 buckets clause and stored as orc must both be present, otherwise the Hive sink fails with an error. I left them out the first time and hit exactly that; this is the fix I found online.
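The bucketing and ORC requirements exist because the Flume Hive sink writes through Hive's streaming ingest API, which also requires ACID transactions to be enabled on the Hive side. If the sink still errors after fixing the DDL, check hive-site.xml for settings along these lines (a minimal sketch of the commonly required transaction properties; on many Hive versions the target table must additionally be declared with TBLPROPERTIES ('transactional'='true')):

```
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```

Restart the metastore after changing these so the transaction manager takes effect.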
2. With the table created, write the Flume configuration file; this is the key step.
In Flume's conf directory, create a configuration file named kafkatohive.conf with the following contents:
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink

# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = 192.168.72.129:2181,192.168.72.130:2181,192.168.72.131:2181
flumeagent1.sources.source_from_kafka.topic = oracle-kafka
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000

# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://192.168.72.129:9083
flumeagent1.sinks.hive_sink.hive.database = test
flumeagent1.sinks.hive_sink.hive.table = AREA1
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = unid,area_punid,area_no,area_name,area_dept_unid,area_longitude,area_latitude,area_sortid,create_time

# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100

# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel
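The DELIMITED serializer splits each Kafka message on the configured delimiter and maps the pieces onto fieldnames in order, so every message must be one comma-separated record matching the table columns. A quick way to test the pipeline end to end is to push a hand-written record into the topic with the console producer (the broker address and the sample values below are assumptions for illustration; the original post only lists the ZooKeeper addresses):

```
echo '1001,2001,A01,EastGate,D01,116.40,39.90,1,2018-12-07 10:00:00' | \
  kafka-console-producer.sh --broker-list 192.168.72.129:9092 --topic oracle-kafka
```

If a field itself contains a comma, the split will misalign the columns, so either sanitize the data or choose a delimiter that cannot appear in the values.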
3. Start the Flume agent with the following command:
Run from Flume's bin directory (note the -f path must point at the kafkatohive.conf created above):

flume-ng agent -n flumeagent1 -f ../conf/kafkatohive.conf
At this point data flows into Hive. For near-real-time ingestion, set up another agent that feeds records from the upstream data source into this Kafka topic; with a suitable delay configured on that agent, this should give an approximately real-time data flow.
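To confirm that events are actually landing in the table, a couple of quick checks from the Hive CLI or Beeline are enough (plain verification queries, nothing specific to this setup):

```
select count(*) from test.AREA1;
select * from test.AREA1 limit 5;
```

The count should grow as the agent commits batches; with batchSize = 10 and txnsPerBatchAsk = 2 as configured above, rows appear in small increments rather than one at a time.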