
Big Data (35) HBase [Operating HBase with MapReduce]

Suppose we have some large files that need to be stored in HBase. The idea is to upload the files to HDFS first, read <key, value> pairs in the map phase, and then write those key-value pairs into HBase in the reduce phase.
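The destination table and its column family must already exist before the job runs; the code below writes into a table named "table" with column family "fam1". A minimal one-off setup sketch using the old-style HBaseAdmin client API (the helper class name CreateTable is just for illustration):

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Create table "table" with column family "fam1" if it does not exist yet.
        if (!admin.tableExists("table")) {
            HTableDescriptor desc = new HTableDescriptor("table");
            desc.addFamily(new HColumnDescriptor("fam1"));
            admin.createTable(desc);
        }
        admin.close();
    }
}

With the table in place, the Mapper class that parses the input file looks like this: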

package test;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is expected to hold two fields separated by a space:
        // the first becomes the output key, the second the output value.
        String[] items = value.toString().split(" ");
        String k = items[0];
        String v = items[1];
        context.write(new Text(k), new Text(v));
    }
}

The Reducer class, whose main job is to write the key-value pairs into the HBase table:

package test;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

public class ReducerClass extends TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String k = key.toString();

        // Concatenate all values for this key into a single string.
        StringBuilder str = new StringBuilder();
        for (Text value : values) {
            str.append(value.toString());
        }
        String v = str.toString();

        // Build a Put for row "k" in column family "fam1", qualifier "name",
        // and emit it so TableOutputFormat writes it into HBase.
        Put putrow = new Put(k.getBytes());
        putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());
        context.write(new ImmutableBytesWritable(k.getBytes()), putrow);
    }
}

As shown above, ReducerClass extends TableReducer, whereas a plain Hadoop reducer extends Reducer. Its prototype is TableReducer<KeyIn, ValueIn, KeyOut>, so the key type handed to HBase (KeyOut) is ImmutableBytesWritable.
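To make the difference concrete, here is a small sketch (not part of the job itself) contrasting the two class declarations:

// Sketch only: contrasting the two base classes.
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Plain Hadoop: Reducer<KeyIn, ValueIn, KeyOut, ValueOut> declares all four types.
class PlainReducer extends Reducer<Text, Text, Text, Text> { }

// HBase: TableReducer<KeyIn, ValueIn, KeyOut> only declares the output key
// (ImmutableBytesWritable here); the output value is always a Put/Delete
// that TableOutputFormat writes to the table.
class HBaseWritingReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { }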

Keeping the Map, Reduce, and Job configuration in separate classes makes the code clearer; Mahout uses the same structure.

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");

        Job job = new Job(conf, "Hbase");
        job.setJarByClass(TxtHbase.class);

        // Read the source file from HDFS as plain text.
        Path in = new Path(arg0[0]);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);

        job.setMapperClass(MapperClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Wire the reducer to the HBase table "table" via TableOutputFormat.
        TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Note that when configuring the job in the Driver, job.setReducerClass() is never called; instead, TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job) registers the reducer and the output table.
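Roughly speaking, initTableReducerJob amounts to something like the following (a simplified sketch, continuing with the job object from the driver above; the real helper also handles partitioners, serialization settings and dependency jars):

// Simplified sketch of what TableMapReduceUtil.initTableReducerJob sets up.
// (Needs org.apache.hadoop.hbase.mapreduce.TableOutputFormat.)
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "table");
job.setOutputFormatClass(TableOutputFormat.class);
job.setReducerClass(ReducerClass.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);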

The main function:

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic Hadoop options and then calls Driver.run(args).
        ToolRunner.run(new Configuration(), new Driver(), args);
    }
}

Reading data back out of HBase is simpler: write a Mapper that reads the <key, value> pairs from the table.
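Before wiring up a full MapReduce job, a single cell written by the job above can be checked directly with the client API; a minimal sketch, assuming a row with the placeholder key "somekey" exists in table "table" (the class name ReadOneCell is just for illustration):

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;

public class ReadOneCell {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "table");
        // "somekey" is a placeholder row key; use one produced by the write job.
        Result result = table.get(new Get("somekey".getBytes()));
        byte[] cell = result.getValue("fam1".getBytes(), "name".getBytes());
        System.out.println(cell == null ? "no such cell" : new String(cell));
        table.close();
    }
}

The MapReduce version of the read uses the following Mapper: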

package test;

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapperClass extends MapReduceBase implements TableMap<Text, Text> {

    static final String NAME = "GetDataFromHbaseTest";
    private Configuration conf;

    public void map(ImmutableBytesWritable row, Result values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        StringBuilder sb = new StringBuilder();
        // Walk every qualifier/value pair of column family "fam1" in this row.
        for (Entry<byte[], byte[]> value : values.getFamilyMap("fam1".getBytes()).entrySet()) {
            byte[] cell = value.getValue();
            if (cell != null) {
                sb.append(new String(value.getKey())).append(new String(cell));
            }
        }
        output.collect(new Text(row.get()), new Text(sb.toString()));
    }
}

The job then has to be wired up with initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars).
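With the mapper above, a call matching this signature might look like the following sketch (the columns string uses the family:qualifier form, here the "fam1:name" column written earlier; job is the mapred JobConf):

// Example call (sketch): scan column "fam1:name" of table "table",
// emit Text keys and Text values, and ship dependency jars with the job.
TableMapReduceUtil.initTableMapJob("table", "fam1:name",
        MapperClass.class, Text.class, Text.class, job, true);

The driver below uses the overload without the addDependencyJars flag: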

package test;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        // The mapper above uses the old "mapred" API, so the job is configured
        // with a JobConf and initTableMapJob rather than the "mapreduce" variant.
        JobConf job = new JobConf(HBaseConfiguration.create());
        job.set("hbase.zookeeper.quorum", "localhost");
        job.setJobName("Hbase");
        job.setJarByClass(TxtHbase.class);

        // Scan table "table"; the columns to read (e.g. "fam1:name") are taken
        // from the first command-line argument.
        TableMapReduceUtil.initTableMapJob("table", arg0[0], MapperClass.class,
                Text.class, Text.class, job);

        JobClient.runJob(job);
        return 0;
    }
}

The main function:

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {

    public static void main(String[] args) throws Exception {
        // Again, ToolRunner hands control to Driver.run(args).
        ToolRunner.run(new Configuration(), new Driver(), args);
    }
}