HBase and Flume Integration Programming

1. Download the Flume source (src) package from the official site and unpack it. The module to import is flume-ng-sinks -> flume-ng-hbase-sink.

2. Edit SimpleAsyncHbaseEventSerializer: make a copy, rename it MySimpleAsyncHbaseEventSerializer, and modify the code. Example:

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      try {
        // payloadColumn holds comma-separated column names; payload holds the matching values
        String[] columns = new String(this.payloadColumn, Charsets.UTF_8).split(",");
        String[] values = new String(this.payload, Charsets.UTF_8).split(",");

        // Skip malformed events before indexing; the row key needs values[0] and values[1]
        if (columns.length != values.length || values.length < 2) {
          return actions;
        }

        // Build the row key once so that every column of this event lands in the same row
        String datetime = values[0];
        String userid = values[1];
        byte[] rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);

        for (int i = 0; i < columns.length; i++) {
          byte[] colColumn = columns[i].getBytes(Charsets.UTF_8);
          byte[] colValue = values[i].getBytes(Charsets.UTF_8);
          actions.add(new PutRequest(table, rowKey, cf, colColumn, colValue));
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }
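
The parsing step inside getActions() can be tried on its own, without Flume or HBase on the classpath. The following is only a minimal sketch: the class PayloadSplitSketch, its Cell type, and the sample field names (datetime, userid, keyword) are hypothetical and not part of Flume. Unlike the serializer above, it splits the values with a limit of -1 so that trailing empty fields are kept, which is an assumption about how empty values should be treated.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the serializer's parsing step; not part of Flume.
class PayloadSplitSketch {

  /** One column/value pair that would become a single HBase cell. */
  static final class Cell {
    final String column;
    final String value;

    Cell(String column, String value) {
      this.column = column;
      this.value = value;
    }
  }

  /** Pairs each configured column name with the value at the same position. */
  static List<Cell> toCells(byte[] payloadColumn, byte[] payload) {
    String[] columns = new String(payloadColumn, StandardCharsets.UTF_8).split(",");
    // Limit -1 keeps trailing empty fields, so "a,b," still yields three values
    String[] values = new String(payload, StandardCharsets.UTF_8).split(",", -1);

    List<Cell> cells = new ArrayList<>();
    if (columns.length != values.length) {
      return cells; // malformed event: emit nothing
    }
    for (int i = 0; i < columns.length; i++) {
      cells.add(new Cell(columns[i], values[i]));
    }
    return cells;
  }

  public static void main(String[] args) {
    byte[] names = "datetime,userid,keyword".getBytes(StandardCharsets.UTF_8);
    byte[] event = "00:00:01,42,hbase".getBytes(StandardCharsets.UTF_8);
    for (Cell c : toCells(names, event)) {
      System.out.println(c.column + " = " + c.value);
    }
  }
}
```

Checking the lengths before the loop means a short or over-long event is rejected as a whole instead of being written partially.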

The row key can be customized by editing SimpleRowKeyGenerator:

public class SimpleRowKeyGenerator {

  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
  }

  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
  }

  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }

  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
  }

  // Custom row key: userid + datetime + current time in milliseconds
  public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }
}
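
To see what a key produced by getKfkRowKey looks like, the concatenation can be reproduced in isolation. This is a sketch only: RowKeySketch and kfkRowKey are hypothetical names, and the timestamp is passed in as a parameter (rather than read from System.currentTimeMillis()) purely so the output is reproducible.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the concatenation order of getKfkRowKey; not part of Flume.
class RowKeySketch {

  /** Builds userid + datetime + millis, the same order used by getKfkRowKey. */
  static byte[] kfkRowKey(String userid, String datetime, long millis) {
    return (userid + datetime + millis).getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] key = kfkRowKey("42", "00:00:01", 1700000000000L);
    // prints 4200:00:011700000000000
    System.out.println(new String(key, StandardCharsets.UTF_8));
  }
}
```

Because HBase sorts rows lexicographically by row key, putting userid first keeps one user's rows adjacent. The fields are joined without a delimiter, so variable-length user IDs can make the field boundaries ambiguous; adding a separator or fixed-width padding is one option if that matters for your data.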

3. Make the serializer you just edited the default:

Edit AsyncHBaseSink, find SimpleAsyncHbaseEventSerializer, and change it to MySimpleAsyncHbaseEventSerializer, as follows:

    if (eventSerializerType == null || eventSerializerType.isEmpty()) {
      eventSerializerType =
          "org.apache.flume.sink.hbase.MySimpleAsyncHbaseEventSerializer";
      logger.info("No serializer defined, Will use default");
    }

4. Build the jar, give it the same name as the corresponding jar in Flume's lib directory, copy it into lib, and overwrite the original jar.