1. 程式人生 > >HBase 協處理器實踐(一)AggregationClient

HBase 協處理器實踐(一)AggregationClient

                                                                                                           HBase 協處理器實踐(一)

         對資料表中的資料進行求和。

       1、新增coprocessor AggregateImplementation分為新增全域性和為單表新增Aggregation。

         a、 新增全域性AggregateImplementation,方法:修改hbase-site.xml檔案,在該檔案中新增

<span style="font-size:18px;"><property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property></span>
            b、新增單表的AggregateImplementation,可以通過hbase shell實現也可以通過Java API

                hbase shell

<span style="font-size:18px;">disable 'table'
alter 'table',METHOD => 'table_att','coprocessor' => 'hdfs://ip:9000/user/hadoop/jars/test.
                         jar|mycoprocessor.SampleCoprocessor|1001|'
enable 'table'</span>
                         java api  通過HBaseAdmin / HTableDescriptor 來實現的
<span style="font-size:18px;">    建立HBaseConfiguration
    Configuration hbaseconfig = HBaseConfiguration.create();  
    通過HBaseAdmin來完成為表新增配置
    HBaseAdmin hbaseAdmin = new HBaseAdmin(hbaseconfig);  
    hbaseAdmin.disableTable(TABLE_NAME);  
    通過表的描述來為表新增AggregationImplementation
    HTableDescriptor htd = hbaseAdmin.getTableDescriptor(TABLE_NAME);  
    htd.addCoprocessor(AggregateImplementation.class.getName());  
    hbaseAdmin.modifyTable(TABLE_NAME, htd);  
    hbaseAdmin.enableTable(TABLE_NAME);  
    hbaseAdmin.close();</span>

在完成新增之後就是對於表資料統計,LongColumnInterpreter列直譯器,解析列值,呼叫AggregationClient的rowCount方法,t對於表資料的行統計的大小完全是取決於建立的Scan,可以是針對單個列族、單個列、範圍查詢。

<span style="font-size:18px;">public static void testAggregationClient()
        throws Throwable {
        HTable hTable = new HTable(config, TableName.valueOf("marketing"));
        /**
         * ColumnInterperter型別的引數ci。即列直譯器,用於解析列中的值。
         */
        LongColumnInterpreter columnInterpreter = new LongColumnInterpreter();
        AggregationClient aggregationClient = new AggregationClient(config);
        Scan scan = new Scan();

        // 統計範圍內的資料
        // scan.setStopRow("rk007".getBytes());

        // 統計單列族
        scan.addFamily(Bytes.toBytes("f"));
        // 統計單列族的列
        // scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"));

        Long count = aggregationClient.rowCount(hTable, columnInterpreter, scan);
        System.out.println("count..." + count);

    }</span>
這裡的範圍統計,只指定結束的rowKey,預設startRowkey從第一條開始。還有一些求平均值,最大值的方法注意資料型別。