
(Anti-pitfall notes) Hadoop 3.0 (3): the MapReduce workflow, serialization, and offsets (MapReduce)

Anti-pitfall note:

Roughly speaking, MapReduce is a way of splitting a large data set into small chunks, computing on each chunk, and then aggregating the results.

Basic idea

        The basic idea of MapReduce, sketched in a single diagram:


map(): roughly, the set of decomposed sub-tasks

reduce(): the aggregation of the results produced by those sub-tasks

Both functions take key/value (K/V) pairs as their parameters.
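As a minimal, self-contained sketch (my own illustration, not part of the original post), the pair below shows the K/V shape of both methods in the org.apache.hadoop.mapreduce API; the class names and the "length" key are made up for this example, which simply sums the byte length of every input line.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// map(): receives <byte offset, line> pairs and emits <"length", line length> pairs.
class LineLengthMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text("length"), new LongWritable(value.getLength()));
    }
}

// reduce(): receives <"length", [lengths...]> and emits the aggregated total.
class LineLengthReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable v : values) {
            total += v.get();
        }
        context.write(key, new LongWritable(total));
    }
}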

The Mapper task workflow


(each Mapper task runs as its own Java process)

When MapReduce runs, the Mapper tasks read the data files from HDFS, call their map method to process the records, and emit the output. The Reducer tasks take the Mapper output as their input, call their reduce method, and finally write the result to files in HDFS.

There are six stages in total, as follows.

1. The first stage splits the input files into InputSplits according to a fixed rule; every split has a fixed size. By default the size of an InputSplit equals the HDFS block size (note that Hadoop 2.x/3.x defaults to 128 MB; 64 MB was the default in older releases). Suppose the block size is 64 MB and there are two input files, one of 32 MB and one of 72 MB: the small file becomes one split, while the large file is cut into a 64 MB piece and an 8 MB piece, giving three splits in total. Each split is processed by one Mapper process, so these three splits are handled by three Mapper processes.

2. The second stage parses the records in each split into key/value pairs according to a rule. The default rule turns each line of text into one pair: the key is the byte offset of the line's start within the file, and the value is the text of the line (see the sketch after this list).

3. The third stage calls the map method of the Mapper class once for each key/value pair parsed in the second stage. If there are 1,000 pairs, map is called 1,000 times. Each call to map may emit zero or more key/value pairs.

4. The fourth stage partitions the key/value pairs emitted in the third stage according to a rule; the comparison is based on the key. For example, if the key represents a province (Beijing, Shanghai, Shandong, and so on), the pairs can be partitioned by province so that pairs with the same province land in the same partition. By default there is only one partition. The number of partitions equals the number of Reducer tasks, and by default there is a single Reducer task (the default partitioning rule is also shown in the sketch after this list).

5. The fifth stage sorts the key/value pairs within each partition. The sort is by key; strictly speaking the framework only guarantees ordering by key, and ordering the values of equal keys requires a secondary sort. For instance, for the three pairs <2,2>, <1,3>, <2,1> with integer keys and values, a key-then-value ordering would give <1,3>, <2,1>, <2,2>. If there is a sixth stage, the data moves on to it; otherwise it is written to local Linux files.

6. The sixth stage performs the reduction, i.e. the reduce processing: the reduce method is called once per distinct key, with all the values that share that key, which usually shrinks the data volume. The reduced data is then written out to files in HDFS. This stage is optional by default; the user has to supply the code for it.
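To make stages 2 and 4 concrete, here is a small self-contained sketch (my own addition, assuming the default TextInputFormat and HashPartitioner behaviour): the key handed to map() is the byte offset of each line within the file, and an intermediate key is assigned to a partition based on its hashCode.

// Illustrative only: mimics the default line-offset keys and the default partitioning rule.
public class DefaultRulesSketch {
    public static void main(String[] args) {
        // Stage 2: the default rule keys each line by its starting byte offset in the file.
        String[] lines = { "hello world", "hello hadoop", "bye hadoop" };
        long offset = 0;
        for (String line : lines) {
            System.out.println("<" + offset + ", \"" + line + "\">"); // the <k1, v1> handed to map()
            offset += line.getBytes().length + 1;                     // +1 for the newline
        }

        // Stage 4: HashPartitioner-style rule for choosing a partition (i.e. a reducer).
        int numReduceTasks = 3;
        String key = "18211575961";
        int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        System.out.println("key " + key + " -> partition " + partition);
    }
}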


To put it plainly, the whole MapReduce process cuts the input of a task into pieces (splits) as required, runs the computation on each piece separately, and then merges the results back together.

Let's go straight to a generic mobile-traffic analysis example.

Source data (three phone numbers and their upstream/downstream traffic on particular sites):

1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視訊網站 15 12 1527 2106 200

1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 資訊保安 20 20 3156 2936 200

1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
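To make the column positions explicit (my own annotation, derived from the mapper code below), splitting a row on whitespace gives zero-based indices; only columns 1 and 6 to 9 are used by this job. For the first row:

index 1  18211575961   -> phone number (becomes the key k2)
index 6  15            -> upPackNum   (upstream packets)
index 7  12            -> downPackNum (downstream packets)
index 8  1527          -> upPayLoad   (upstream bytes)
index 9  2106          -> downPayLoad (downstream bytes)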

Walkthrough:

Before going further, a quick primer on serialization: serializing structured data turns it into a byte stream, which makes it easy to transmit and to store permanently.

Hadoop ships its own serialization interface for this, Writable, with just two methods, write() and readFields(); it is what the nodes of a cluster use to exchange data.

Text, as its source code shows, stores its content as UTF-8 and also implements WritableComparable, so it is sortable; you can think of it as a special, UTF-8-encoded String type.
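As a quick sanity check (my own sketch, using only standard hadoop-common classes), a Writable such as Text can be serialized to a byte stream with write() and restored with readFields():

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Text;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        Text original = new Text("13502468823");

        // write() turns the structured value into a byte stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields() rebuilds the value from that byte stream
        Text restored = new Text();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored); // prints 13502468823
    }
}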

 KpiWritable.java (essentially a POJO, but one that implements Hadoop's serialization interface Writable)

package hadoop.Writable.Serialization;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
* @ClassName: KpiWritable
* @Description: encapsulates the KpiWritable type
* @author linge
* @date 2017-12-27 17:12:18
*
*/
public class KpiWritable implements Writable{
	private long upPackNum;    // number of upstream packets
	private long downPackNum;  // number of downstream packets
	private long upPayLoad;    // total upstream traffic, in bytes
	private long downPayLoad;  // total downstream traffic, in bytes
	// a no-arg constructor is required so Hadoop can instantiate and deserialize the object
	public KpiWritable() {
		super();
	}
	
    public KpiWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
		super();
		this.upPackNum = upPackNum;
		this.downPackNum = downPackNum;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
	}

	public KpiWritable(String upPack, String downPack, String upPay,
            String downPay) { // this constructor makes it easy to build the object from split string fields
        upPackNum = Long.parseLong(upPack);
        downPackNum = Long.parseLong(downPack);
        upPayLoad = Long.parseLong(upPay);
        downPayLoad = Long.parseLong(downPay);
    }
	
	@Override
	public String toString() { // used when the value is written out as text by the output format
		 String result = upPackNum + "\t" + downPackNum + "\t" + upPayLoad
                 + "\t" + downPayLoad;
         return result;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		 out.writeLong(upPackNum);
		 out.writeLong(downPackNum);
		 out.writeLong(upPayLoad);
		 out.writeLong(downPayLoad);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		 upPackNum = in.readLong();
		 downPackNum = in.readLong();
		 upPayLoad = in.readLong();
		 downPayLoad = in.readLong();
	}

	public long getUpPackNum() {
		return upPackNum;
	}

	public void setUpPackNum(long upPackNum) {
		this.upPackNum = upPackNum;
	}

	public long getDownPackNum() {
		return downPackNum;
	}

	public void setDownPackNum(long downPackNum) {
		this.downPackNum = downPackNum;
	}

	public long getUpPayLoad() {
		return upPayLoad;
	}

	public void setUpPayLoad(long upPayLoad) {
		this.upPayLoad = upPayLoad;
	}

	public long getDownPayLoad() {
		return downPayLoad;
	}

	public void setDownPayLoad(long downPayLoad) {
		this.downPayLoad = downPayLoad;
	}
	
	
}


MyMapper.java: the map-side (scatter) step, i.e. the per-split record parsing and key extraction described above

package hadoop.Writable.Serialization;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 // LongWritable is the byte offset of each line; it is used here because all of these values are longs. A custom class would also work, as long as the POJO implements the same WritableComparable interface that LongWritable does (WritableComparable itself extends Writable).
public class MyMapper extends Mapper<LongWritable, Text, Text,KpiWritable>{
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
			throws IOException, InterruptedException {
		String[] spilted = value.toString().split("\\s+"); // split the line (the value), not the offset key
		String string = spilted[1];  // the phone number
		Text k2 = new Text(string);  // phone number as the output key
	    KpiWritable v2 = new KpiWritable(spilted[6], spilted[7], spilted[8], spilted[9]);
	    context.write(k2, v2);
	}
}
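For the first sample row above, this mapper emits k2 = 18211575961 and v2 = KpiWritable(15, 12, 1527, 2106).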

MyReducer.java: aggregates (reduces) the mapper output

package hadoop.Writable.Serialization;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text,KpiWritable, Text, KpiWritable>{

	@Override
	protected void reduce(Text k2, Iterable<KpiWritable> arg1,
			Reducer<Text, KpiWritable, Text, KpiWritable>.Context context) throws IOException, InterruptedException {
		// accumulators for the totals
		 long upPackNum = 0L;
         long downPackNum = 0L;
         long upPayLoad = 0L;
         long downPayLoad = 0L;
		for (KpiWritable kpiWritable : arg1) {
			upPackNum+=kpiWritable.getUpPackNum();
			downPackNum+=kpiWritable.getDownPackNum();
			upPayLoad+=kpiWritable.getUpPayLoad();
			downPayLoad+=kpiWritable.getDownPayLoad();
		}
		// k2 is the key emitted by the mapper, i.e. the phone number
		KpiWritable v3 = new KpiWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
		context.write(k2, v3);
	}
}


Putting it together: the job class implements the Tool interface and the application is launched through ToolRunner.

 @Override
    public int run(String[] args) throws Exception {
        // first remove any files already generated in the output directory
        FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
        Path outPath = new Path(OUTPUT_PATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        // define the job
        Job job = Job.getInstance(getConf(),"lingeJob");
        // set the input path
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        // set the custom Mapper class
        job.setMapperClass(MyMapper.class);
        // declare the <k2,v2> types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);
        // set the custom Reducer class
        job.setReducerClass(MyReducer.class);
        // declare the <k3,v3> types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        // set the output path
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        // submit the job and wait for completion
        Boolean res = job.waitForCompletion(true);
        if(res){
            System.out.println("Process success!");
            System.exit(0);
        }
        else{
            System.out.println("Process failed!");
            System.exit(1);
        }
        return 0;
    }

The main method registers the job and runs it through ToolRunner:
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int res = ToolRunner.run(conf, new MyKpiJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Complete code:
package hadoop.Writable.Deserialization;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyKpiJob extends Configured implements Tool {

    /*
     * custom data type KpiWritable
     */
    public static class KpiWritable implements WritableComparable<KpiWritable>{

        long upPackNum;   // number of upstream packets
        long downPackNum; // number of downstream packets
        long upPayLoad;   // total upstream traffic, in bytes
        long downPayLoad; // total downstream traffic, in bytes

        public KpiWritable() {
        }

        public KpiWritable(String upPack, String downPack, String upPay,
                String downPay) {
            upPackNum = Long.parseLong(upPack);
            downPackNum = Long.parseLong(downPack);
            upPayLoad = Long.parseLong(upPay);
            downPayLoad = Long.parseLong(downPay);
        }

        @Override
        public String toString() {
            String result = upPackNum + "\t" + downPackNum + "\t" + upPayLoad
                    + "\t" + downPayLoad;
            return result;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upPackNum);
            out.writeLong(downPackNum);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            upPackNum = in.readLong();
            downPackNum = in.readLong();
            upPayLoad = in.readLong();
            downPayLoad = in.readLong();
        }

		@Override
		public int compareTo(KpiWritable o) {
			// sort by downstream packet count, descending; Long.compare avoids the overflow risk of casting the long difference to int
			return Long.compare(o.downPackNum, this.downPackNum);
		}

    }

    /*
     * custom Mapper class, overriding the map method
     */
    public static class MyMapper extends
            Mapper<LongWritable, Text, Text, KpiWritable> {
        protected void map(
                LongWritable k1,
                Text v1,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            String[] spilted = v1.toString().split("\\s+");
            System.out.println(spilted.length);
            if(spilted.length>1){
            String msisdn = spilted[1]; // get the phone number
            Text k2 = new Text(msisdn); // wrap it in Hadoop's Text type as k2
            KpiWritable v2 = new KpiWritable(spilted[6], spilted[7],
                    spilted[8], spilted[9]);
            context.write(k2, v2);
            }
        };
    }

    /*
     * custom Reducer class, overriding the reduce method
     */
    public static class MyReducer extends
            Reducer<Text, KpiWritable, Text, KpiWritable> {
        protected void reduce(
                Text k2,
                java.lang.Iterable<KpiWritable> v2s,
                org.apache.hadoop.mapreduce.Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            for (KpiWritable kpiWritable : v2s) {
                upPackNum += kpiWritable.upPackNum;
                downPackNum += kpiWritable.downPackNum;
                upPayLoad += kpiWritable.upPayLoad;
                downPayLoad += kpiWritable.downPayLoad;
            }

            KpiWritable v3 = new KpiWritable(upPackNum + "", downPackNum + "",
                    upPayLoad + "", downPayLoad + "");
            System.out.println(k2 + "\nupPackNum:" + upPackNum + "\ndownPackNum:" + downPackNum + "\nupPayLoad:" + upPayLoad + "\ndownPayLoad:" + downPayLoad);
            context.write(k2, v3);
        };
    }

    // input file path
    public static final String INPUT_PATH = "hdfs://192.168.88.129:9000/ha.txt";
    // output directory
    public static final String OUTPUT_PATH = "hdfs://192.168.88.129:9000/out";

    @Override
    public int run(String[] args) throws Exception {
        // first remove any files already generated in the output directory
        FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
        Path outPath = new Path(OUTPUT_PATH);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        // define the job
        Job job = Job.getInstance(getConf(),"lingeJob");
        // set the input path
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        // set the custom Mapper class
        job.setMapperClass(MyMapper.class);
        // declare the <k2,v2> types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);
        // set the custom Reducer class
        job.setReducerClass(MyReducer.class);
        // declare the <k3,v3> types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        // set the output path
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        // submit the job and wait for completion
        Boolean res = job.waitForCompletion(true);
        if(res){
            System.out.println("Process success!");
            System.exit(0);
        }
        else{
            System.out.println("Process failed!");
            System.exit(1);
        }
        return 0;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int res = ToolRunner.run(conf, new MyKpiJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
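For reference (my own addition, not in the original post): once packaged into a jar, the job can be launched with the hadoop jar command, passing the fully qualified class name hadoop.Writable.Deserialization.MyKpiJob. With only the three sample rows above, each phone number appears exactly once, so the reducer simply passes the sums through, and the part-r-00000 file in the output directory should contain tab-separated lines of the form phone, upPackNum, downPackNum, upPayLoad, downPayLoad, roughly:

13502468823	57	102	7335	110349
15920133257	20	20	3156	2936
18211575961	15	12	1527	2106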


Throughout the whole exercise, most of the work goes into the Mapper and Reducer classes; it is mainly these two parts that implement the actual business requirement.