
HBase (5): MapReduce

HBase can also serve as the source or the sink of MapReduce jobs.

HBase MapReduce jobs boil down to three patterns:

  1. Data in HDFS flows into a column of some HBase table (a wiring sketch for this pattern follows the list)
  2. A column of an HBase table flows out to files in HDFS
  3. A column of one HBase table is transformed and flows into a column of another HBase table
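
For orientation, the first pattern (HDFS into HBase) is usually a map-only job: a plain file-input Mapper emits Puts, and TableMapReduceUtil.initTableReducerJob wires up TableOutputFormat with no reducer. A minimal sketch, assuming a hypothetical target table "t" with column family "f" and tab-separated "rowkey<TAB>value" input lines:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HdfsToHbaseDriver {

    // a plain file Mapper that turns each "rowkey\tvalue" line into a Put
    static class LineMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\t");
            Put put = new Put(Bytes.toBytes(parts[0]));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("info"), Bytes.toBytes(parts[1]));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parts[0])), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HdfsToHbaseDriver.class);
        job.setMapperClass(LineMapper.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // target table "t"; a null reducer means the mapper's Puts are written directly
        TableMapReduceUtil.initTableReducerJob("t", null, job);
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}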

The demo implementation (pattern 3, a word count from table to table) follows:

1. Create the two tables and insert sample data

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseMR {

    private static Configuration conf;
    private static Connection conn;
    private static Admin admin;

    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void initTable() {
        try {
            // create the source table "word" and the target table "stat",
            // both with a single column family "content"
            admin = conn.getAdmin();
            HTableDescriptor word = new HTableDescriptor(TableName.valueOf("word"));
            HTableDescriptor stat = new HTableDescriptor(TableName.valueOf("stat"));
            HColumnDescriptor content = new HColumnDescriptor("content");
            word.addFamily(content);
            stat.addFamily(content);
            admin.createTable(word);
            admin.createTable(stat);

            // seed the source table with a few sample rows
            Table table = conn.getTable(TableName.valueOf("word"));
            List<Put> lp = new ArrayList<Put>();
            Put p1 = new Put(Bytes.toBytes("1"));
            p1.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"),
                    Bytes.toBytes("The Apache Hadoop software library is a framework"));
            lp.add(p1);
            Put p2 = new Put(Bytes.toBytes("2"));
            p2.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"),
                    Bytes.toBytes("The common utilities that support the other Hadoop modules"));
            lp.add(p2);
            Put p3 = new Put(Bytes.toBytes("3"));
            p3.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"),
                    Bytes.toBytes("Hadoop by reading the documentation"));
            lp.add(p3);
            Put p4 = new Put(Bytes.toBytes("4"));
            p4.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"),
                    Bytes.toBytes("Hadoop from the release page"));
            lp.add(p4);
            Put p5 = new Put(Bytes.toBytes("5"));
            p5.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"),
                    Bytes.toBytes("Hadoop on the mailing list"));
            lp.add(p5);
            // note: the old HTable-only buffer calls (setAutoFlushTo / setWriteBufferSize /
            // flushCommits) are not part of the Table interface, so a single batched
            // put(List<Put>) is used instead
            table.put(lp);
            table.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
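
initTable() only needs to run once, before the job is submitted; the word table then holds five rows of sample sentences. A minimal caller (hypothetical; the tables could just as well be created from the HBase shell):

public static void main(String[] args) {
    HbaseMR.initTable(); // creates "word" and "stat", then seeds "word" with five rows
}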

2. The Mapper must extend TableMapper

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class HbaseMapper extends TableMapper<Text, IntWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // each call receives one row of "word"; read the content:info cell and split it into words
        byte[] l = value.getValue(Bytes.toBytes("content"), Bytes.toBytes("info"));
        if (l == null) {
            return; // skip rows that do not have the content:info cell
        }
        String line = new String(l);
        String[] split = line.split(" ");
        for (String s : split) {
            context.write(new Text(s), new IntWritable(1));
        }
    }
}
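
A small optimization worth knowing (an addition, not part of the original demo): the Scan the driver hands to initTableMapperJob can be narrowed so only the content:info cell is shipped to the mapper, and the scanner tuned for a one-pass full-table read. Inside the driver's main() below, this would replace the plain new Scan():

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info")); // ship only the needed cell
scan.setCaching(500);        // rows fetched per RPC; tune for your cluster
scan.setCacheBlocks(false);  // block caching is wasted on one-pass MR scans
TableMapReduceUtil.initTableMapperJob("word", scan, HbaseMapper.class, Text.class, IntWritable.class, job);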

3. The Reducer must extend TableReducer

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class HbaseReduce extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the 1s emitted by the mapper for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // the word becomes the row key; the count lands in content:info of "stat"
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"), Bytes.toBytes(String.valueOf(sum)));

        context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
    }
}
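
The count above is stored as a human-readable string. If you would rather keep raw numeric bytes (for example so the cell could later be bumped with HBase Increment counters, which expect an 8-byte long), a hedged alternative for the Put:

// assumption: readers of "stat" then decode the cell with Bytes.toLong, not Bytes.toString
put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("info"), Bytes.toBytes((long) sum));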

4. The key piece: the Driver class


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseDriver {

    public static void main(String[] args) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseDriver.class);

        // set up the map side; the arguments are: HBase source table name, a Scan,
        // the Mapper class, the map output key class, the map output value class, and the job
        TableMapReduceUtil.initTableMapperJob("word", new Scan(), HbaseMapper.class,
                Text.class, IntWritable.class, job);

        // set up the reduce side; the arguments are: HBase target table name,
        // the Reducer class, and the job
        TableMapReduceUtil.initTableReducerJob("stat", HbaseReduce.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
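
Once the job finishes, reading the stat table back is a quick sanity check. A minimal sketch reusing the same quorum setting; with the sample data above, the row key "Hadoop" should come back as 5, since the word appears once in each seeded sentence:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class StatCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table stat = conn.getTable(TableName.valueOf("stat"))) {
            Result r = stat.get(new Get(Bytes.toBytes("Hadoop")));
            System.out.println("Hadoop -> " + Bytes.toString(
                    r.getValue(Bytes.toBytes("content"), Bytes.toBytes("info"))));
        }
    }
}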