
MapReduce Feature Implementations

Posts in this series:

MapReduce Feature Implementation 1: Converting data between HBase and HDFS

MapReduce Feature Implementation 2: Sorting

MapReduce Feature Implementation 3: Top N

MapReduce Feature Implementation 4: A small exercise (read data from HBase, aggregate it, and output the Top 3 to HDFS in descending order)

MapReduce Feature Implementation 5: Deduplication (Distinct) and counting (Count)

MapReduce Feature Implementation 6: Maximum (Max), sum (Sum), and average (Avg)

MapReduce Feature Implementation 7: A small exercise (chaining multiple jobs to compute an average)

MapReduce Feature Implementation 8: Partitioning (Partition)

MapReduce Feature Implementation 9: PV and UV

MapReduce Feature Implementation 10: Inverted index (Inverted Index)

MapReduce Feature Implementation 11: Join


I. Reading data from HBase table 1 and writing the aggregated results to table 2

First, create table 1 in the HBase shell:


  
create 'hello', 'cf'
put 'hello', '1', 'cf:hui', 'hello world'
put 'hello', '2', 'cf:hui', 'hello hadoop'
put 'hello', '3', 'cf:hui', 'hello hive'
put 'hello', '4', 'cf:hui', 'hello hadoop'
put 'hello', '5', 'cf:hui', 'hello world'
put 'hello', '6', 'cf:hui', 'hello world'
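
If you prefer to seed the table from Java rather than the shell, the sketch below does the same six puts through the HBase client API. This is our illustration, not part of the original article: the class name LoadHello is made up, and it assumes an HBase 1.x+ client on the classpath.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadHello {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Same values as the shell puts above
        String[] lines = {"hello world", "hello hadoop", "hello hive",
                          "hello hadoop", "hello world", "hello world"};
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("hello"))) {
            for (int i = 0; i < lines.length; i++) {
                // Row keys "1".."6", column cf:hui
                Put put = new Put(Bytes.toBytes(String.valueOf(i + 1)));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hui"),
                              Bytes.toBytes(lines[i]));
                table.put(put);
            }
        }
    }
}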

The Java code:

  
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseToHbase {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String hbaseTableName1 = "hello";
        String hbaseTableName2 = "mytb2";
        prepareTB2(hbaseTableName2);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHbase.class);
        job.setJobName("mrreadwritehbase");

        Scan scan = new Scan();
        scan.setCaching(500);        // larger scanner caching speeds up MapReduce scans
        scan.setCacheBlocks(false);  // a full scan should not pollute the block cache

        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);  // exit 0 on success
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // Emit the value of the row's first (and only) cell as the key, with a count of 1
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), one);
        }
    }

    public static class doReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            System.out.println(key.toString());
            // Sum the 1s emitted by the mapper for this line
            int sum = 0;
            Iterator<IntWritable> haha = values.iterator();
            while (haha.hasNext()) {
                sum += haha.next().get();
            }
            // Write the total into table 2 under mycolumnfamily:count
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }

    public static void prepareTB2(String hbaseTableName) throws IOException {
        // (Re)create the output table with a single column family
        HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);
        HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");
        tableDesc.addFamily(columnDesc);
        Configuration cfg = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(cfg);
        if (admin.tableExists(hbaseTableName)) {
            System.out.println("Table exists, trying drop and create!");
            admin.disableTable(hbaseTableName);
            admin.deleteTable(hbaseTableName);
            admin.createTable(tableDesc);
        } else {
            System.out.println("create table: " + hbaseTableName);
            admin.createTable(tableDesc);
        }
    }
}
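
A side note: HBaseAdmin, HTableDescriptor, and HColumnDescriptor are deprecated in newer HBase releases. For comparison only, here is a minimal sketch of prepareTB2 against the HBase 2.x Admin API; the class name PrepareTB2Modern is ours, and the sketch assumes an HBase 2.x client.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class PrepareTB2Modern {
    public static void prepareTB2(String hbaseTableName) throws IOException {
        Configuration cfg = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(cfg);
             Admin admin = conn.getAdmin()) {
            TableName tn = TableName.valueOf(hbaseTableName);
            // Same layout as the original: one column family, "mycolumnfamily"
            TableDescriptorBuilder desc = TableDescriptorBuilder.newBuilder(tn)
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of("mycolumnfamily"));
            if (admin.tableExists(tn)) {
                // Drop and recreate, mirroring the original behavior
                admin.disableTable(tn);
                admin.deleteTable(tn);
            }
            admin.createTable(desc.build());
        }
    }
}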

Compile and run it on Linux:

  
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class
[hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase

Inspect the mytb2 table:

  
hbase(main):009:0> scan 'mytb2'
ROW                       COLUMN+CELL
 hello hadoop             column=mycolumnfamily:count, timestamp=1489817182454, value=2
 hello hive               column=mycolumnfamily:count, timestamp=1489817182454, value=1
 hello world              column=mycolumnfamily:count, timestamp=1489817182454, value=3
3 row(s) in 0.0260 seconds
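
The counts can also be read back programmatically with a Get. The sketch below is our illustration (the class name ReadCount is made up; it assumes an HBase 1.x+ client):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadCount {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("mytb2"))) {
            // The reducer used the line itself as the row key
            Result r = table.get(new Get(Bytes.toBytes("hello world")));
            byte[] v = r.getValue(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"));
            // The count was stored as a string, so decode it the same way
            System.out.println("hello world -> " + Bytes.toString(v));
        }
    }
}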

II. Reading data from HBase table 1 and writing the results to HDFS

1. Output the contents of table 1 without counting:


  
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseToHdfs {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tablename = "hello";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        Job job = new Job(conf, "WordCountHbaseReader");
        job.setJarByClass(HbaseToHdfs.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename, scan, doMapper.class, Text.class, Text.class, job);
        job.setReducerClass(WordCountHbaseReaderReduce.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        // Registers a named output that this job never actually writes to; harmless here
        MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // Emit the raw cell value as the key; the value is just a placeholder
            String rowValue = Bytes.toString(value.list().get(0).getValue());
            context.write(new Text(rowValue), new Text("one"));
        }
    }

    public static class WordCountHbaseReaderReduce extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Write the key once per occurrence, so duplicate lines are preserved
            for (Text val : values) {
                context.write(key, NullWritable.get());
            }
        }
    }
}

Compile and run it on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseToHdfs.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseToHdfs*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseToHdfs /output
Note: the /output directory must not already exist; if it does, delete it first (or see the helper sketch below).
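
If you would rather have the driver clean up for you, a small helper like the following can delete the directory before the job is submitted. This is our addition, not part of the original code (the class and method names are made up):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputCleaner {
    // Recursively delete the output directory if it already exists, so that
    // FileOutputFormat does not abort the job with "output path already exists".
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        Path out = new Path(dir);
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true); // true = recursive
        }
    }
}

Calling OutputCleaner.deleteIfExists(conf, args[0]) at the top of main() makes reruns idempotent.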


[hadoop@h71 q1]$ hadoop fs -ls /output
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 14:28 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         73 2017-03-18 14:28 /output/part-r-00000
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hello hadoop
hello hadoop
hello hive
hello world
hello world
hello world


2. Output the contents of table 1 with counts:


  
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseToHdfs1 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tablename = "hello";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h71");
        Job job = new Job(conf, "WordCountHbaseReader");
        job.setJarByClass(HbaseToHdfs1.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(tablename, scan, doMapper.class, Text.class, IntWritable.class, job);
        job.setReducerClass(WordCountHbaseReaderReduce.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        // Registers a named output that this job never actually writes to; harmless here
        MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends TableMapper<Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // NOTE: the source listing was cut off from this point on; the bodies
            // below are reconstructed to mirror the HBaseToHbase example above.
            // Emit the cell value with a count of 1.
            word.set(Bytes.toString(value.list().get(0).getValue()));
            context.write(word, one);
        }
    }

    public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the counts for each distinct line and write "line<TAB>count"
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}
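
Because the reduce step here is a plain summation (associative and commutative), the reducer can also be registered as a combiner to shrink shuffle traffic. A hypothetical one-line addition to main() above, valid only if the reducer's input and output types match as in the reconstruction:

// Hypothetical addition to main(), after job.setReducerClass(...):
// safe because the reducer reads and writes (Text, IntWritable) and
// partial sums can be merged in any order.
job.setCombinerClass(WordCountHbaseReaderReduce.class);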