Hadoop in Depth ----------- shuffle internals, partitioner partitioning, Combiner programming, and common MR algorithms
阿新 • Published: 2018-12-20
Partitioner Programming
Introduction to Partition
During shuffle, map output is distributed to the reducers by the partitioner, and each reducer produces one output file. The partitioner is part of the shuffle: it runs after the mappers have finished and before the reducers start, so the mapper output, i.e. <k2,v2>, is the partitioner's input.

Partitioning is mainly an efficiency tool. Suppose you need to find the Beijing base-station records in a nationwide dataset. If the data is not partitioned, everything sits together and every query amounts to a full scan, which is very slow. If instead the first MapReduce job partitions the data by province/city, so that each city's base-station records are stored in their own file, a later query for Beijing reads only the Beijing partition and is much faster. The partitioning criterion follows the business requirements: by province/city, by time period, and so on. If you do not partition manually, Hadoop applies a default partitioning rule.

How the Partitioner Works
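That default rule is Hadoop's HashPartitioner (`org.apache.hadoop.mapreduce.lib.partition.HashPartitioner`): the key's hash code, masked to be non-negative, modulo the reducer count. A minimal sketch in plain Java (the class name `HashPartitionDemo` is illustrative, not part of Hadoop):

```java
// Sketch of the core formula used by Hadoop's default HashPartitioner.
public class HashPartitionDemo {

    // Same expression as HashPartitioner.getPartition: mask off the sign bit,
    // then take the remainder modulo the number of reduce tasks.
    public static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition...
        int p1 = getPartition("13726230503", 4);
        int p2 = getPartition("13726230503", 4);
        System.out.println(p1 == p2);          // true
        // ...and the partition number is always in [0, numReduceTasks).
        System.out.println(p1 >= 0 && p1 < 4); // true
    }
}
```

This is why, without a custom partitioner, records are spread evenly but arbitrarily across reducers: the hash says nothing about carrier, province, or any business dimension.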
The flow above omits the details of the shuffle step. The DataNode is where the job jar is downloaded; the NodeManager runs the YARN tasks, and a YarnChild process executes each Mapper or Reducer. When a reducer is started it is assigned a partition number (numeric by default). The partitioner is part of the shuffle: when getPartition returns N, the shuffled records are delivered to the reducer whose partition number is N. Each reducer corresponds to one partition file, and when the reducer finishes its computation it writes its results to the file for its partition number.

Writing a Partitioner
① Analyze the business logic and decide roughly how many partitions are needed.
② Write a class that extends org.apache.hadoop.mapreduce.Partitioner.
③ Override public int getPartition; based on the concrete logic (reading a database or configuration), return the same number for records that belong to the same partition.
④ In the main method, set the partitioner class: job.setPartitionerClass(DataPartitioner.class);
⑤ Set the number of reducers: job.setNumReduceTasks(6);

An example follows. Log data: HTTP_20130313143750.dat

```
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視訊網站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 資訊保安 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜尋引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜尋引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜尋引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
```
The first few digits of a phone number identify its home region and carrier. In real projects this mapping is usually stored in a database and looked up through the web project's service layer.

Requirement: compute each phone number's total upstream traffic, total downstream traffic, and overall total, and store the results partitioned by phone number. The code is as follows.

DataBean (a custom bean):

```java
package liuxun.hadoop.mr.dc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {
    private String tel;
    private long upPayLoad;
    private long downPayLoad;
    private long totalPayLoad;

    public DataBean() {
    }

    public DataBean(String tel, long upPayLoad, long downPayLoad) {
        this.tel = tel;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = upPayLoad + downPayLoad;
    }

    @Override
    public String toString() {
        return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
    }

    public void readFields(DataInput in) throws IOException {
        this.tel = in.readUTF();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
        this.totalPayLoad = in.readLong();
    }

    // Two things to watch: the order of the writes and their types
    // must match readFields exactly.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tel);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    public long getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public long getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    public long getTotalPayLoad() {
        return totalPayLoad;
    }

    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
}
```
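The warning about write order matters because the serialized stream carries raw bytes only, with no field names: readFields must consume fields in exactly the order write produced them. A minimal round-trip using plain java.io (no Hadoop dependency; the class `WritableOrderDemo` is a hypothetical stand-in for DataBean) demonstrates the mechanism:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Why write() and readFields() must agree on field order and types:
// the byte stream is positional, so any mismatch silently corrupts the data.
public class WritableOrderDemo {

    // Serialize in the order: tel, up, down, total (mirrors DataBean.write).
    static byte[] write(String tel, long up, long down) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF(tel);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(up + down);
        return buf.toByteArray();
    }

    // Deserialize in the exact same order, skipping to the total field.
    static long readTotal(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        in.readUTF();   // tel
        in.readLong();  // up
        in.readLong();  // down
        return in.readLong();  // total
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = write("13726230503", 2481, 24681);
        System.out.println(readTotal(bytes)); // 27162
    }
}
```

Swapping, say, the readUTF and a readLong would throw or return garbage, which is exactly the failure mode the comment in DataBean guards against.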
DataCountPartition (the computation model):

```java
package liuxun.hadoop.mr.dc;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCountPartition {

    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // accept
            String line = value.toString();
            // split
            String[] fields = line.split("\t");
            String tel = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataBean bean = new DataBean(tel, up, down);
            // send
            context.write(new Text(tel), bean);
        }
    }

    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {
        @Override
        protected void reduce(Text key, Iterable<DataBean> values, Context context)
                throws IOException, InterruptedException {
            long up_sum = 0;
            long down_sum = 0;
            for (DataBean bean : values) {
                up_sum += bean.getUpPayLoad();
                down_sum += bean.getDownPayLoad();
            }
            DataBean bean = new DataBean("", up_sum, down_sum);
            context.write(key, bean);
        }
    }

    public static class ProviderPartitioner extends Partitioner<Text, DataBean> {
        private static Map<String, Integer> providerMap = new HashMap<String, Integer>();
        static {
            // In real projects this mapping is loaded from a database.
            // 1: China Mobile  2: China Unicom  3: China Telecom
            providerMap.put("135", 1);
            providerMap.put("136", 1);
            providerMap.put("137", 1);
            providerMap.put("150", 2);
            providerMap.put("159", 2);
            providerMap.put("182", 3);
            providerMap.put("183", 3);
        }

        // The return value of this method is the partition number.
        // key: the key emitted by the mapper (here, the phone number)
        // value: the value emitted by the mapper (here, the DataBean)
        // numPartitions: the number of partitions, determined by the number
        // of reducers -- launching N reducers yields N partitions
        @Override
        public int getPartition(Text key, DataBean value, int numPartitions) {
            // Derive the carrier from the phone number. We partition by key
            // here; in practice you can also partition by value.
            String account = key.toString();
            String sub_acc = account.substring(0, 3);
            Integer code = providerMap.get(sub_acc);
            if (code == null) {
                code = 0;
            }
            return code;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCountPartition.class);

        job.setMapperClass(DCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setPartitionerClass(ProviderPartitioner.class);
        // Set the number of reducers to launch
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        job.waitForCompletion(true);
    }
}
```
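The prefix-lookup logic inside ProviderPartitioner can be sanity-checked without launching a job by extracting it into a standalone sketch (the class name `PrefixPartitionDemo` and the duplicated table are illustrative only; the mapping values follow the table above):

```java
import java.util.HashMap;
import java.util.Map;

// Standalone copy of the prefix lookup used by ProviderPartitioner,
// runnable without Hadoop, for quick verification of the mapping.
public class PrefixPartitionDemo {
    private static final Map<String, Integer> providerMap = new HashMap<String, Integer>();
    static {
        // 1: China Mobile  2: China Unicom  3: China Telecom
        providerMap.put("135", 1);
        providerMap.put("136", 1);
        providerMap.put("137", 1);
        providerMap.put("150", 2);
        providerMap.put("159", 2);
        providerMap.put("182", 3);
        providerMap.put("183", 3);
    }

    // Look up the partition by the first three digits; unknown prefixes
    // fall back into partition 0, matching getPartition above.
    public static int partitionFor(String phone) {
        Integer code = providerMap.get(phone.substring(0, 3));
        return code == null ? 0 : code;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("13726230503")); // 1 (China Mobile)
        System.out.println(partitionFor("15013685858")); // 2 (China Unicom)
        System.out.println(partitionFor("18320173382")); // 3 (China Telecom)
        System.out.println(partitionFor("84138413"));    // 0 (unknown carrier)
    }
}
```

Note the fallback: every phone number whose prefix is missing from the table (such as the hotel number 84138413 in the sample data) ends up in partition 0, which is why the job needs at least four reducers even though only three carriers are mapped.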
① Upload the log data to HDFS. ② Package the program above as WCP.jar, upload it to the Linux host, and run it with hadoop jar, passing the log path, the result path, and the reducer count:

```
hadoop fs -put HTTP_20130313143750.dat /log.txt
hadoop jar WCP.jar /log.txt /logResult 4
```

View the results:

```
[[email protected] Desktop]# hadoop fs -ls /logResult
Found 5 items
-rw-r--r--   1 root supergroup        0 2017-08-31 19:02 /logResult/_SUCCESS
-rw-r--r--   1 root supergroup      175 2017-08-31 19:02 /logResult/part-r-00000
-rw-r--r--   1 root supergroup      241 2017-08-31 19:02 /logResult/part-r-00001
-rw-r--r--   1 root supergroup       80 2017-08-31 19:02 /logResult/part-r-00002
-rw-r--r--   1 root supergroup       55 2017-08-31 19:02 /logResult/part-r-00003
[[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00000
13480253104 180 180 360
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
84138413 4116 1432 5548
[[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00001
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
[[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00002
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
[[email protected] Desktop]# hadoop fs -cat /logResult/part-r-00003
18211575961 1527 2106 3633
18320173382 9531 2412 11943
```

Note: a job that relies on partitioning cannot be tested in Eclipse, because Eclipse runs in local mode, which always starts exactly one mapper and one reducer, so no partitioning takes place.

If the configured reducer count is smaller than the largest partition number that will be written to, an exception is thrown. If it is larger, no exception is thrown, but the extra partitions simply hold no data. So always set the reducer count greater than or equal to the maximum number of partitions needed.

Sorting
If no custom sort rule is defined and k2 is of type Text, the output is sorted by k2 in dictionary order by default. MapReduce also supports iterative programming: when a single MapReduce job cannot satisfy a requirement, you can chain several jobs, feeding one job's output into the next job's input as an intermediate result. To implement a custom sort, the bean used as the key must implement the WritableComparable interface, which is a sub-interface of Writable. Note: if the business logic is complex, split it across several MapReduce jobs chained iteratively.

Example: transaction records, trade_info:

```
[email protected] 6000 0 2014-02-20
[email protected] 2000 0 2014-02-20
[email protected] 0 100 2014-02-20
[email protected] 3000 0 2014-02-20
[email protected] 9000 0 2014-02-20
[email protected] 0 200 2014-02-20
```
Requirement: compute each user's total expenses, total income, and net balance, and sort the results: first by income from high to low, and for equal incomes, by expenses from low to high.

Code: ① Define an InfoBean that implements the WritableComparable interface and overrides compareTo and toString:

```java
package liuxun.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class InfoBean implements WritableComparable<InfoBean> {
    private String account;  // account
    private double income;   // income
    private double expenses; // expenses
    private double surplus;  // balance

    public void set(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.surplus = this.income - this.expenses;
    }

    // Serialization
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(surplus);
    }

    // Deserialization
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.surplus = in.readDouble();
    }

    public int compareTo(InfoBean o) {
        if (this.income == o.getIncome()) {
            return this.expenses > o.getExpenses() ? 1 : -1;
        } else {
            return this.income > o.getIncome() ? -1 : 1;
        }
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public double getIncome() {
        return income;
    }

    public void setIncome(double income) {
        this.income = income;
    }

    public double getExpenses() {
        return expenses;
    }

    public void setExpenses(double expenses) {
        this.expenses = expenses;
    }

    public double getSurplus() {
        return surplus;
    }

    public void setSurplus(double surplus) {
        this.surplus = surplus;
    }

    // Note: toString determines the order in which the bean's fields
    // are written to the output file.
    @Override
    public String toString() {
        return income + "\t" + expenses + "\t" + surplus + "\t";
    }
}
```
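The ordering that compareTo defines (income descending, then expenses ascending) can be checked without Hadoop by porting the same comparison onto java.lang.Comparable. The class `SortOrderDemo` below is a hypothetical, trimmed-down stand-in for InfoBean:

```java
import java.util.Arrays;

// Same comparison logic as InfoBean.compareTo, runnable without Hadoop:
// higher income sorts first; for equal incomes, lower expenses sort first.
public class SortOrderDemo implements Comparable<SortOrderDemo> {
    final String account;
    final double income;
    final double expenses;

    SortOrderDemo(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
    }

    public int compareTo(SortOrderDemo o) {
        if (this.income == o.income) {
            return this.expenses > o.expenses ? 1 : -1;
        } else {
            return this.income > o.income ? -1 : 1;
        }
    }

    public static void main(String[] args) {
        SortOrderDemo[] rows = {
            new SortOrderDemo("a", 2000, 0),
            new SortOrderDemo("b", 9000, 200),
            new SortOrderDemo("c", 9000, 100),
        };
        Arrays.sort(rows);
        for (SortOrderDemo r : rows) {
            System.out.println(r.account); // prints c, b, a
        }
    }
}
```

In the MapReduce job itself this comparison is not invoked by Arrays.sort, of course: the framework calls it during the shuffle's sort phase whenever InfoBean is used as the map output key, which is exactly why the bean, not the Text account, must be the key in the sorting step.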
② Write an MR job to do the aggregation (SumStep):

```java
package liuxun.hadoop.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumStep {

    public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        private Text k = new Text();
        private InfoBean v = new InfoBean();

        @Override
        protected void
```