MapReduce功能實現一---Hbase和Hdfs之間資料相互轉換
MapReduce功能實現系列:
一、從Hbase表1中讀取資料再把統計結果存到表2
在Hbase中建立相應的表1:
create 'hello','cf' put 'hello','1','cf:hui','hello world' put 'hello','2','cf:hui','hello hadoop' put 'hello','3','cf:hui','hello hive' put 'hello','4','cf:hui','hello hadoop' put 'hello','5','cf:hui','hello world' put 'hello','6','cf:hui','hello world'
java程式碼:
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class HBaseToHbase { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String hbaseTableName1 = "hello"; String hbaseTableName2 = "mytb2"; prepareTB2(hbaseTableName2); Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(HBaseToHbase.class); job.setJobName("mrreadwritehbase"); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job); TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job); System.exit(job.waitForCompletion(true) ? 1 : 0); } public static class doMapper extends TableMapper<Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String rowValue = Bytes.toString(value.list().get(0).getValue()); context.write(new Text(rowValue), one); } } public static class doReducer extends TableReducer<Text, IntWritable, NullWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { System.out.println(key.toString()); int sum = 0; Iterator<IntWritable> haha = values.iterator(); while (haha.hasNext()) { sum += haha.next().get(); } Put put = new Put(Bytes.toBytes(key.toString())); put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum))); context.write(NullWritable.get(), put); } } public static void prepareTB2(String hbaseTableName) throws IOException{ HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName); HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily"); tableDesc.addFamily(columnDesc); Configuration cfg = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(cfg); if (admin.tableExists(hbaseTableName)) { System.out.println("Table exists,trying drop and create!"); admin.disableTable(hbaseTableName); admin.deleteTable(hbaseTableName); admin.createTable(tableDesc); } else { System.out.println("create table: "+ hbaseTableName); admin.createTable(tableDesc); } } }
在Linux中執行該程式碼:
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class
[[email protected] q1]$ hadoop jar xx.jar HBaseToHbase
檢視mytb2表:
hbase(main):009:0> scan 'mytb2' ROW COLUMN+CELL hello hadoop column=mycolumnfamily:count, timestamp=1489817182454, value=2 hello hive column=mycolumnfamily:count, timestamp=1489817182454, value=1 hello world column=mycolumnfamily:count, timestamp=1489817182454, value=3 3 row(s) in 0.0260 seconds
二、從Hbase表1中讀取資料再把結果存Hdfs中
1.將表1的內容不統計輸出:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HbaseToHdfs {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String tablename = "hello";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "h71");
Job job = new Job(conf, "WordCountHbaseReader");
job.setJarByClass(HbaseToHdfs.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, Text.class, job);
job.setReducerClass(WordCountHbaseReaderReduce.class);
FileOutputFormat.setOutputPath(job, new Path(args[0]));
MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class doMapper extends TableMapper<Text, Text>{
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
String rowValue = Bytes.toString(value.list().get(0).getValue());
context.write(new Text(rowValue), new Text("one"));
}
}
public static class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,NullWritable>{
private Text result = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
for(Text val:values){
result.set(val);
context.write(key, NullWritable.get());
}
}
}
}
在Linux中執行該程式碼:
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HbaseToHdfs.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseToHdfs*class
[[email protected] q1]$ hadoop jar xx.jar HbaseToHdfs /output
注意:/output目錄不能存在,如果存在就刪除掉
[[email protected] q1]$ hadoop fs -ls /output
Found 2 items
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 14:28 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 73 2017-03-18 14:28 /output/part-r-00000
[[email protected] q1]$ hadoop fs -cat /output/part-r-00000
hello hadoop
hello hadoop
hello hive
hello world
hello world
hello world
2.將表1的內容統計輸出:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class HbaseToHdfs1 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String tablename = "hello";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "h71");
Job job = new Job(conf, "WordCountHbaseReader");
job.setJarByClass(HbaseToHdfs1.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);
job.setReducerClass(WordCountHbaseReaderReduce.class);
FileOutputFormat.setOutputPath(job, new Path(args[0]));
MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class doMapper extends TableMapper<Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
/*
String rowValue = Bytes.toString(value.list().get(0).getValue());
context.write(new Text(rowValue), one);
*/
String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
for (String str: rowValue){
word.set(str);
context.write(word,one);
}
}
}
public static class WordCountHbaseReaderReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int total=0;
for(IntWritable val:values){
total++;
}
context.write(key, new IntWritable(total));
}
}
}
[[email protected] q1]$ hadoop fs -cat /output/part-r-00000
hadoop 2
hello 6
hive 1
world 3
三、讀取Hdfs檔案將統計結果存入到Hbase表中
建立檔案並上傳到Hdfs中:
[[email protected] q1]$ vi hello.txt
hello world
hello hadoop
hello hive
hello hadoop
hello world
hello world
[[email protected] q1]$ hadoop fs -mkdir /input
[[email protected] q1]$ hadoop fs -put hello.txt /input
java程式碼:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class HdfsToHBase {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable i = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String s[] = value.toString().trim().split("/n");
for (String m : s) {
context.write(new Text(m), i);
}
}
}
public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
// 列族為cf,列為count,列值為數目
put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
context.write(NullWritable.get(), put);
}
}
public static void createHBaseTable(String tableName) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor("cf");
htd.addFamily(col);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "h71");
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.out.println("table exists, trying to recreate table......");
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
System.out.println("create new table:" + tableName);
admin.createTable(htd);
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//將結果存入hbase的表名
String tableName = "mytb2";
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
createHBaseTable(tableName);
String input = args[0];
Job job = new Job(conf, "WordCount table with " + input);
job.setJarByClass(HdfsToHBase.class);
job.setNumReduceTasks(3);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(input));
// FileInputFormat.setInputPaths(job, new Path(input)); //這種方法也可以
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHBase.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHBase*class
[[email protected] q1]$ hadoop jar xx.jar HdfsToHBase /input/hello.txt
hbase(main):011:0> scan 'mytb2'
ROW COLUMN+CELL
hello hadoop column=cf:count, timestamp=1489819702236, value=2
hello hive column=cf:count, timestamp=1489819702236, value=1
hello world column=cf:count, timestamp=1489819704448, value=3
3 row(s) in 0.3260 seconds
四、從Hdfs到Hdfs(其實就是mapreduce的經典例子wordcount)
java程式碼:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HdfsToHdfs{
public static class WordCountMapper extends Mapper<Object,Text,Text,IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String str: words){
word.set(str);
context.write(word,one);
}
}
}
public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int total=0;
for (IntWritable val : values){
total++;
}
context.write(key, new IntWritable(total));
}
}
public static void main (String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
job.setJarByClass(HdfsToHdfs.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHdfs.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHdfs*class
[[email protected] q1]$ hadoop jar xx.jar HdfsToHdfs /input/hello.txt /output
[[email protected] q1]$ hadoop fs -cat /output/part-r-00000
hadoop 2
hello 6
hive 1
world 3
說明:我這個wordcount例子是hadoop2版本的,我的另一篇文章http://blog.csdn.net/m0_37739193/article/details/71132652裡的是hadoop1版本的例子,在hadoop0.20.0及以後同時包含了兩個版本的的API,所以兩個版本的程式碼都能執行
Hadoop MapReduce新舊API區別:
Hadoop的版本0.20.0包含有一個新的java MapReduce API,有時也稱為"上下文物件"(context object),旨在使API在今後更容易擴充套件。新的API 在型別上不相容先前的API,所以,需要重寫以前的應用程式才能使新的API發揮作用。
新的API傾向於使用抽象類,而不是介面,因為這更容易擴充套件。例如,可以無需修改類的實現而在抽象類中新增一個方法(即用預設的實現)。在新的API中, mapper和reducer現在都是抽象類。
--介面,嚴格的“協議約束”,只有方法宣告而沒有方法實現,要求所有實現類(抽象類除外)必須實現介面中的每個方法。
--抽象類,較寬鬆的“約束協議”,可為某些方法提供預設實現,而繼承類則可選擇是否重新實現這些方法。故而抽象類在類衍化方面更有優勢,即具有良好的向後相容性。
新的API放在org.apache.hadoop.mapreduce包(和子包)中。之前版本的API依舊放在org.apache.hadoop.mapred中。
新的API充分使用上下文物件,使使用者程式碼能與MapReduce系統通訊。例如,MapContext 基本具備了JobConf、OutputCollector和Reporter的功能。
新的API同時支援"推"(push)和"拉"(pull)式的迭代。這兩類API,均可以將鍵/值對記錄推給mapper,但除此之外,新的API也允許把記錄從map()方法中拉出。對reducer來說是一樣的。"拉"式處理資料的好處是可以實現資料的批量處理,而非逐條記錄地處理。
新增的API實現了配置的統一。舊API通過一個特殊的JobConf物件配置作業,該物件是Hadoop配置物件的一個擴充套件。在新的API中,我們丟棄這種區分,所有作業的配置均通過Configuration來完成。
新API中作業控制由Job類實現,而非JobClient類,新API中刪除了JobClient類。
輸出檔案的命名方式稍有不同。map的輸出檔名為part-m-nnnnn,而reduce的輸出為part-r-nnnnn(其中nnnnn表示分塊序號,為整數,且從0開始算。
將舊API寫的Mapper和Reducer類轉換為新API時,記住將map()和reduce()的簽名轉換為新形式。如果只是將類的繼承修改為對新的Mapper和Reducer類的繼承,編譯的時候也不會報錯或顯示警告資訊,因為新的Mapper和Reducer類同樣也提供了等價的map()和reduce()函式。但是,自己寫的mapper或reducer程式碼是不會被呼叫的,這會導致難以診斷的錯誤。