
Hadoop in Depth ----------- Shuffle Internals, Partitioner Partitioning, Combiner Programming, and Common MR Algorithms

Partitioner Programming

Introduction to Partitioning

During shuffle, map output is handed to the Reducers by the partitioner, and each Reducer produces one output file. The Partitioner is part of shuffle: it runs after the mappers have finished and before the Reducers start, so the mapper output <k2,v2> is the partitioner's input.

Partitioning is mainly used to improve efficiency. For example, to find the Beijing base-station records in nationwide base-station data: if the data is not partitioned, every query amounts to a full scan over the whole country's data, which is very inefficient. If the first MapReduce pass partitions the data by province or city, each city's records end up in their own file, and a later query for Beijing only has to read the Beijing partition, which is much faster. What to partition on is a business decision: by province/city, by time, and so on. If you do not partition explicitly, Hadoop applies a default partitioning rule.
Partitioner is the abstract base class for all partitioners; a custom partitioner must extend it. HashPartitioner is MapReduce's default partitioner. It computes the target reducer as: which reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
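For reference, the default HashPartitioner is essentially the following (a simplified rendering of the Hadoop class of the same name):

    import org.apache.hadoop.mapreduce.Partitioner;

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
        // Masking with Integer.MAX_VALUE clears the sign bit, so the modulo result
        // always falls in [0, numReduceTasks), even for negative hash codes.
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

Because identical keys have identical hash codes, every value for a given key lands in the same partition and therefore reaches the same Reducer.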

How the Partitioner Works

The flow above omits the shuffle process itself. In this picture the DataNode is used to download the job jar, and the NodeManager runs the YARN tasks: a YarnChild process executes each Mapper or Reducer. When a Reducer starts, it is assigned a partition number (partitions are numbered by default). The Partitioner is part of shuffle: when getPartition returns N for a record, the shuffled output is sent to the Reducer whose partition number is N. Each Reducer corresponds to one partition file, and when a Reducer finishes its computation it writes its results into the partition file matching its partition number.

Writing a Partitioner

① Analyze the business logic and decide roughly how many partitions are needed.
② Write a class that extends org.apache.hadoop.mapreduce.Partitioner.
③ Override public int getPartition; based on the business logic (for example, a lookup table read from a database or from configuration), return the same number for all records that belong to the same partition.
④ In the main method, set the partitioner class: job.setPartitionerClass(DataPartitioner.class);
⑤ Set the number of Reducers: job.setNumReduceTasks(6);

An example follows. Log data: HTTP_20130313143750.dat
    1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
    1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
    1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
    1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
    1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視訊網站 15 12 1527 2106 200
    1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
    1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
    1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 資訊保安 20 20 3156 2936 200
    1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
    1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
    1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜尋引擎 28 27 3659 3538 200
    1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
    1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
    1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
    1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
    1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
    1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
    1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜尋引擎 21 18 9531 2412 200
    1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜尋引擎 69 63 11058 48243 200
    1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
    1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
    1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
The home region or carrier of a phone number can be determined from its first few digits. In real development this mapping is usually stored in a database and looked up through a web project's Service layer.

Requirement: for every phone number, compute the total upstream traffic, total downstream traffic, and overall total, and write the results into partitions derived from the phone number (one partition per carrier). The code follows.

DataBean (a custom Writable bean):
    package liuxun.hadoop.mr.dc;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class DataBean implements Writable {
        private String tel;
        private long upPayLoad;
        private long downPayLoad;
        private long totalPayLoad;

        public DataBean() {
        }

        public DataBean(String tel, long upPayLoad, long downPayLoad) {
            this.tel = tel;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
            this.totalPayLoad = upPayLoad + downPayLoad;
        }

        @Override
        public String toString() {
            return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
        }

        public void readFields(DataInput in) throws IOException {
            this.tel = in.readUTF();
            this.upPayLoad = in.readLong();
            this.downPayLoad = in.readLong();
            this.totalPayLoad = in.readLong();
        }

        // Note two things: the order and the types written here must match readFields()
        public void write(DataOutput out) throws IOException {
            out.writeUTF(tel);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
            out.writeLong(totalPayLoad);
        }

        public String getTel() {
            return tel;
        }

        public void setTel(String tel) {
            this.tel = tel;
        }

        public long getUpPayLoad() {
            return upPayLoad;
        }

        public void setUpPayLoad(long upPayLoad) {
            this.upPayLoad = upPayLoad;
        }

        public long getDownPayLoad() {
            return downPayLoad;
        }

        public void setDownPayLoad(long downPayLoad) {
            this.downPayLoad = downPayLoad;
        }

        public long getTotalPayLoad() {
            return totalPayLoad;
        }

        public void setTotalPayLoad(long totalPayLoad) {
            this.totalPayLoad = totalPayLoad;
        }
    }
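As a quick, optional illustration of why write() and readFields() must agree on field order and types, the following standalone snippet (not part of the original article) serializes a DataBean and reads it back:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import liuxun.hadoop.mr.dc.DataBean;

    public class DataBeanRoundTrip {
        public static void main(String[] args) throws Exception {
            DataBean in = new DataBean("13726230503", 2481, 24681);

            // Serialize the bean, just as Hadoop does between map and reduce.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bos));

            // Deserialize into a fresh bean; the fields come back in the same order they were written.
            DataBean out = new DataBean();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

            System.out.println(out); // prints: 2481    24681   27162
        }
    }

If the order or types in write() and readFields() drift apart, the round trip produces garbage or throws, which is why the comment in DataBean calls this out.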
DataCountPartition (the computation model: mapper, reducer, custom partitioner, and driver):
    package liuxun.hadoop.mr.dc;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DataCountPartition {

        public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // accept
                String line = value.toString();
                // split
                String[] fields = line.split("\t");
                String tel = fields[1];
                long up = Long.parseLong(fields[8]);
                long down = Long.parseLong(fields[9]);
                DataBean bean = new DataBean(tel, up, down);
                // send
                context.write(new Text(tel), bean);
            }
        }

        public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {
            @Override
            protected void reduce(Text key, Iterable<DataBean> values, Context context)
                    throws IOException, InterruptedException {
                long up_sum = 0;
                long down_sum = 0;
                for (DataBean bean : values) {
                    up_sum += bean.getUpPayLoad();
                    down_sum += bean.getDownPayLoad();
                }
                DataBean bean = new DataBean("", up_sum, down_sum);
                context.write(key, bean);
            }
        }

        public static class ProviderPartitioner extends Partitioner<Text, DataBean> {

            private static Map<String, Integer> providerMap = new HashMap<String, Integer>();

            static {
                // In real development this mapping would be loaded from a database
                // 1: China Mobile  2: China Unicom  3: China Telecom
                providerMap.put("135", 1);
                providerMap.put("136", 1);
                providerMap.put("137", 1);
                providerMap.put("150", 2);
                providerMap.put("159", 2);
                providerMap.put("182", 3);
                providerMap.put("183", 3);
            }

            // The return value of this method is the partition number.
            // key: the key emitted by the mapper, here the phone number
            // value: the value emitted by the mapper, here the DataBean
            // numPartitions: the number of partitions, determined by the number of Reducers;
            //                starting N Reducers gives N partitions
            @Override
            public int getPartition(Text key, DataBean value, int numPartitions) {
                // Derive the carrier from the phone number. Here we partition by key;
                // in real development you could also partition by value.
                String account = key.toString();
                String sub_acc = account.substring(0, 3);
                Integer code = providerMap.get(sub_acc);
                if (code == null) {
                    code = 0;
                }
                return code;
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(DataCountPartition.class);

            job.setMapperClass(DCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DataBean.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));

            job.setReducerClass(DCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DataBean.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setPartitionerClass(ProviderPartitioner.class);
            // Set the number of Reducers to start
            job.setNumReduceTasks(Integer.parseInt(args[2]));

            job.waitForCompletion(true);
        }
    }
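The static block above hard-codes the prefix → carrier table. As the comment notes, in real development this mapping would come from a database. Below is a minimal sketch of such a loader; the table name (provider_prefix), its columns, and the JDBC connection details are assumptions made purely for illustration:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.HashMap;
    import java.util.Map;

    public class ProviderMapLoader {
        // Loads prefix -> carrier code pairs, e.g. "135" -> 1, from a hypothetical table.
        public static Map<String, Integer> load(String jdbcUrl, String user, String password) throws Exception {
            Map<String, Integer> map = new HashMap<String, Integer>();
            try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
                 Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("SELECT prefix, provider_code FROM provider_prefix")) {
                while (rs.next()) {
                    map.put(rs.getString("prefix"), rs.getInt("provider_code"));
                }
            }
            return map;
        }
    }

Note that a Partitioner is instantiated inside every map task, so querying the database from its static initializer would hit the database once per task; a common alternative is to load the table once in the driver and hand it to the tasks through the job Configuration or the distributed cache.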
① First upload the log data to HDFS.
② Package the program above as WCP.jar, upload it to the Linux host, and run it as: hadoop jar <jar> <log path> <output path> <number of reducers>

    hadoop fs -put HTTP_20130313143750.dat /log.txt
    hadoop jar WCP.jar /log.txt /logResult 4

View the results:

    [[email protected] Desktop]# hadoop fs -ls /logResult
    Found 5 items
    -rw-r--r--   1 root supergroup          0 2017-08-31 19:02 /logResult/_SUCCESS
    -rw-r--r--   1 root supergroup        175 2017-08-31 19:02 /logResult/part-r-00000
    -rw-r--r--   1 root supergroup        241 2017-08-31 19:02 /logResult/part-r-00001
    -rw-r--r--   1 root supergroup         80 2017-08-31 19:02 /logResult/part-r-00002
    -rw-r--r--   1 root supergroup         55 2017-08-31 19:02 /logResult/part-r-00003
    [[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00000
    13480253104     180     180     360
    13826544101     264     0       264
    13922314466     3008    3720    6728
    13925057413     11058   48243   59301
    13926251106     240     0       240
    13926435656     132     1512    1644
    84138413        4116    1432    5548
    [[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00001
    13502468823     7335    110349  117684
    13560436666     1116    954     2070
    13560439658     2034    5892    7926
    13602846565     1938    2910    4848
    13660577991     6960    690     7650
    13719199419     240     0       240
    13726230503     2481    24681   27162
    13726238888     2481    24681   27162
    13760778710     120     120     240
    [[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00002
    15013685858     3659    3538    7197
    15920133257     3156    2936    6092
    15989002119     1938    180     2118
    [[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00003
    18211575961     1527    2106    3633
    18320173382     9531    2412    11943

Note: a partitioned job cannot be tested this way in Eclipse, because Eclipse runs the job in local mode, which always starts a single mapper and a single reducer, so the partitioning never takes effect. Also, if the specified number of reducers is smaller than the largest partition number the job needs to write, an exception is thrown; if it is larger, no exception is thrown, but the extra partitions hold no data. So always set the number of reducers greater than or equal to the maximum number of partitions required. With the partitioner above, for example, running with 5 reducers would simply leave part-r-00004 empty, while running with 3 would fail as soon as a record for partition 3 (prefixes 182/183) had to be written.

Sorting

If no custom sort rule is defined and k2 is of type Text, MapReduce sorts by k2 in dictionary order by default. MapReduce also lends itself to iterative programming: if a single MapReduce job cannot satisfy a requirement, you can chain several jobs, using the output of one job as intermediate data that becomes the input of the next. To define a custom sort order, the bean used as the key must implement the WritableComparable interface, which is a subinterface of Writable. Note: when the business logic is complex, several chained MapReduce jobs can be written and run iteratively.

Example: transaction log trade_info
    [email protected] 6000 0 2014-02-20
    [email protected] 2000 0 2014-02-20
    [email protected] 0 100 2014-02-20
    [email protected] 3000 0 2014-02-20
    [email protected] 9000 0 2014-02-20
    [email protected] 0 200 2014-02-20
Requirement: compute each user's total income, total expenses, and balance, and sort the results, first by income in descending order and, for equal income, by expenses in ascending order.

Code:

① Define an InfoBean that implements WritableComparable and overrides compareTo and toString:
    package liuxun.hadoop.mr.sort;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class InfoBean implements WritableComparable<InfoBean> {

        private String account;  // account
        private double income;   // income
        private double expenses; // expenses
        private double surplus;  // balance

        public void set(String account, double income, double expenses) {
            this.account = account;
            this.income = income;
            this.expenses = expenses;
            this.surplus = this.income - this.expenses;
        }

        // Serialization
        public void write(DataOutput out) throws IOException {
            out.writeUTF(account);
            out.writeDouble(income);
            out.writeDouble(expenses);
            out.writeDouble(surplus);
        }

        // Deserialization
        public void readFields(DataInput in) throws IOException {
            this.account = in.readUTF();
            this.income = in.readDouble();
            this.expenses = in.readDouble();
            this.surplus = in.readDouble();
        }

        public int compareTo(InfoBean o) {
            if (this.income == o.getIncome()) {
                return this.expenses > o.getExpenses() ? 1 : -1;
            } else {
                return this.income > o.getIncome() ? -1 : 1;
            }
        }

        public String getAccount() {
            return account;
        }

        public void setAccount(String account) {
            this.account = account;
        }

        public double getIncome() {
            return income;
        }

        public void setIncome(double income) {
            this.income = income;
        }

        public double getExpenses() {
            return expenses;
        }

        public void setExpenses(double expenses) {
            this.expenses = expenses;
        }

        public double getSurplus() {
            return surplus;
        }

        public void setSurplus(double surplus) {
            this.surplus = surplus;
        }

        // Note: toString determines the order in which the bean's fields are written to the output file
        @Override
        public String toString() {
            return income + "\t" + expenses + "\t" + surplus + "\t";
        }
    }
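To see the ordering that compareTo defines, here is a tiny standalone check (not part of the original article; the account names are made up for illustration, and the class is assumed to import InfoBean from the package above). Higher income sorts first; with equal income, lower expenses sorts first:

    import liuxun.hadoop.mr.sort.InfoBean;

    public class InfoBeanCompareDemo {
        public static void main(String[] args) {
            InfoBean a = new InfoBean();
            a.set("userA", 9000, 0);
            InfoBean b = new InfoBean();
            b.set("userB", 6000, 0);
            InfoBean c = new InfoBean();
            c.set("userC", 6000, 200);

            // a before b: 9000 > 6000, so a.compareTo(b) is negative
            // b before c: equal income, but b has lower expenses
            System.out.println(a.compareTo(b)); // -1
            System.out.println(b.compareTo(c)); // -1
        }
    }

During the shuffle of the sort job, MapReduce calls exactly this compareTo on the keys, so the reducer receives the beans already in this order.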
② Write the MR job that does the aggregation (SumStep):
    package liuxun.hadoop.mr.sort;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SumStep {

        public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
            private Text k = new Text();
            private InfoBean v = new InfoBean();

            @Override
            protected void