Integrating HBase with Flume: Programming Guide
阿新 • Published: 2019-01-01
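1. Download the src package from the official site and extract it. The module to import is flume-ng-sinks -> flume-ng-hbase-sink.
2. Edit SimpleAsyncHbaseEventSerializer: make a copy, rename it to MySimpleAsyncHbaseEventSerializer, and modify the code. Example: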
@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        try {
            // payloadColumn holds comma-separated column names; payload holds the matching values.
            String[] columns = new String(this.payloadColumn, Charsets.UTF_8).split(",");
            String[] values = new String(this.payload, Charsets.UTF_8).split(",");
            // Skip malformed events before indexing into values.
            if (columns.length != values.length || values.length < 2) {
                return actions;
            }
            // In this example, field 0 is the datetime and field 1 is the user ID.
            String datetime = values[0];
            String userid = values[1];
            // Build the row key once so that every column of the event lands in the same row.
            byte[] rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
            for (int i = 0; i < columns.length; i++) {
                byte[] colColumn = columns[i].getBytes(Charsets.UTF_8);
                byte[] colValue = values[i].getBytes(Charsets.UTF_8);
                actions.add(new PutRequest(table, rowKey, cf, colColumn, colValue));
            }
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}
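The column/value pairing in getActions() can be tried out without a running Flume agent. Below is a minimal sketch, assuming the event body is a comma-separated line whose fields line up with the configured column names. The class name PayloadSplitSketch, the method pairColumns, and the sample column names are hypothetical and not part of Flume.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flume: mirrors the split-and-pair step of getActions().
class PayloadSplitSketch {

    // Pairs each column name with the value at the same position.
    // Returns an empty map when the counts differ, like the length check in the serializer.
    static Map<String, String> pairColumns(byte[] payloadColumn, byte[] payload) {
        String[] columns = new String(payloadColumn, StandardCharsets.UTF_8).split(",");
        String[] values = new String(payload, StandardCharsets.UTF_8).split(",");
        Map<String, String> cells = new LinkedHashMap<>();
        if (columns.length != values.length) {
            return cells;
        }
        for (int i = 0; i < columns.length; i++) {
            cells.put(columns[i], values[i]);
        }
        return cells;
    }

    public static void main(String[] args) {
        // Sample data is made up for illustration.
        byte[] cols = "datetime,userid,keyword".getBytes(StandardCharsets.UTF_8);
        byte[] body = "20190101120000,u001,hbase".getBytes(StandardCharsets.UTF_8);
        System.out.println(pairColumns(cols, body));
    }
}
```

One caveat: String.split(",") drops trailing empty strings, so a record whose last field is empty comes out one value short and is skipped by the length check. Using split(",", -1) keeps the empty field if such records should be written.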
You can also define a custom rowkey by editing SimpleRowKeyGenerator:
import java.io.UnsupportedEncodingException;
import java.util.Random;
import java.util.UUID;

public class SimpleRowKeyGenerator {

    public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
        return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
    }

    public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
        return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
    }

    public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
        return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
    }

    public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
        return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
    }

    // Custom rowkey added for this example: userid + datetime + current time in milliseconds.
    public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
        return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
    }
}
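To see what a key produced by getKfkRowKey looks like, here is a small sketch that takes the timestamp as a parameter so the result is reproducible. The class RowKeySketch, the method buildKey, and the sample values are hypothetical; only the concatenation order is taken from the code above.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flume: same field order as getKfkRowKey,
// but with the millisecond timestamp supplied by the caller.
class RowKeySketch {

    // Concatenates userid, datetime and a millisecond timestamp into one UTF-8 key.
    static byte[] buildKey(String userid, String datetime, long millis) {
        return (userid + datetime + millis).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        byte[] key = buildKey("u001", "20190101120000", 1546344000000L);
        System.out.println(new String(key, StandardCharsets.UTF_8));
    }
}
```

Because HBase orders rows by the bytes of the rowkey, putting userid first keeps all rows of one user next to each other, and the trailing timestamp keeps repeated events from the same user and datetime from overwriting one another unless they arrive in the same millisecond.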
3. Make the serializer you just edited the default:
Edit AsyncHBaseSink, search for SimpleAsyncHbaseEventSerializer, and change it to MySimpleAsyncHbaseEventSerializer, as follows:
if (eventSerializerType == null || eventSerializerType.isEmpty()) {
    // Fall back to the custom serializer when none is configured.
    eventSerializerType = "org.apache.flume.sink.hbase.MySimpleAsyncHbaseEventSerializer";
    logger.info("No serializer defined, Will use default");
}
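The default only applies when the sink has no serializer configured, so the class can also be named explicitly in the agent configuration. Either way, the serializer needs the comma-separated column list in payloadColumn. The fragment below is a sketch: the agent name a1, sink name k1, channel name c1, the table and column family, and the column names are all assumptions for illustration.

```properties
# Hypothetical names: agent a1, sink k1, channel c1, table weblogs, column family info.
a1.sinks.k1.type = asynchbase
a1.sinks.k1.channel = c1
a1.sinks.k1.table = weblogs
a1.sinks.k1.columnFamily = info
# Explicitly select the custom serializer instead of relying on the patched default.
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.MySimpleAsyncHbaseEventSerializer
# Column names, in the same order as the comma-separated fields of the event body.
a1.sinks.k1.serializer.payloadColumn = datetime,userid,keyword
```

The custom class still has to be on Flume's classpath, which is what the jar replacement in the next step takes care of.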
4. Build the jar, rename it to match the corresponding jar in Flume's lib directory, copy it into lib, and overwrite the original jar.