1. 程式人生 > >MR例子(統計各個手機號在某段時間內產生的總流量)

MR例子(統計各個手機號在某段時間內產生的總流量)

目的:統計各個手機號在某段時間類產生的總流量

準備檔案 (已經上傳到hdfs上 檔名data.txt)
這裡寫圖片描述

上圖中對應的欄位如下圖
這裡寫圖片描述

檔案及程式碼分析

所給的檔案是每一個使用者每一次上網產生的流量,先如今需要將相同使用者進行聚合。
最後輸出的結果欄位:手機號 上行總流量 下行總流量 總流量
map的輸入輸出都是以key value 形式存在。輸入的鍵值對為K1為整數 value為字串 , 輸出的鍵值對K2為字串(手機號),輸出相當於上行總流量 ,下行總流量 ,總流量的list。所以我們用一個物件(DataBean)來儲存它們。
reduce的輸入就是map的輸出(經過shuffle處理,這裡不做詳細說明),reduce輸出的形式為key為手機號(字串),value為物件(DataBean)的結果就是我們最後想要的結果。
map進行的業務處理就是取出目標檔案中的四個欄位,然後進行拆分
reduce進行的業務處理,主要是對map的輸出中的DataBean裡面的流量進行求和,最後輸出,下面直接上程式碼。

DataBean

package cn.master1.hadoop.mr.dc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable{

    private String telNo;

    private long upPayLoad;

    private long downPayLoad;

    private
long totalPayLoad; public DataBean() {} public DataBean(String telNo, long upPayLoad, long downPayLoad) { this.telNo = telNo; this.upPayLoad = upPayLoad; this.downPayLoad = downPayLoad; this.totalPayLoad = upPayLoad + downPayLoad; } @Override public
String toString() { return this.upPayLoad + "/t" + this.downPayLoad + "/t" + this.totalPayLoad; } public void write(DataOutput out) throws IOException { out.writeUTF(telNo); out.writeLong(upPayLoad); out.writeLong(downPayLoad); out.writeLong(totalPayLoad); } public void readFields(DataInput in) throws IOException { this.telNo = in.readUTF(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); this.totalPayLoad = in.readLong(); } public String getTelNo() { return telNo; } public void setTelNo(String telNo) { this.telNo = telNo; } public long getUpPayLoad() { return upPayLoad; } public void setUpPayLoad(long upPayLoad) { this.upPayLoad = upPayLoad; } public long getDownPayLoad() { return downPayLoad; } public void setDownPayLoad(long downPayLoad) { this.downPayLoad = downPayLoad; } public long getTotalPayLoad() { return totalPayLoad; } public void setTotalPayLoad(long totalPayLoad) { this.totalPayLoad = totalPayLoad; } }
package cn.master1.hadoop.mr.dc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCount {

    public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         Job job = Job.getInstance(conf);

         job.setJarByClass(DataCount.class);

         job.setMapperClass(DCMapper.class);
         /*當k2 v2 和 k3 v3 型別一一對應時,此行和下面一行可以省略。*/
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(DataBean.class);
         FileInputFormat.setInputPaths(job, new Path(args[0]));

         job.setReducerClass(DCReducer.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(DataBean.class);
         FileOutputFormat.setOutputPath(job, new Path(args[1]));

         job.waitForCompletion(true);
    }

    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DataBean>.Context context)
                throws IOException, InterruptedException {
            //接收資料
            String line = value.toString();
            String[] fileds = line.split("/t");
            String telNo = fileds[1];
            long up = Long.parseLong(fileds[8]);
            long down = Long.parseLong(fileds[9]);
            DataBean bean  = new DataBean(telNo, up, down);
            context.write(new Text(telNo), bean);
        }

    }

    public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{

        @Override
        protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
                throws IOException, InterruptedException {
            long up_sum = 0;
            long down_sum = 0;
            for(DataBean bean : v2s){
                up_sum += bean.getUpPayLoad();
                down_sum += bean.getDownPayLoad();
            }
            DataBean bean = new DataBean("", up_sum, down_sum);
            context.write(key, bean);
        }
    }
}

jar包

打成jar包(不指定入口程式)命名為examples.jar,放到虛擬機器跟目錄下,然後執行
hadoop jar /root/examples.jar cn.master1.hadoop.mr.dc.DataCount /data.txt /dataout
cn.master1.hadoop.mr.dc.DataCount指定執行的入口程式 /data.txt 目標檔案(存在hdfs上) /dataout輸出檔案(存放到hdfs上)

最後輸出結果如下
這裡寫圖片描述

下面簡單說一說MR的執行流程和hadoop的序列化

MR執行流程

MR執行流程

(1).客戶端提交一個mr的jar包給RM(resourceManage)(提交方式:hadoop jar ...)
(2).JobClient通過RPC和RM進行通訊,返回一個存放jar包的地址(HDFS)和jobId
(3).client將jar包寫入到HDFS當中(path = hdfs上的地址 + jobId)
(4).開始提交任務(任務的描述資訊,不是jar, 包括jobid,jar存放的位置,配置資訊等等)
(5).RM進行初始化任務
(6).讀取HDFS上的要處理的檔案,開始計算輸入分片,每一個分片對應一個NM(nodeManage)
(7).NM通過心跳機制領取任務(任務的描述資訊)
(8).下載所需的jar,配置檔案等。
(9).NM啟動一個java child子程序,用來執行具體的任務(MapperTask或ReducerTask)
(10).將結果寫入到HDFS當中。

hadoop序列化

  • 序列化的概念
    序列化(Serialization)是指把結構化物件轉化為位元組流。
    反序列化(Deserialization)是序列化的逆過程。即把位元組流轉回結構化物件。
    Java序列化(java.io.Serializable)

hadoop序列化並不是用的java自帶的序列化機制,java的序列化機制運用的比較廣泛,所以序列化和反序列化時儲存的東西過多,效率較低,而hadoop在序列化時,只需要儲存資料即可,因為只需要傳輸資料。hadoop具有特定的序列化機制。

  • 序列化格式特點:
    緊湊:高效使用儲存空間。
    快速:讀寫資料的額外開銷小
    可擴充套件:可透明地讀取老格式的資料
    互操作:支援多語言的互動

hadoop的序列化格式Writable
更多詳細介紹