
MapReduce Implementations, Part 1: Converting Data Between HBase and HDFS

This post is the first in a series on implementing MapReduce functionality.

I. Read data from HBase table 1 and write the aggregated counts to table 2

Create the source table (table 1) in HBase:

create 'hello','cf'
put 'hello','1','cf:hui','hello world'
put 'hello','2','cf:hui','hello hadoop'
put 'hello','3','cf:hui','hello hive'
put 'hello','4','cf:hui','hello hadoop'
put 'hello','5','cf:hui','hello world'
put 'hello','6','cf:hui','hello world'

Java code:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseToHbase {  
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
		String hbaseTableName1 = "hello";  
		String hbaseTableName2 = "mytb2";  
		  
		prepareTB2(hbaseTableName2);  
		  
		Configuration conf = new Configuration();  
		  
		Job job = Job.getInstance(conf);  
		job.setJarByClass(HBaseToHbase.class);  
		job.setJobName("mrreadwritehbase");  
		  
		Scan scan = new Scan();
		scan.setCaching(500);        // fetch 500 rows per scanner RPC to cut down on round trips
		scan.setCacheBlocks(false);  // don't fill the region server block cache with a one-off full scan
		  
		TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);  
		TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);  
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}  
	  
	public static class doMapper extends TableMapper<Text, IntWritable>{  
		private final static IntWritable one = new IntWritable(1);  
		@Override  
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {  
			String rowValue = Bytes.toString(value.list().get(0).getValue());  // value of the row's first (and only) cell
			context.write(new Text(rowValue), one);  
		}  
	}  
	  
	public static class doReducer extends TableReducer<Text, IntWritable, NullWritable>{  
		@Override  
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  
			System.out.println(key.toString());  
			int sum = 0;  
			Iterator<IntWritable> haha = values.iterator();  
			while (haha.hasNext()) {  
				sum += haha.next().get();  
			}  
			Put put = new Put(Bytes.toBytes(key.toString()));  
			put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));               
			context.write(NullWritable.get(), put);  
		}  
	}  
	  
	public static void prepareTB2(String hbaseTableName) throws IOException{  
		HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);  
		HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");  
		tableDesc.addFamily(columnDesc);  
		Configuration  cfg = HBaseConfiguration.create();  
		HBaseAdmin admin = new HBaseAdmin(cfg);  
		if (admin.tableExists(hbaseTableName)) {  
			System.out.println("Table exists,trying drop and create!");  
			admin.disableTable(hbaseTableName);  
			admin.deleteTable(hbaseTableName);  
			admin.createTable(tableDesc);  
		} else {  
			System.out.println("create table: "+ hbaseTableName);  
			admin.createTable(tableDesc);  
		}  
	}  
}  

Compile and run the code on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class
[hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase

Check the mytb2 table:
hbase(main):009:0> scan 'mytb2'
ROW                                                          COLUMN+CELL                                                                                                                                                                     
 hello hadoop                                                column=mycolumnfamily:count, timestamp=1489817182454, value=2                                                                                                                   
 hello hive                                                  column=mycolumnfamily:count, timestamp=1489817182454, value=1                                                                                                                   
 hello world                                                 column=mycolumnfamily:count, timestamp=1489817182454, value=3                                                                                                                   
3 row(s) in 0.0260 seconds
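
The counts above are stored as UTF-8 strings (the reducer writes Bytes.toBytes(String.valueOf(sum))), so reading one back means decoding with Bytes.toString rather than Bytes.toInt. As a quick way to verify a single row, here is a minimal sketch using the same old-style HBase client API as the code above (the CheckCount class name is my own; the table, column family, and quorum come from this example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckCount {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "h71");
		HTable table = new HTable(conf, "mytb2");
		// the row key is the whole line, e.g. "hello world"
		Get get = new Get(Bytes.toBytes("hello world"));
		Result r = table.get(get);
		byte[] raw = r.getValue(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"));
		// the job stored the count as a string, not as a 4-byte int
		System.out.println(Bytes.toString(raw));  // prints 3 for the data above
		table.close();
	}
}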

II. Read data from HBase table 1 and write the results to HDFS

1. Output the contents of table 1 without aggregation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseToHdfs {  
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
		
	  String tablename = "hello";
	  Configuration conf = HBaseConfiguration.create();
	  conf.set("hbase.zookeeper.quorum", "h71");
	  Job job = new Job(conf, "WordCountHbaseReader");
	  job.setJarByClass(HbaseToHdfs.class);
	  Scan scan = new Scan();
	  TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, Text.class, job);
	  job.setReducerClass(WordCountHbaseReaderReduce.class);
	  FileOutputFormat.setOutputPath(job, new Path(args[0]));
	  // register a named output; note that the reducer below never actually writes to it
	  MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
	  System.exit(job.waitForCompletion(true) ? 0 : 1);
	}  
	  
	public static class doMapper extends TableMapper<Text, Text>{  
		@Override  
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {  
			String rowValue = Bytes.toString(value.list().get(0).getValue());
			context.write(new Text(rowValue), new Text("one"));  
		}  
	}  
	
	public static class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,NullWritable>{
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			// emit the key once per occurrence, so duplicate rows pass through unaggregated
			for (Text val : values){
				context.write(key, NullWritable.get());
			}
		}
	}
}  

Compile and run the code on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseToHdfs.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseToHdfs*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseToHdfs /output
Note: the /output directory must not already exist; if it does, remove it first (e.g. with hadoop fs -rm -r /output).

[hadoop@h71 q1]$ hadoop fs -ls /output
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 14:28 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         73 2017-03-18 14:28 /output/part-r-00000
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hello hadoop
hello hadoop
hello hive
hello world
hello world
hello world

2. Output the contents of table 1 as word counts:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseToHdfs1 {  
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
		
	  String tablename = "hello";
	  Configuration conf = HBaseConfiguration.create();
	  conf.set("hbase.zookeeper.quorum", "h71");
	  Job job = new Job(conf, "WordCountHbaseReader");
	  job.setJarByClass(HbaseToHdfs1.class);
	  Scan scan = new Scan();
	  TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);
	  job.setReducerClass(WordCountHbaseReaderReduce.class);
	  FileOutputFormat.setOutputPath(job, new Path(args[0]));
	  // named output registered here as well, and likewise never written to by the reducer
	  MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
	  System.exit(job.waitForCompletion(true) ? 0 : 1);
	}  
	  
	public static class doMapper extends TableMapper<Text, IntWritable>{  
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text(); 
		@Override  
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
			/*
			String rowValue = Bytes.toString(value.list().get(0).getValue());
			context.write(new Text(rowValue), one);
			*/
			String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
			for (String str : rowValue){
				word.set(str);
				context.write(word, one);
			}
		}
	}  
	
	public static class WordCountHbaseReaderReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int total = 0;
			// sum the counts emitted by the mapper (each value is 1 here)
			for (IntWritable val : values){
				total += val.get();
			}
			context.write(key, new IntWritable(total));
		}
	}
}  

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hadoop  2
hello   6
hive    1
world   3

III. Read a file from HDFS and store the word counts in an HBase table

Create a file and upload it to HDFS:

[hadoop@h71 q1]$ vi hello.txt
hello world
hello hadoop
hello hive
hello hadoop
hello world
hello world
[hadoop@h71 q1]$ hadoop fs -mkdir /input
[hadoop@h71 q1]$ hadoop fs -put hello.txt /input

Java code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HdfsToHBase {

	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		private IntWritable i = new IntWritable(1);
		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// TextInputFormat supplies one line per call, so each whole line becomes a key
			String[] s = value.toString().trim().split("\n");
			for (String m : s) {
				context.write(new Text(m), i);
			}
		}
	}
	
	public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {
		@Override
	  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable i : values) {
				sum += i.get();
			}
			Put put = new Put(Bytes.toBytes(key.toString()));
			// column family cf, column count; the value is the number of occurrences
			put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
			context.write(NullWritable.get(), put);
		}
	}
	
	public static void createHBaseTable(String tableName) throws IOException {
		HTableDescriptor htd = new HTableDescriptor(tableName);
		HColumnDescriptor col = new HColumnDescriptor("cf");
		htd.addFamily(col);
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "h71");
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (admin.tableExists(tableName)) {
			System.out.println("table exists, trying to recreate table......");
			admin.disableTable(tableName);
			admin.deleteTable(tableName);
		}
		System.out.println("create new table:" + tableName);
		admin.createTable(htd);
	}
	
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		// name of the HBase table that will hold the results
		String tableName = "mytb2";
		Configuration conf = new Configuration();
		conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
		createHBaseTable(tableName);
		String input = args[0];
		Job job = new Job(conf, "WordCount table with " + input);
		job.setJarByClass(HdfsToHBase.class);
		job.setNumReduceTasks(3);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TableOutputFormat.class);
		FileInputFormat.addInputPath(job, new Path(input));
//	FileInputFormat.setInputPaths(job, new Path(input));	// this works as well
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHBase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHBase*class
[hadoop@h71 q1]$ hadoop jar xx.jar HdfsToHBase /input/hello.txt
hbase(main):011:0> scan 'mytb2'
ROW                                                          COLUMN+CELL                                                                                                                                                                     
 hello hadoop                                                column=cf:count, timestamp=1489819702236, value=2                                                                                                                               
 hello hive                                                  column=cf:count, timestamp=1489819702236, value=1                                                                                                                               
 hello world                                                 column=cf:count, timestamp=1489819704448, value=3                                                                                                                               
3 row(s) in 0.3260 seconds

IV. From HDFS to HDFS (the classic MapReduce WordCount example)

Java code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HdfsToHdfs{
	
	public static class WordCountMapper extends Mapper<Object,Text,Text,IntWritable>{
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		@Override
		public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
		  String[] words = value.toString().split(" ");
		  for (String str: words){
				word.set(str);
				context.write(word,one);
		  }
		}
	}     
	
	public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	  @Override
	  public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
			int total = 0;
			for (IntWritable val : values){
				total += val.get();   // sum the 1s emitted by the mapper
			}
			context.write(key, new IntWritable(total));
		}   
	}
	 
	public static void main (String[] args) throws Exception{
	  Configuration conf = new Configuration();
	
	  Job job = new Job(conf, "word count");
	  job.setJarByClass(HdfsToHdfs.class);
	  job.setMapperClass(WordCountMapper.class);
	  job.setReducerClass(WordCountReducer.class);
	  job.setOutputKeyClass(Text.class);
	  job.setOutputValueClass(IntWritable.class);
	  
	  FileInputFormat.setInputPaths(job, new Path(args[0]));
	  FileOutputFormat.setOutputPath(job, new Path(args[1]));
	  System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHdfs.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHdfs*class
[hadoop@h71 q1]$ hadoop jar xx.jar HdfsToHdfs /input/hello.txt /output

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hadoop  2
hello   6
hive    1
world   3

Note: this WordCount example uses the Hadoop 2 API; the example in my other article, http://blog.csdn.net/m0_37739193/article/details/71132652, uses the Hadoop 1 API. Hadoop 0.20.0 and later ship both versions of the API, so code written against either version will run.

Differences between the old and new Hadoop MapReduce APIs:
        Hadoop 0.20.0 introduced a new Java MapReduce API, sometimes called the "context object" API, designed to make the API easier to evolve in the future. The new API is not type-compatible with the old one, so existing applications must be rewritten to take advantage of it.
        The new API favors abstract classes over interfaces because they are easier to evolve: a method with a default implementation can be added to an abstract class without breaking existing subclasses. In the new API, Mapper and Reducer are both abstract classes.

-- An interface is a strict contract: it contains only method declarations, and every implementing class (abstract classes aside) must implement every method.

-- An abstract class is a looser contract: it can provide default implementations for some methods, and subclasses choose whether to override them. This makes abstract classes better suited to evolving a class hierarchy, i.e. they offer better backward compatibility.
        The new API lives in the org.apache.hadoop.mapreduce package (and its subpackages); the old API remains in org.apache.hadoop.mapred.
        The new API makes extensive use of context objects that let user code communicate with the MapReduce framework; for example, MapContext essentially combines the roles of JobConf, OutputCollector, and Reporter.
        The new API supports both "push"- and "pull"-style iteration. Both APIs push key/value records to the mapper, but the new API also allows records to be pulled from inside the map task, and the same holds for the reducer. The advantage of pull-style processing is that records can be handled in batches rather than one at a time.
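
        As a concrete sketch of what pull-style iteration can look like in the new API (the PullStyleMapper class, the flush() helper, and the batch size below are illustrative and not part of the examples above), a Mapper can override run() and pull records from the Context itself, for instance to process them in batches:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PullStyleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int BATCH_SIZE = 100;  // illustrative batch size
	private final static IntWritable one = new IntWritable(1);

	@Override
	public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		List<String> batch = new ArrayList<String>(BATCH_SIZE);
		// pull records instead of waiting for the framework to push them into map()
		while (context.nextKeyValue()) {
			batch.add(context.getCurrentValue().toString());
			if (batch.size() == BATCH_SIZE) {
				flush(batch, context);
			}
		}
		flush(batch, context);  // emit whatever is left over
		cleanup(context);
	}

	private void flush(List<String> batch, Context context) throws IOException, InterruptedException {
		for (String line : batch) {
			for (String str : line.split(" ")) {
				context.write(new Text(str), one);
			}
		}
		batch.clear();
	}
}
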
        The new API also unifies job configuration. The old API configured a job through a dedicated JobConf object, an extension of Hadoop's Configuration object; in the new API this distinction is dropped and all job configuration goes through Configuration.
        Job control is handled by the Job class rather than JobClient, which no longer exists in the new API.
        Output file naming differs slightly: map output files are named part-m-nnnnn and reduce output files part-r-nnnnn, where nnnnn is the partition number, an integer starting from 0.
        When converting Mapper and Reducer classes written against the old API, remember to change the map() and reduce() signatures to the new form. If you only change the class declaration to extend the new Mapper or Reducer class, the code still compiles without errors or warnings, because those classes also provide default map() and reduce() methods; your own map() or reduce() code will simply never be called, which leads to errors that are hard to diagnose.
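
        To make the signature change concrete, here is a minimal sketch of the same word-splitting mapper written against both APIs (the OldApiMapper and NewApiMapper class names are my own; only the mapper skeletons are shown):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Old API (org.apache.hadoop.mapred): Mapper is an interface; output goes through
// OutputCollector and progress/status reporting through Reporter.
class OldApiMapper extends org.apache.hadoop.mapred.MapReduceBase
		implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);
	private final Text word = new Text();

	public void map(LongWritable key, Text value,
			org.apache.hadoop.mapred.OutputCollector<Text, IntWritable> output,
			org.apache.hadoop.mapred.Reporter reporter) throws IOException {
		for (String str : value.toString().split(" ")) {
			word.set(str);
			output.collect(word, one);
		}
	}
}

// New API (org.apache.hadoop.mapreduce): Mapper is an abstract class and a single
// Context object replaces OutputCollector and Reporter.
class NewApiMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);
	private final Text word = new Text();

	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		for (String str : value.toString().split(" ")) {
			word.set(str);
			context.write(word, one);
		}
	}
}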