1. 程式人生 > >Hbase表Scan方法獲取rowkey

Hbase表Scan方法獲取rowkey

/**
	 *每次讀取Hbase資料庫Page_Size條
	 * @param entity
	 */
	public static void getContentFromHbase(HbaseExportEntity entity) {
		 StringBuffer buffer = new StringBuffer();
		 ResultScanner scanner = null;
		 
		 String startRow =  entity.getStartRow();
		 String endRow =    entity.getStopRow();
		 String tableName = entity.getTableName();
		 
		 //Filter filter = getHbaseExportFilter(entity.getCellList());							//過濾器
		 String cellNameStr = getCellNameStr(entity.getCellList(),entity.getFieldSplit());		//列名
		 String fileName = HbaseExportUtil.getFileName(entity);								//獲取檔名
		 
		 //檔案儲存路徑(配置)+ tableName + 檔名(程式自動生成)+副檔名(配置)
		 String  fileURL = entity.getSavePath()+ File.separator + entity.getTableName() 
		 	+ File.separator  + fileName;											//儲存絕對檔案路徑
		
		 if(entity.getShowFieldTitle()) {											//首行顯示欄位
         	buffer.append(cellNameStr);
         }
		 
		 if(entity.getShowRowkey()) {
			 buffer.append(entity.getFieldSplit());
			 buffer.append(ROW_KEY);
		 }
		 
		 buffer.append(FILE_NEWLINE);
		 logger.info("讀hbase資料庫--"+ tableName+"--表資料寫入--"+ fileName +"--處理開始");
		 int recordSize = 0;
	     int totalSize = 0;
	     String rowKey = null;
		 do {
	            scanner = HBaseScan.hbaseScan(tableName, ExportConstant.PAGE_SIZE, startRow, endRow, null);
	            recordSize = 0;
	            for (Result result : scanner) {
	            	for (KeyValue kv : result.raw()) {  
	            		int rowlength = Bytes.toShort(kv.getBuffer(), kv.getOffset()+KeyValue.ROW_OFFSET);  
	                    rowKey = Bytes.toStringBinary(kv.getBuffer(), kv.getOffset()+KeyValue.ROW_OFFSET + Bytes.SIZEOF_SHORT, rowlength);
	                    if(!StringUtils.isBlank(rowKey)) {
	                    	 break;
	                    }
	            	}
	                recordSize++;
	                startRow = HBaseUtil.bytesToStr(result.getRow()) + "0";	  
	                
	                String columnValueStr = getColumnValue(result,entity,rowKey);
	                buffer.append(columnValueStr);
	             	buffer.append(FILE_NEWLINE);
	            }
	            totalSize += recordSize;
	            
	            if(totalSize % ExportConstant.PAGE_SIZE==0) {						//每次寫入PAGE_SIZE條
	            	HbaseExportUtil.WriteContentToFile(buffer.toString(), fileURL);
	            	buffer = new StringBuffer();
	            }
	      } while (recordSize != 0);
		  //寫入內容到檔案中
		  HbaseExportUtil.WriteContentToFile(buffer.toString(), fileURL);
		  logger.info("讀hbase資料庫--"+ tableName+"--表資料寫入--"+ fileName +"--處理結束...共寫入記錄總數:"+totalSize+"條");
	}

獲取rowkey方法

 for (Result result : scanner) {
	            	for (KeyValue kv : result.raw()) {  
	            		int rowlength = Bytes.toShort(kv.getBuffer(), kv.getOffset()+KeyValue.ROW_OFFSET);  
	                    rowKey = Bytes.toStringBinary(kv.getBuffer(), kv.getOffset()+KeyValue.ROW_OFFSET + Bytes.SIZEOF_SHORT, rowlength);
	                    if(!StringUtils.isBlank(rowKey)) {
	                    	 break;
	                    }
	            	}