1. 程式人生 > >[大資料]連載No19 Hbase Shell和API的增刪改查+與MapperReducer讀寫操作

[大資料]連載No19 Hbase Shell和API的增刪改查+與MapperReducer讀寫操作

本次總結如下
1、Hbase Shell的常用命令
2、Java API 對 HBase 的增刪改查
3、MapReduce(Mapper / Reducer)從 HBase 讀取資料,計算單詞數量,並寫回 HBase

登入hbase Shell

[[email protected] ~]#/home/softs/hbase-0.98.12.1-hadoop2/bin/hbase shell

1、表操作

建立表test    create 'test', 'cf'     # test 為表名,cf 為列族
查詢表test    scan  'test'

2、增刪改查操作
插入資料  put 'test', 'row1', 'cf:username', 'value1'    # row1 為行鍵(rowkey,行的唯一識別),username 為列名
列出表    list 'test'    # list 用來列出表名(可用正則過濾),查詢資料請用 scan 或 get
rowkey查詢    get 'test', 'row1'
刪除表    disable 'test' 然後 drop 'test'    #刪除前要先禁用掉
hbase(main):001:0> scan 'user'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/softs/hbase-0.98.12.1-hadoop2/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/softs/hadoop-2.5.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2018-06-29 04:03:54,274 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                                 COLUMN+CELL                                                                                            
 2                                  column=col1:count, timestamp=1530214622282, value=1                                                    
 userId1                            column=col1:age, timestamp=1530201359347, value=2                                                      
 userId1                            column=col1:name, timestamp=1530203162657, value=xiaohong                                              
 userId1                            column=col2:age, timestamp=1530203197562, value=33                                                     
 userId1                            column=col2:name, timestamp=1530201359347, value=\xE5\xB0\x8F\xE7\xBA\xA2                              
2 row(s) in 0.3550 seconds

hbase(main):002:0> 

Hbase WebUI檢視叢集資訊


java操作hbase Api

public class HbaseCrub {

    HBaseAdmin hbase;
HTable table;
String user = "user";
String col1="col1";
String col2="col2";
@Before
public void before() throws  Exception{
        Configuration configuration=new Configuration();
/**指定zookeeper叢集,找到配置檔案*/
configuration.set("hbase.zookeeper.quorum"
,"master,node1,node2"); /**資料庫連線**/ hbase =new HBaseAdmin(configuration); table=new HTable(configuration,user.getBytes()); } @After public void end() throws Exception{ if(hbase != null) { hbase.close(); } if(table != null) { table.close(); } } @Test public void createTable() throws Exception{ if(hbase.tableExists(user.getBytes())){ /*禁用,刪除**/ hbase.disableTable(user.getBytes()); hbase.deleteTable(user.getBytes()); } HTableDescriptor descriptor =new HTableDescriptor(TableName.valueOf(user)); /**要先指定列族*/ HColumnDescriptor columnDescriptor=new HColumnDescriptor(col1.getBytes()); /**記憶體快取*/ columnDescriptor.setInMemory(true); descriptor.addFamily(columnDescriptor); HColumnDescriptor columnDescriptor2=new HColumnDescriptor(col2.getBytes()); /**使用資料使用記憶體先存放*/ columnDescriptor2.setInMemory(false); descriptor.addFamily(columnDescriptor2); hbase.createTable(descriptor); } @Test public void insertUser() throws Exception{ /**指定rowkey*/ String rowKey ="userId1"; Put put =new Put(rowKey.getBytes()); put.add(col1.getBytes(),"name".getBytes(),"小石頭".getBytes()); put.add(col1.getBytes(),"age".getBytes(),"2".getBytes()); put.add(col2.getBytes(),"name".getBytes(),"小紅".getBytes()); table.put(put); } @Test public void deleteUser() throws Exception{ /**指定rowkey*/ Delete delete =new Delete("userId1".getBytes()); delete.deleteColumn(col1.getBytes(),"name".getBytes()); table.delete(delete); } @Test public void getByUserId() throws Exception{ /**指定rowkey*/ Get get =new Get("userId1".getBytes()); /**指定返回的列*/ get.addColumn(col1.getBytes(),"age".getBytes()); Result result= table.get(get); //單行記錄 Cell cell=result.getColumnLatestCell(col1.getBytes(),"age".getBytes()); System.out.println(new String(CellUtil.cloneValue(cell))); } @Test public void listUsers() throws Exception{ /** * Scan 查詢 返回多行資料 * 儘量不要用全表掃描 * 1、範圍查詢 起始rowkey 結束rowkey * 2、過濾器 filter 慎重!! 
* @throws Exception */ Scan scan =new Scan(); scan.setStartRow("userId0".getBytes()); scan.setStopRow("userId3".getBytes()); // 新增查詢條件 SingleColumnValueFilter filter1 = new SingleColumnValueFilter( col1.getBytes(), "age".getBytes(), CompareFilter.CompareOp.EQUAL, "2".getBytes()); scan.setFilter(filter1); ResultScanner results= table.getScanner(scan); results.forEach(result -> { System.out.print(new String(result.getValue(col1.getBytes(),"name".getBytes()))+"\t"); System.out.println(new String(result.getValue(col1.getBytes(),"age".getBytes()))); }); } }

HBase 與 MapReduce 整合:逐行讀取資料,統計單詞數量

job類

/**
 * Configures and runs the counting job: rows are read from the HBase table {@code user}
 * by {@code WCMapper}, aggregated by {@code WCReducer}, and the results are written back
 * to the same table.
 *
 * @param args unused
 * @throws Exception if job setup fails, or {@link IllegalStateException} if the job
 *         finishes unsuccessfully
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    /* Launched from the local machine: point at the cluster's HDFS and ZooKeeper quorum. */
    conf.set("fs.defaultFS", "hdfs://master:8020");
    conf.set("hbase.zookeeper.quorum", "master,node1,node2");
    Job job = Job.getInstance(conf);
    job.setJarByClass(WCJob.class);

    /* Scan that selects the input rows read from HBase. */
    Scan scan = new Scan();
    // The mapper only reads col1:age, so do not ship any other column to it. This also
    // keeps rows without col1:age (such as the count rows this job writes) out of the input.
    scan.addColumn(
            "col1".getBytes(java.nio.charset.StandardCharsets.UTF_8),
            "age".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    // Settings recommended for MapReduce scans: fetch rows in larger batches and do not
    // fill the region server block cache with a one-off full scan.
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    TableMapReduceUtil.initTableMapperJob(
            "user", scan, WCMapper.class, Text.class, IntWritable.class, job, false);

    /*
     * The last argument is false because the job runs locally — take care with this
     * when running elsewhere.
     */
    TableMapReduceUtil.initTableReducerJob(
            "user", WCReducer.class, job, null, null, null, null, false);

    // Surface a failed job instead of returning as if it had succeeded.
    if (!job.waitForCompletion(true)) {
        throw new IllegalStateException("WCJob did not complete successfully");
    }
}

Mapper類,統計單詞數量,輸出

/**
 * Mapper over HBase rows that emits {@code (age, 1)} for every row carrying a
 * {@code col1:age} value.
 */
public class WCMapper extends TableMapper<Text, IntWritable> {

    /** Column family read by this mapper. */
    private static final byte[] FAMILY =
            "col1".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /** Column qualifier read by this mapper. */
    private static final byte[] QUALIFIER =
            "age".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /**
     * Called once per row read from the table.
     *
     * @param key     row key of the current row (unused)
     * @param value   the row's cells
     * @param context sink for the emitted {@code (age, 1)} pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        byte[] rawAge = value.getValue(FAMILY, QUALIFIER);
        if (rawAge == null) {
            // Rows without col1:age (e.g. rows holding only col1:count) have nothing to
            // count; skip them instead of failing on a null value.
            return;
        }
        String age = new String(rawAge, java.nio.charset.StandardCharsets.UTF_8);
        context.write(new Text(age), new IntWritable(1));
    }
}
reducer類,計算結果,並寫入到hbase
/**
 * Reducer that totals the counts emitted for each key and stores the total in HBase as
 * {@code col1:count}, using the key itself as the row key.
 *
 * <p>The input types {@code Text, IntWritable} must match the mapper's output types.
 */
public class WCReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    /** Column family the total is written to. */
    private static final byte[] FAMILY =
            "col1".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /** Column qualifier the total is written to. */
    private static final byte[] QUALIFIER =
            "count".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /**
     * Sums the values of one key and writes a single {@link Put} for it.
     *
     * @param key     the grouped key (the age emitted by the mapper)
     * @param values  partial counts for that key
     * @param context sink for the resulting {@link Put}
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable partial : values) {
            // Add the carried value rather than counting elements, so the result stays
            // correct if values are ever pre-aggregated (e.g. by a combiner).
            total += partial.get();
        }

        // Use the age as the row key. Text.getBytes() exposes the backing buffer, which is
        // only valid up to getLength(); copy exactly that many bytes.
        byte[] rowKey = java.util.Arrays.copyOf(key.getBytes(), key.getLength());
        Put put = new Put(rowKey);
        put.add(FAMILY, QUALIFIER,
                String.valueOf(total).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        context.write(null, put);
    }
}

結果檢視,正確