1. 程式人生 > >MapReduce將HDFS文字資料匯入HBase中

MapReduce將HDFS文字資料匯入HBase中

HBase本身提供了很多種資料匯入的方式,通常有兩種常用方式:

  1. 使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將資料匯入HBase
  2. 另一種方式就是使用HBase原生Client API

本文就是示範如何通過MapReduce作業從一個檔案讀取資料並寫入到HBase中。

首先啟動Hadoop與HBase,然後建立一個空表,用於後面匯入資料:

hbase(main):006:0> create 'mytable','cf'
0 row(s) in 10.8310 seconds

=> Hbase::Table - mytable
hbase(main):007
:0> list TABLE mytable 1 row(s) in 0.1220 seconds => ["mytable"] hbase(main):008:0> scan 'mytable' ROW COLUMN+CELL 0
row(s) in 0.2130 seconds

一、示例程式

下面的示例程式通過TableOutputFormat將HDFS上具有一定格式的文字資料匯入到HBase中。

首先建立MapReduce作業,目錄結構如下:

Hdfs2HBase/
├── classes
└── src
    ├── Hdfs2HBase.java
    ├── Hdfs2HBaseMapper.java
    └── Hdfs2HBaseReducer.java

Hdfs2HBaseMapper.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import
org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class Hdfs2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text line, Context context) throws IOException,InterruptedException { String lineStr = line.toString(); int index = lineStr.indexOf(":"); String rowkey = lineStr.substring(0, index); String left = lineStr.substring(index+1); context.write(new Text(rowkey), new Text(left)); } }

Hdfs2HBaseReducer.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Hdfs2HBaseReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {
        public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
                String k = rowkey.toString();
                for(Text val : value) {
                        Put put = new Put(k.getBytes());
                        String[] strs = val.toString().split(":");
                        String family = strs[0];
                        String qualifier = strs[1];
                        String v = strs[2];
                        put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
                        context.write(new ImmutableBytesWritable(k.getBytes()), put);
                }
        }
}

Hdfs2HBase.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hdfs2HBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage: wordcount <infile> <table>");
            System.exit(2);
        }

        Job job = new Job(conf, "hdfs2hbase");
        job.setJarByClass(Hdfs2HBase.class);
        job.setMapperClass(Hdfs2HBaseMapper.class);
        job.setReducerClass(Hdfs2HBaseReducer.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

配置javac編譯依賴環境:

$HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar

這裡要操作HBase,故除了上面三個jar包,還需要$HBASE_HOME/lib目錄下的jar包。為了方便,我們在/etc/profileCLASSPATH裡包含所有的依賴包:

TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$HBASE_JARS

編譯

$ javac -d classes/ src/*.java

打包

$ jar -cvf hdfs2hbase.jar classes

執行

建立一個data.txt檔案,內容如下(列族是建表時建立的列族cf):

r1:cf:c1:value1 
r2:cf:c2:value2 
r3:cf:c3:value3

將檔案複製到hdfs上:

$ hadoop/bin/hadoop fs -put data.txt /hbase

執行MapReduce作業:

$ hadoop/bin/hadoop jar Hdfs2HBase/hdfs2hbase.jar com.lisong.hdfs2hbase.Hdfs2HBase /hbase/data.txt mytable

報錯NoClassDefFoundError找不到類定義:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/io/ImmutableBytesWritable
    at com.lisong.hdfs2hbase.Hdfs2HBase.main(Hdfs2HBase.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    ...
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

原因是我沒有把HBase的jar包加到hadoop-env.sh中。

TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
HADOOP_CLASSPATH=$HBASE_JARS

再次執行發現又報了Unable to initialize MapOutputCollector的錯誤:

15/08/10 08:55:44 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1008)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:401)
    ...
    at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapred.LocalJobRunner: map task executor complete.
15/08/10 08:55:44 WARN mapred.LocalJobRunner: job_local2138114942_0001
java.lang.Exception: java.io.IOException: Unable to initialize any output collector
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Unable to initialize any output collector
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
    ...
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapreduce.Job: Job job_local2138114942_0001 failed with state FAILED due to: NA
15/08/10 08:55:45 INFO mapreduce.Job: Counters: 0

原因是我沒有指明Map輸出的Key/Value型別,在Hdfs2HBase.java中新增以下兩句:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);     

如果沒有專門定義Mapper輸出型別的話,job.setOutputKeyClassjob.setOutputValueClass設定的是Mapper和Reducer兩個的輸出型別。

job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);

而Hdfs2HBaseMapper輸出型別是Text/Text,所以這裡需要單獨指定。

修改Hdfs2HBase.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hdfs2HBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage: wordcount <infile> <table>");
            System.exit(2);
        }

        Job job = new Job(conf, "hdfs2hbase");
        job.setJarByClass(Hdfs2HBase.class);
        job.setMapperClass(Hdfs2HBaseMapper.class);
        job.setReducerClass(Hdfs2HBaseReducer.class);

        job.setMapOutputKeyClass(Text.class);    // +
        job.setMapOutputValueClass(Text.class);  // +

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

再次編譯、打包,然後執行成功!

查詢HBase表,驗證資料是否已匯入:

hbase(main):001:0> scan 'mytable'
ROW                         COLUMN+CELL                                                                 
 r1                         column=cf:c1, timestamp=1439223857492, value=value1                         
 r2                         column=cf:c2, timestamp=1439223857492, value=value2                         
 r3                         column=cf:c3, timestamp=1439223857492, value=value3                         
3 row(s) in 1.3820 seconds

可以看到,資料匯入成功!

由於需要頻繁的與儲存資料的RegionServer通訊,佔用資源較大,一次性入庫大量資料時,TableOutputFormat效率並不好。

二、拓展-TableReducer

我們可以將Hdfs2HBaseReducer.java程式碼改成下面這樣,作用是一樣的:

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Hdfs2HBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    public void reduce(Text rowkey, Iterable<Text> value, Context context) throws IOException,InterruptedException {
        String k = rowkey.toString();
        for(Text val : value) {
            Put put = new Put(k.getBytes());
            String[] strs = val.toString().split(":");
            String family = strs[0];
            String qualifier = strs[1];
            String v = strs[2];
            put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
            context.write(new ImmutableBytesWritable(k.getBytes()), put);
        }
    }
}

這裡直接繼承了TableReducerTableReducer是部分特例化的Reducer,它只有三個型別引數:輸入Key/Value是對應Mapper的輸出,輸出Key可以是任意的型別,但是輸出Value必須是一個PutDelete例項。

編譯打包執行,結果與前面的一樣!

相關推薦

MapReduceHDFS文字資料匯入HBase

HBase本身提供了很多種資料匯入的方式,通常有兩種常用方式: 使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將資料匯入HBase 另一種方式就是使用HBase原生Client API 本文就是示範如何通過M

sqlserver的資料匯入hbase

將sqlserver的資料匯入hbase中 1.解壓sqoop-sqlserver-1.0.tar.gz,並改名(可以不改)          tar  -zxvf  sqoop- sql

如何不同型別資料匯入Elaticsearch

題記 Elaticsearch的原理明白了以後,手頭有很多不同型別的資料,如: 1)單條資料,如程式中自己構造的JSON格式資料; 2)符合Elasticsearch索引規範的批量資料; 3)日誌檔案,格式*.log; 4)結構化資料,儲存在mysql

33.如何不同型別資料匯入Elaticsearch(ES同步小結)

題記Elaticsearch的原理明白了以後,手頭有很多不同型別的資料,如: 1)單條資料,如程式中自己構造的JSON格式資料; 2)符合Elasticsearch索引規範的批量資料; 3)日誌檔案,格式*.log; 4)結構化資料,儲存在mysql、oracle等關係型資料

文字資料匯入HBASE

在將有定界符文字檔案匯入HBASE庫中,需要將後面的定界符去掉,否則將匯入失敗。如下所示:[[email protected] bin]$ cat /tmp/emp.txt1,A,201304,2,B,201305,3,C,201306,4,D,201307,這個

文字檔案匯入HBase

文字檔案匯入到Hbase中  建立表sudo su - su - hadoop ./hbase shellcreate 'table1',{NAME => 'DF', VERSIONS => 5}  www.2cto.com  配置環境 1.修改hadoop環

使用mapreduce hdfs資料匯入到到hbase

package hbase; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase

使用命令文字資料匯入到資料庫

1.下載 oracle 客戶端 和 plsql Oracle 的下載地址: 2. 建立 load.ctl 檔案 在任意資料夾下建立  load.ctl 檔案,用編輯器開啟 load.ctl 檔

通過sqoopMySQL資料庫資料匯入Hbase

從接觸到大資料到成功的實現一個功能期間走了不少彎路也踩了不少坑,這裡作為我的學習筆記也可以作為小白們的前車之鑑,少走彎路,有不正確之處,望指出 環境準備: hadoop、hbase、sqoop、mys

hive over hbase方式文字資料匯入hbase

1,建立hbase表Corpus >> create 'Corpus','CF' 2,建立hive->hbase外表logic_Corpus,並對應hbase中的Corpus表 >> CREATE EXTERNAL TABLE logic_Co

利用sqoophive資料匯入Oracle(踩的坑)

教程很多,這裡只說踩過的坑 1.下載sqoop時,還得下一個bin的包,拿到sqoop-1.4.6.jar 的包,放到hadoop的lib目錄下 2.匯入oracle,執行程式碼時,使用者名稱和表名必須大寫!且資料庫建表時表名必須大寫!  示例程式碼: sqoop expo

mysql匯入資料load data infile用法(txt檔案資料匯入)

我們常常匯入資料!mysql有一個高效匯入方法,那就是load data infile 下面來看案例說明   基本語法: load data  [low_priority] [local] infile 'file_name txt' [replace | ignor

flumekafkatopic資料匯入hive

一、首先更加資料的表結構在hive中進行表的建立。          create table AREA1(unid string,area_punid string,area_no string,area_name s

oracle通過load data 資料匯入通過儲存過程進行批量處理

說明:雖然沒圖,但文字表述很清楚,自己做過的專案留著備用(這只是初版,比較繁瑣,但很明確) 準備工作做完之後,後期可直接使用。如後期excel資料有變更,只需改動對應的部分即可,不涉及改動的可直接使用。 實際操作步驟 依照excel資料模版格式準備好建表語句,將中間過渡

使用PLSQL文字資料匯入

開啟PLSQL,點選“Tools”-->“Text Importer...”,點選下圖的按鈕開啟所要匯入的檔案 匯入檔案之後,在下方進行一些列的設定之後,點選上圖中的“Data to Oracle” 點選“Data to Oracle”之後的操作如下圖所示 &

利用sqoophive資料匯入Oracle

首先: 如oracle則執行sqoop list-databases --connect jdbc:oracle:thin:@//192.168.27.235:1521/ORCL --username DATACENTER -P 來測試是否能正確連線資料庫  如mysql則執行sq

Hive 實戰練習(一)—按照日期每天的資料匯入Hive表

需求:         每天會產生很多的日誌檔案資料,有這麼一種需求:需要將每天產生的日誌資料在晚上12點鐘過後定時執行操作,匯入到Hive表中供第二天資料分析使用。要求建立分割槽表,並按照日期分割槽。資料檔案命名是以當天日期命名的,如2015-01-09.txt一、建立分割

模板word的特定欄位替換(資料匯入word

一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 一、 將模板word中的特定欄位替換(將資料匯入word中) 所用jar包 開發程式碼 /** * @Title createContract * @description 生成合

Excel的資料匯入SqlServer的表

記錄一下最近從Excel匯入大量資料到SqlServer表中的步驟。 在將Excel資料準備好以後。 1、右鍵SQL Server中需要匯入資料的庫名,選擇【任務】—【匯入資料】如圖: 2、彈

用sqoopmysql的資料匯入到hive表,原理分析

Sqoop 將 Mysql 的資料匯入到 Hive 中 準備Mysql 資料 如圖所示,準備一張表,資料隨便造一些,當然我這裡的資料很簡單。 編寫命令 編寫引數檔案 個人習慣問題,我喜歡把引數寫到檔案裡,然後再命令列引用。 vim mysql-info, #