1. 程式人生 > >flume接收資料傳入hbase,並生成指定的rowkey和column

flume接收資料傳入hbase,並生成指定的rowkey和column

目的:flume從event中取出資料作為hbase的rowkey 使用flume接收資料,再傳入hbase中,要求中間資料不落地。 flume使用http source入口,使用sink連線hbase實現資料匯入,並且通過channels使flume的記憶體資料儲存到本地磁碟(防止叢集出現故障,資料可以備份至本地) 傳入資料格式為 http:10.0.0.1_{asdasd} 格式說明(url_資料)

hbase儲存的結果為: 在這裡插入圖片描述rowkey:當前時間_url value:資料 即要對傳入的資料進行切分,將url作為rowkey的一部分,當前時間作為另一部分,資料儲存到value中

步驟: 1.重寫flume中能指定rowkey的原始碼(HbaseEventSerializer介面)。再打成jar包 java原始碼見下面:

2.將製作jar包放入flume的/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/lib目錄下 在這裡插入圖片描述 3.flume配置檔案

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.bind = 10.0.0.183

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = httpdata
a1.sinks.k1.columnFamily = a
# Custom serializer (jar dropped into flume/lib) that builds rowkey = <timestamp>_<url>
a1.sinks.k1.serializer = com.hbase.Rowkey

# Use a FILE channel so buffered events are persisted to local disk and
# survive an agent crash (the original had a misleading "in memory" comment,
# plus a stray "a1.sinks.k1.channel = memoryChannel" pointing at a channel
# that is never defined — removed; the binding is done once, below).
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/mathartsys/oyzm_test/flu-hbase/checkpoint/
a1.channels.c1.useDualCheckpoints = false
a1.channels.c1.dataDirs = /home/mathartsys/oyzm_test/flu-hbase/flumedir/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.在hbase中建表 create 'httpdata','a'

5.flume啟動命令 flume-ng agent -c . -f /mysoftware/flume-1.7.0/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console

6.flume資料寫入命令 curl -X POST -d '[{"body":"http:10.0.0.1_{asdasd}"}]' http://10.0.0.183:44444

hbase中資料結果: 20181108104034_http:10.0.0.1 column=a:data, timestamp=1541644834926, value={asdasd}

java原始碼:
package com.hbase;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;

public class Rowkey implements HbaseEventSerializer {   
   //列族(不用管)
    private byte[] colFam="cf".getBytes();  
    //獲取檔案
    private Event currentEvent;  
    
    public void initialize(Event event, byte[] colFam) {  
        //byte[]位元組型陣列  
        this.currentEvent = event;
        this.colFam = colFam;  
    }  
    public void configure(Context context) {}  
    
    public void configure(ComponentConfiguration conf) {  
    }  
    
    //指定rowkey,單元格修飾名,值
    public List<Row> getActions() {  
         // 切分 currentEvent檔案 從中拿到的值
         String eventStr = new String(currentEvent.getBody());
         
         //body格式為:url_value
         String url = eventStr.split("_")[0];
         String data = eventStr.split("_")[1];
         
         //得到系統日期  
		 Date d = new Date();
		 SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
         //rowkey
         byte[] currentRowKey = (df.format(d)+"_"+url).getBytes(); 
         
         //hbase的put操作
         List<Row> puts = new ArrayList<Row>();  
         Put putReq = new Put(currentRowKey);  
         //putReq.addColumn  列族,單元格修飾名(可指定),值
         //putReq={"totalColumns":0,"families":{},"row":"d934e9adf3c540c8b58af1077fe7a0a39223370594393854807/webapp"}  
         putReq.addColumn(colFam,  "data".getBytes(), data.getBytes());  
         puts.add(putReq);                
         return puts;  
    }   
    public List<Increment> getIncrements() {  
        List<Increment> incs = new ArrayList<Increment>();      
        return incs;  
    }  
   //關閉流
    public void close() {  
        colFam = null;  
        currentEvent = null;  
    }  
} 

pom檔案:
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!-- NOTE(review): the article deploys the jar into
         apache-flume-1.6.0-cdh5.5.2-bin/lib but builds against
         flume-ng-hbase-sink 1.7.0 — confirm the serializer API matches
         the cluster's Flume version before shipping. -->
    <dependency>
	    <groupId>org.apache.flume.flume-ng-sinks</groupId>
		<artifactId>flume-ng-hbase-sink</artifactId>
		<version>1.7.0</version>
	</dependency>
   	<dependency>
		<groupId>org.apache.hbase</groupId>
		<artifactId>hbase-client</artifactId>
		<version>1.2.4</version>
	</dependency>
	<!-- system-scoped jdk.tools pulls in JDK8's tools.jar; this breaks on
	     JDK 9+ (tools.jar removed) — verify it is still required. -->
	<dependency>
         <groupId>jdk.tools</groupId>
         <artifactId>jdk.tools</artifactId>
         <version>1.8</version>
         <scope>system</scope>
         <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
  </dependencies>
</project>