本次介紹的是將HBase中存的資料,用MapReduce解析。
一,hbase中的資料結構:
這裡寫圖片描述
二,分析map過程:
因為這裡是對hbase中的表進行解析,所以map必須是繼承TableMapper類來對資料進行解析。
注意:輸入的key value必須是ImmutableBytesWritable和Result。輸出的就可以自己定義了。
ImmutableBytesWritable,指代的是行健,
Result,指代的是值。
在這個map過程中會將tags,用“,”分割得到一個字元陣列,然後同nickname一起寫出,nickname做key,tag做value。
三,分析Reduce過程:
這裡的Reduce過程跟平時的沒有太大的區別,在整合資料時,也是用“,”分割的。
最後注意:
map過程的輸出值,與reduce的輸入值,一定要對應,並且value不能為空,否則reduce過程會進不去。開始我的就是key有,但是value空了,導致reduce過程一直不進去。
這裡還提供了hbase表建立的方法testData()。

四,程式碼:

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Scans the HBase table {@code "blog1"} with a MapReduce job: the mapper splits the
 * {@code article:tags} column on "," and emits one (tag, nickname) pair per tag; the
 * reducer joins all nicknames sharing a tag into a single comma-separated line and
 * writes the result to a timestamped HDFS directory.
 *
 * <p>Run {@link #testData()} once beforehand to (re)create and populate the table.
 */
public class YaoHbaseAndMapReduce02 {

    /**
     * Mapper over scanned HBase rows.
     *
     * <p>Input key is the row key ({@link ImmutableBytesWritable}); input value is the
     * row's cells ({@link Result}). For every tag in the {@code tags} column it emits
     * (lower-cased tag, nickname).
     */
    public static class YaoMap02 extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
                        throws IOException, InterruptedException {
            Text nickname = null;
            String[] tags = null;
            List<Cell> cells = value.listCells();
            for (Cell cell : cells) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("tags".equals(qualifier)) {
                    tags = Bytes.toString(CellUtil.cloneValue(cell)).split(",");
                } else if ("nickname".equals(qualifier)) {
                    nickname = new Text(CellUtil.cloneValue(cell));
                }
            }
            // Guard: a row missing either column would otherwise throw a
            // NullPointerException below (iterating a null array, or handing the
            // framework a null value — which also makes the reduce side misbehave).
            if (tags == null || nickname == null) {
                return;
            }
            for (String tag : tags) {
                context.write(new Text(tag.toLowerCase()), nickname);
            }
        }
    }

    /**
     * Reducer: concatenates every nickname emitted for the same tag into one
     * comma-separated value, e.g. {@code hadoop -> sansan,xiaoshi}.
     */
    public static class YaoReduce02 extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text text : values) {
                if (joined.length() > 0) {
                    joined.append(',');
                }
                joined.append(text.toString());
            }
            context.write(key, new Text(joined.toString()));
        }
    }

    /**
     * Configures and submits the job. Exits with 0 on success, 1 on failure.
     */
    public static void main(String[] args) throws Exception {
        // Run once to create and populate the test table, then comment out again.
        // testData();
        Configuration conf = HBaseConfiguration.create(new Configuration());
        conf.set("hbase.zookeeper.quorum", "192.168.61.128");

        Job job = Job.getInstance(conf, "mapandreduce02");
        job.setJarByClass(YaoHbaseAndMapReduce02.class); // locate the jar containing this class

        Scan scan = new Scan();
        // Fetch only the columns this job actually uses: tags and nickname.
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));

        TableMapReduceUtil.initTableMapperJob("blog1", scan, YaoMap02.class, Text.class, Text.class, job);

        job.setReducerClass(YaoReduce02.class);
        // Declare the reducer's output types explicitly instead of relying on defaults.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Timestamped output directory so reruns never collide with an existing path.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.61.128:9000/hbaseout" + new Date().getTime()));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * (Re)creates table {@code "blog1"} with column families {@code article} and
     * {@code author}, then inserts three sample rows. An existing table is dropped
     * first. IOExceptions are reported to stderr.
     */
    public static void testData() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.61.128");
        // try-with-resources so connection and admin are closed even when an
        // operation throws (the original leaked them on the failure path).
        try (Connection con = ConnectionFactory.createConnection(conf);
                Admin admin = con.getAdmin()) {

            TableName tn = TableName.valueOf("blog1");
            if (admin.tableExists(tn)) {
                admin.disableTable(tn);
                admin.deleteTable(tn);
            }

            HTableDescriptor htd = new HTableDescriptor(tn);
            htd.addFamily(new HColumnDescriptor("article"));
            htd.addFamily(new HColumnDescriptor("author"));
            admin.createTable(htd);

            try (Table t = con.getTable(tn)) {
                Put put = new Put(Bytes.toBytes("1"));
                put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("content"),
                        Bytes.toBytes("HBase is the Hadoop database. Use it when you need random, "
                                + "realtime read/write access to your Big Data"));
                put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("HBase,NoSql,Hadoop"));
                put.addColumn(Bytes.toBytes("article"), Bytes.toBytes("title"), Bytes.toBytes("Head First Hbase"));
                put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"));
                put.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("sansan"));

                Put put02 = new Put(Bytes.toBytes("10"));
                put02.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("Hadoop"));
                put02.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("xiaoshi"));

                Put put03 = new Put(Bytes.toBytes("100"));
                put03.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"), Bytes.toBytes("hbase,nosql"));
                put03.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"), Bytes.toBytes("superman"));

                t.put(Arrays.asList(put, put02, put03));
            }
            System.out.println("==========> 測試資料準備完成...");

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
.