
MapReduce on HBase

1. The Relationship Between HBase and MapReduce

  HBase and MapReduce have no direct relationship; they belong to different projects. "MapReduce on HBase" here means using an HBase table as the input source or output sink of the MapReduce computation framework, so that MapReduce's parallel computing power can be applied to the data stored in HBase.

2. The Official HBase-MapReduce Jobs

(1) View the jars that HBase MapReduce jobs need on the classpath

        $ bin/hbase mapredcp

(2) Export the required environment variables

        $ export HBASE_HOME=/opt/module/hbase-1.3.1
        $ export HADOOP_HOME=/opt/module/hadoop-2.7.2
        $ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

(3) Run the official MapReduce jobs

        -- Case 1: count the rows in the student table
        (run this from $HBASE_HOME, since the lib/hbase-server-1.3.1.jar path below is resolved relative to the current directory)

        $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student

(4) Case 2: use MapReduce to import local data into HBase

1) Create a tab-separated file locally: fruit.tsv

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow

2) Create the HBase table

hbase(main):001:0> create 'fruit','info'

3) Create an input_fruit directory in HDFS and upload fruit.tsv into it

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
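
Optionally, verify the upload before running the import:

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -cat /input_fruit/fruit.tsv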

4) Run the importtsv MapReduce job to load the file into HBase's fruit table

$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
/input_fruit

(the last argument is the HDFS input directory created in step 3; as in case 1, the relative lib/ path means the command is run from $HBASE_HOME)

5) Use the scan command to inspect the imported data
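
For example, in the HBase shell:

hbase(main):001:0> scan 'fruit'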

3. Custom HBase-MapReduce, Part 1

    Goal: use MapReduce to migrate part of the data in the fruit table (the name column) into a fruit_mr table.

Step-by-step implementation:

(0) After creating the project, add the dependencies to pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.luomk</groupId>
    <artifactId>HBaseMapReduce1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
</project>

(1) Build the HbaseMapper class, which reads rows from the fruit table

package com.luomk;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HbaseMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Build a Put keyed on the same row key
        Put v = new Put(key.copyBytes());
        // Keep only the cells whose qualifier is "name"
        for (Cell cell : value.rawCells()) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                v.add(cell);
            }
        }
        context.write(key, v);
    }
}
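
Note: the check above matches on the qualifier alone, which is unambiguous for a single-family table like fruit. If the source table had several column families, a stricter filter (a hypothetical refinement, not part of the original code) would compare the family as well:

    if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))
            && "name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
        v.add(cell);
    }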

(2) Build the HbaseReducer class, which writes the rows read from the fruit table into the fruit_mr table (it is a pure pass-through; the TableOutputFormat wired up by initTableReducerJob performs the actual writes)

package com.luomk;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Pass each Put straight through to the output table
        for (Put value : values) {
            context.write(key, value);
        }
    }
}

(3) Build the driver class to wire everything together and run

package com.luomk;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HbaseDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the HBase configuration and wrap it in a Job
        Configuration configuration = new Configuration();
        Configuration conf = HBaseConfiguration.create(configuration);
        Job job = Job.getInstance(conf);

        // 2. Set the main class
        job.setJarByClass(HbaseDriver.class);

        // 3. Set the mapper: args[0] is the source table, scanned in full
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(args[0]), scan, HbaseMapper.class, ImmutableBytesWritable.class, Put.class, job);

        // 4. Set the number of reducers (optional)
        // job.setNumReduceTasks(1);

        // 5. Set the reducer: args[1] is the target table
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);

        // Submit and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
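
A practical aside: for a full-table scan driven by MapReduce, the Scan is usually tuned before being handed to initTableMapperJob, as the HBase reference guide recommends. A minimal sketch (the caching value is illustrative, not from the original code):

    Scan scan = new Scan();
    scan.setCaching(500);        // number of rows fetched per RPC to the region server
    scan.setCacheBlocks(false);  // don't fill the block cache during a one-off bulk scan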

(4) Package the project and run the job

The driver takes the source and target table names as arguments, here fruit and fruit_mr:

$ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar com.luomk.HbaseDriver fruit fruit_mr

Tip: if the target table does not exist yet, create it before running the job.
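
For the job above, that means creating fruit_mr with the info column family in the HBase shell:

hbase(main):001:0> create 'fruit_mr','info'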

Tip: Maven package commands: -P local clean package, or -P dev clean package install (bundling third-party jars into the artifact requires the maven-shade-plugin).
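
A minimal sketch of what that plugin section could look like in pom.xml (the version number is illustrative):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>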

4. Custom HBase-MapReduce, Part 2

    Goal: write data from HDFS into an HBase table.

    Step-by-step implementation:

(0) After creating the project, add the dependencies to pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.luomk</groupId>
    <artifactId>HBaseMapReducer2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
</project>

(1) Build an HbaseMapper class to read the file data from HDFS (unlike Part 1, this one extends the plain MapReduce Mapper, since the input is a text file rather than an HBase table)

package com.luomk;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Turns each tab-separated line from HDFS (row key, name, color) into a Put for the HBase table.
 */
public class HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private ImmutableBytesWritable k = new ImmutableBytesWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on tabs: split[0] = row key, split[1] = name, split[2] = color
        String line = value.toString();
        String[] split = line.split("\t");

        k.set(Bytes.toBytes(split[0]));
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
        context.write(k, put);
    }
}

(2) Build the HbaseReducer class (identical to the pass-through reducer in Part 1)

package com.luomk;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

public class HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(key, value);
        }
    }
}

(3) Build the driver class and run

package com.luomk;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class HbaseDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the HBase configuration and wrap it in a Job
        Configuration configuration = new Configuration();
        Configuration conf = HBaseConfiguration.create(configuration);
        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseDriver.class);

        // Plain MapReduce mapper that reads text lines from HDFS
        job.setMapperClass(HbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Reducer writes into the HBase table named by args[1]
        TableMapReduceUtil.initTableReducerJob(args[1], HbaseReducer.class, job);

        // args[0] is the HDFS input path
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
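
Because the reducer in this job is a pure pass-through, the same import can also run map-only, following the read/write pattern in the HBase reference guide; a sketch of the two driver lines that would change:

    // Wire TableOutputFormat to the target table without a reducer class
    TableMapReduceUtil.initTableReducerJob(args[1], null, job);
    // Make the job map-only so the mapper's Puts go straight to the output format
    job.setNumReduceTasks(0);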

(4) Package and run

The driver expects two arguments: the HDFS input directory (here /input_fruit from section 2) and the name of a pre-created target table (fruit2 below is a placeholder for a table created beforehand with an info column family):

$ ~/modules/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar com.luomk.HbaseDriver /input_fruit fruit2

Tip: as before, if the target table does not exist yet, create it before running the job.

Tip: the same Maven package commands apply: -P local clean package, or -P dev clean package install (bundling third-party jars requires the maven-shade-plugin shown earlier).

5. Source code download: