1. 程式人生 > >大資料之Hadoop學習——動手實戰學習MapReduce程式設計例項

大資料之Hadoop學習——動手實戰學習MapReduce程式設計例項

文章目錄


這裡放一個我學習MapReduce的程式設計例項專案吧,本來是想把這些分開寫成多篇文章的,能夠詳細敘述我學習過程中感想。但無奈,時間不夠,只好在Github上建立了該專案,在程式碼中由較為詳細的註釋,我想也足夠了吧。
josonle/MapReduce-Demo
該專案有些題目是參考了網上幾篇部落格,但程式碼實現是本人實現的。其次,所謂的MapReduce學習流程是參照老師上課所講的PPT上的流程【某985大資料課程PPT】,我想老師以這樣的流程授課肯定是有道理的。專案中也放了老師提供的幾個參考Demo檔案。

該專案還在更新中,有些程式碼還沒實現,慢慢來吧。

版權宣告:本文為博主原創文章,未經博主允許不得轉載(https://blog.csdn.net/lzw2016/)


一、MapReduce程式設計例項

1.自定義物件序列化

需求分析

需要統計手機使用者流量日誌,日誌內容例項:

flowdata.log

要把同一個使用者的上行流量、下行流量進行累加,並計算出綜合 。例如上面的13897230503有兩條記錄,就要對這兩條記錄進行累加,計算總和,得到:13897230503,500,1600,2100

報錯:Exception in thread “main” java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.17.10:9000/workspace/flowStatistics/output, expected: file:///

解決:1、將core-site.xml 和hdfs-site.xml拷貝到專案裡去就可以,原因是訪問遠端的HDFS 需要通過URI來獲得FileSystem
	2、在專案中,Configuration物件設定fs.defaultFS 【推薦這個,**大小寫別拼錯,我就是拼錯了找了半天**】

		String namenode_ip = "192.168.17.10";
		String hdfs = "hdfs://"+namenode_ip+":9000";
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfs);

解答

一、正常處理即可,不過在處理500 1400 這種時靈活變通一下即可
public static class FlowMapper extends Mapper<Object, Text, Text, Text>{
		
		public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
			String[] strs = value.toString().split("\t");
			Text phone = new Text(strs[0]);
			Text flow = new Text(strs[1]+"\t"+strs[2]);
			context.write(phone, flow);
		}
	}
	
	public static class FlowReducer extends Reducer<Text, Text, Text, Text>{
		public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
			int upFlow = 0;
			int downFlow = 0;
			
			for (Text value : values) {
				String[] strs = value.toString().split("\t");
				upFlow += Integer.parseInt(strs[0].toString());
				downFlow += Integer.parseInt(strs[1].toString());
			}
			int sumFlow = upFlow+downFlow;
			
			context.write(key,new Text(upFlow+"\t"+downFlow+"\t"+sumFlow));
		}
	}

二、自定義一個實現Writable介面的可序列化的物件Flow,包含資料形式如 upFlow downFlow sumFlow

public static class FlowWritableMapper extends Mapper<Object, Text, Text, FlowWritable> {
		public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
			String[] strs = value.toString().split("\t");
			Text phone = new Text(strs[0]);
			FlowWritable flow = new FlowWritable(Integer.parseInt(strs[1]),Integer.parseInt(strs[2]));
			context.write(phone, flow);
		}
	}
	public static class FlowWritableReducer extends Reducer<Text, FlowWritable, Text, FlowWritable>{
		public void reduce(Text key,Iterable<FlowWritable> values,Context context) throws IOException, InterruptedException {
			int upFlow = 0;
			int downFlow = 0;
			
			for (FlowWritable value : values) {
				upFlow += value.getUpFlow();
				downFlow += value.getDownFlow();
			}
			
			context.write(key,new FlowWritable(upFlow,downFlow));
		}
	}
	
	public static class FlowWritable implements Writable{
		private int upFlow,downFlow,sumFlow;

		public FlowWritable(int upFlow,int downFlow) {
			this.upFlow = upFlow;
			this.downFlow = downFlow;
			this.sumFlow = upFlow+downFlow;
		}
		
		public int getDownFlow() {
			return downFlow;
		}

		public void setDownFlow(int downFlow) {
			this.downFlow = downFlow;
		}

		public int getUpFlow() {
			return upFlow;
		}

		public void setUpFlow(int upFlow) {
			this.upFlow = upFlow;
		}

		public int getSumFlow() {
			return sumFlow;
		}

		public void setSumFlow(int sumFlow) {
			this.sumFlow = sumFlow;
		}
		// writer和readFields方法務必實現,序列化資料的關鍵
		@Override
		public void write(DataOutput out) throws IOException {
			// TODO Auto-generated method stub
			out.writeInt(upFlow);
			out.writeInt(downFlow);
			out.writeInt(sumFlow);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			// TODO Auto-generated method stub
			upFlow = in.readInt();
			downFlow = in.readInt();
			sumFlow = in.readInt();
		}

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return upFlow+"\t"+downFlow+"\t"+sumFlow;
		}
	}

注意: 要根據具體情況在job中設定Mapper、Reducer類及輸出的key、value型別
具體見程式碼

2.資料去重

需求分析

需求很簡單,就是把檔案中重複資料去掉。比如說統計類似如下檔案中不包含重複日期資料的日期

2017-02-14 1
2016-02-01 2
2017-07-10 3
2016-02-26 4
2015-01-19 5
2016-04-29 6
2016-05-10 7
2015-11-20 8
2017-05-23 9
2014-02-26 10

解答思路

只要搞清楚了MR的流程這個就很簡單,reducer的輸入類似<key3,[v1,v2,v3…]>,這個地方輸入的key3是沒有重複值的。所以利用這一點,Mapper輸出的key儲存日期資料,value置為空即可 【這裡可以使用NullWritable型別】

還有就是,不一定是日期去重,去重一行資料也是如此,key儲存這一行資料即可

public static class DateDistinctMapper extends Mapper<Object, Text, Text, NullWritable> {		
		public void map(Object key, Text value, Context context ) 
				throws IOException, InterruptedException {
	    	String[] strs = value.toString().split(" ");
	    	Text date = new Text(strs[0]);//取到日期作為key
			context.write(date, NullWritable.get());
	    }
	}
  
public static class DateDistinctReducer extends Reducer<Text,NullWritable,Text,NullWritable>{
    
		public void reduce(Text key, Iterable<NullWritable> values, Context context) 
				throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
	    }
	}

3.資料排序、二次排序

需求分析

這一類問題很多,像學生按成績排序,手機使用者流量按上行流量升序,下行流量降序排序等等

  1. 日期計數升序排序

  2. 日期計數降序排序

    //日期	日期出現的次數
    2015-01-27	7
    2015-01-28	3
    2015-01-29	7
    2015-01-30	6
    2015-01-31	7
    2015-02-01	15
    2015-02-02	10
    2015-02-03	9
    2015-02-04	12
    2015-02-05	14
    
  3. 手機使用者流量按上行流量升序,下行流量降序排序

解答思路

MapReduce是預設會對key進行升序排序的,可以利用這一點實現某些排序

  • 單列排序
    • 升序還是降序排序
    • 可以利用Shuffle預設對key排序的規則;
    • 自定義繼承WritableComparator的排序類,實現compare方法
  • 二次排序
    • 實現可序列化的比較類WritableComparable,並實現compareTo方法(同樣可指定升序降序)
日期按計數升序排序
public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {
		private IntWritable num = new IntWritable();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] strs = value.toString().split("\t");
			num.set(Integer.parseInt(strs[1]));
			// 將次數作為key進行升序排序
			context.write(num, new Text(strs[0]));
			System.out.println(num.get()+","+strs[0]);
		}
	}

	public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {

		public void reduce(IntWritable key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values) {
				// 排序後再次顛倒k-v,將日期作為key
				System.out.println(value.toString()+":"+key.get());
				context.write(value, key);
			}
		}
	}
日期按計數降序排序

實現自定義的排序比較器,繼承WritableComparator類,並實現其compare方法

public static class MyComparator extends WritableComparator {
		public MyComparator() {
			// TODO Auto-generated constructor stub
			super(IntWritable.class, true);
		}

		@Override
		@SuppressWarnings({ "rawtypes", "unchecked" }) // 不檢查型別
		public int compare(WritableComparable a, WritableComparable b) {
			// CompareTo方法,返回值為1則降序,-1則升序
			// 預設是a.compareTo(b),a比b小返回-1,現在反過來返回1,就變成了降序
			return b.compareTo(a);
	}

所使用的Mapper、Reducer同上面升序排序的,其次,要在main函式中指定自定義的排序比較器

job.setSortComparatorClass(MyComparator.class);

手機使用者流量按上行流量升序,下行流量降序排序

同第一個例項類似,要自定義物件序列化,同時也要可比較,實現WritableComparable介面,並實現CompareTo方法

我這裡是將之前統計好的使用者流量資料作為輸入資料

public static class MySortKey implements WritableComparable<MySortKey> {
		private int upFlow;
		private int downFlow;
		private int sumFlow;

		public void FlowSort(int up, int down) {
			upFlow = up;
			downFlow = down;
			sumFlow = up + down;
		}

		public int getUpFlow() {
			return upFlow;
		}
		public void setUpFlow(int upFlow) {
			this.upFlow = upFlow;
		}
		public int getDownFlow() {
			return downFlow;
		}
		public void setDownFlow(int downFlow) {
			this.downFlow = downFlow;
		}
		public int getSumFlow() {
			return sumFlow;
		}
		public void setSumFlow(int sumFlow) {
			this.sumFlow = sumFlow;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			// TODO Auto-generated method stub
			out.writeInt(upFlow);
			out.writeInt(downFlow);
			out.writeInt(sumFlow);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			// TODO Auto-generated method stub
			upFlow = in.readInt();
			downFlow = in.readInt();
			sumFlow = in.readInt();
		}

		@Override
		public int compareTo(MySortKey o) {
			if ((this.upFlow - o.upFlow) == 0) {// 上行流量相等,比較下行流量
				return o.downFlow - this.downFlow;// 按downFlow降序排序
			} else {
				return this.upFlow - o.upFlow;// 按upFlow升序排
			}
		}

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return upFlow + "\t" + downFlow + "\t" + sumFlow;
		}
	}

	public static class SortMapper extends Mapper<Object, Text, MySortKey, Text> {
		Text phone = new Text();
		MySortKey mySortKey = new MySortKey();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] lists = value.toString().split("\t");
			phone.set(lists[0]);
			mySortKey.setUpFlow(Integer.parseInt(lists[1]));
			mySortKey.setDownFlow(Integer.parseInt(lists[2]));
			context.write(mySortKey, phone);// 調換手機號和流量計數,後者作為排序鍵
		}
	}

	public static class SortReducer extends Reducer<MySortKey, Text, Text, MySortKey> {
		public void reduce(MySortKey key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values) {
				System.out.println(value.toString()+","+key.toString());
				context.write(value, key);// 再次把手機號和流量計數調換
			}
		}
	}

4.自定義分割槽

需求分析

還是以上個例子的手機使用者流量日誌為例,在上個例子的統計需要基礎上新增一個新需求:按省份統計,不同省份的手機號放到不同的檔案裡。

例如137表示屬於河北,138屬於河南,那麼在結果輸出時,他們分別在不同的檔案中。

解答思路

挺簡單的,看過我之前結合原始碼解讀MapReduce過程的話,就知道這其實就是一個分割槽的問題。定義自己的分割槽規則,一個分割槽會對應一個reduce,會輸出到一個檔案。

而你需要做的就是基礎partitioner類,並實現getPartition方法,其餘過程同第一個例子

// 自定義分割槽類
public static class PhoneNumberPartitioner extends Partitioner<Text, FlowWritable> {
		private static HashMap<String, Integer> numberDict = new HashMap<>();
		static {
			numberDict.put("133", 0);
			numberDict.put("135", 1);
			numberDict.put("137", 2);
			numberDict.put("138", 3);
		}

		@Override
		public int getPartition(Text key, FlowWritable value, 
            
           

相關推薦

資料Hadoop學習——動手實戰學習MapReduce程式設計例項

文章目錄 一、MapReduce程式設計例項 1.自定義物件序列化 需求分析 報錯:Exception in thread "main" java.lang.IllegalArgumentExcept

資料Hadoop學習(環境配置)——Hadoop偽分散式叢集搭建

title: Hadoop偽分散式叢集搭建 date: 2018-11-14 15:17:20 tags: Hadoop categories: 大資料 點選檢視我的部落格: Josonlee’s Blog 文章目錄 前言準備 偽分

資料Hadoop學習《一》——認識HDFS

title: 大資料之Hadoop學習<一>————認識HDFS date: 2018-11-12 20:31:36 tags: Hadoop categories: 大資料 toc: true 點選檢視我的部落格:Josonlee’s Blog 文

資料hadoop 環境搭建從零開始——實戰訓練

        這裡的前提是要先安裝一個乾淨的CentOS系統,我這裡用的是CentOS6.6,安裝教程參考另一篇部落格:https://blog.csdn.net/gaofengyan/article/details/85054337 目錄 ha

資料hadoop / hive / hbase 的區別是什麼?有什麼應用場景?

文章目錄 1. hadoop 2. hive 3. hbase 總結 1. hadoop 它是一個分散式計算+分散式檔案系統,前者其實就是 MapReduce,後者是 HDFS 。後者可以獨立執行,前者可以選擇性使用,也

最詳細的資料Hadoop分散式系統架構解析!沒有之一!

Hadoop 由許多元素構成。其最底部是 Hadoop Distributed File System(HDFS),它儲存 Hadoop 叢集中所有儲存節點上的檔案。HDFS(對於本文)的上一層是MapReduce引擎,該引擎由 JobTrackers 和 TaskTrack

資料hadoop分散式計算框架MapReduce

一、MapReduce構成 MapReduce分為兩部分,即map和reduce。 其中map是入隊(key,value),reduce則是聚合(計算)。 map過程的輸出時reduce過程的輸入。 需要注意的是這裡map中的key是可以重複的,reduce做聚

資料hadoop對比spark------資料儲存

1.Hadoop的資料都是來自於哪裡: 答案:磁碟。 2.map與reduce可以同時執行嗎? 答案:不能,由什麼決定的,shuffle過程決定的。 3.spark為什麼比hadoop要快,sprak儘量的避免從磁碟中進行讀取,以及配置資訊和計算資料,對比這些特性,極

資料Hadoop生態系統概述

一、什麼是大資料        首先,我們來了解一下,什麼是大資料?大資料(BigData)是指無法在一定時間內用常規軟體工具進行捕捉、管理和處理的資料集合,是需要新處理模式才能具有更強的決策力、洞察發現力和流程優化能力的海量、高增長率、多樣化的資訊資產。由IBM提出的大資料

資料hadoop(檔案系統HDFS)

一 HDFS概述1.1 概念HDFS,它是一個檔案系統,用於儲存檔案,通過目錄樹來定位檔案;其次,它是分散式的,由很多伺服器聯合起來實現其功能,叢集中的伺服器有各自的角色。HDFS的設計適合一次寫入,多次讀出的場景,且不支援檔案的修改。適合用來做資料分析,並不適合用來做網盤應

資料hadoop機架感知

Hadoop是一個能夠對大量資料進行分散式處理的軟體框架,實現了Google的MapReduce程式設計模型和框架,能夠把應用程式分割成許多的小的工作單元(塊),並把這些單元放到任何叢集節點上執行。在MapReduce中,一個準備提交執行的應用程式稱為“作業(job)”,而從一個作業劃分出得、運行於各個計算節

初探資料Hadoop簡介

一、Hadoop的主要作用         Hadoop主要解決海量資料的儲存和海量資料的分析計算。 二、Hadoop框架技術的組成 1、HDFS:         HDFS是一個檔案系統,用來儲存檔案的

資料hadoop【hdfs】

目錄 1、HDFS體系結構 2、HDFS Shell操作 3、HDFS Java API 4、HDFS和RPC 5、HDFS High Availability 6、HDFS資料回收和簡單運維 =======

資料HadoopMapReduce(四))------->企業優化

6.1 MapReduce 跑的慢的原因 Mapreduce 程式效率的瓶頸在於兩點: 1)計算機效能        CPU、記憶體、磁碟健康、網路 2)I/O 操作優化 (1)資料傾斜 (2)map和reduce數設定不合理 (3)reduce等待過久 (4)小檔案過多

資料hadoop面試題4

2.23. 我們開發job時,是否可以去掉reduce階段。可以。設定reduce數為0 即可。2.24. datanode在什麼情況下不會備份datanode在強制關閉或者非正常斷電不會備份。2.25. combiner出現在那個過程出現在map階段的map方法後等。2.

資料hadoop單機版虛擬機器Vmware安裝教程

        為深入學習hadoop,需要在個人電腦中安裝cloudera_centos虛擬機器。本篇文件介紹的就是關於cloudera_centos虛擬機器的安裝教程。(推薦使用virtualbox

資料hadoop-小檔案合併、Mapreduce原理)

hadoop-小檔案合併 package com.andy.merge; import org.apache.hadoo

第127講:Hadoop叢集管理安全模式解析及動手實戰學習筆記

第127講:Hadoop叢集管理之安全模式解析及動手實戰學習筆記 hadoop在啟動時namenode會把fsimage載入進記憶體,同時和edits內容合併,以此建立整個檔案系統的元資料的映象(記憶體級別),所以客戶端可以通過namenode訪問檔案系統的資訊。完成後變成

Hadoop 權威指南 - 資料的儲存與分析》學習筆記

第一章 初識Hadoop 1.2 資料的儲存與分析 對多個硬碟中的資料並行進行讀/寫資料,有以下兩個重要問題: 硬體故障問題。解決方案:複製(replication),系統儲存資料的副本(replica)。 以某種方式結合大部分資料來共同完成分析。MapReduce

資料Spark(七)--- Spark機器學習,樸素貝葉斯,酒水評估和分類案例學習,垃圾郵件過濾學習案例,電商商品推薦,電影推薦學習案例

一、Saprk機器學習介紹 ------------------------------------------------------------------ 1.監督學習 a.有訓練資料集,符合規範的資料 b.根據資料集,產生一個推斷函式