大資料之Hadoop學習——動手實戰學習MapReduce程式設計例項
文章目錄
- 一、MapReduce程式設計例項
- 1.自定義物件序列化
- 需求分析
- 報錯:Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.17.10:9000/workspace/flowStatistics/output, expected: file:///
- 解答
- 2.資料去重
- 3.資料排序、二次排序
- 4.自定義分割槽
- 5.計算出每組訂單中金額最大的記錄
- 多檔案輸入輸出、及不同輸入輸出格式化型別
- 8.join操作
- 9.**計算出使用者間的共同好友**
- 二、MapReduce理論基礎
- 三、Hadoop、Spark學習路線及資源收納
- 四、MapReduce書籍推薦
- 五、MapReduce實戰系統學習流程
- 詞頻統計
- 資料去重
- 資料排序
- 求平均值、中位數、標準差、最大/小值、計數
- 分組、分割槽
- 資料輸入輸出格式化
- 多檔案輸入、輸出
- 單表關聯
- 多表關聯
- 倒排索引
- TopN
- PeopleRank演算法實現
- 推薦系統——協同過濾演算法實現
- 六、資料
- 七、關於我
這裡放一個我學習MapReduce的程式設計例項專案吧,本來是想把這些分開寫成多篇文章的,能夠詳細敘述我學習過程中感想。但無奈,時間不夠,只好在Github上建立了該專案,在程式碼中由較為詳細的註釋,我想也足夠了吧。
josonle/MapReduce-Demo
該專案有些題目是參考了網上幾篇部落格,但程式碼實現是本人實現的。其次,所謂的MapReduce學習流程是參照老師上課所講的PPT上的流程【某985大資料課程PPT】,我想老師以這樣的流程授課肯定是有道理的。專案中也放了老師提供的幾個參考Demo檔案。
該專案還在更新中,有些程式碼還沒實現,慢慢來吧。
版權宣告:本文為博主原創文章,未經博主允許不得轉載(https://blog.csdn.net/lzw2016/)
一、MapReduce程式設計例項
1.自定義物件序列化
需求分析
需要統計手機使用者流量日誌,日誌內容例項:
要把同一個使用者的上行流量、下行流量進行累加,並計算出綜合 。例如上面的13897230503有兩條記錄,就要對這兩條記錄進行累加,計算總和,得到:13897230503,500,1600,2100
報錯:Exception in thread “main” java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.17.10:9000/workspace/flowStatistics/output, expected: file:///
解決:1、將core-site.xml 和hdfs-site.xml拷貝到專案裡去就可以,原因是訪問遠端的HDFS 需要通過URI來獲得FileSystem
2、在專案中,Configuration物件設定fs.defaultFS 【推薦這個,**大小寫別拼錯,我就是拼錯了找了半天**】
String namenode_ip = "192.168.17.10";
String hdfs = "hdfs://"+namenode_ip+":9000";
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfs);
解答
一、正常處理即可,不過在處理500 1400
這種時靈活變通一下即可
public static class FlowMapper extends Mapper<Object, Text, Text, Text>{
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
Text phone = new Text(strs[0]);
Text flow = new Text(strs[1]+"\t"+strs[2]);
context.write(phone, flow);
}
}
public static class FlowReducer extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
int upFlow = 0;
int downFlow = 0;
for (Text value : values) {
String[] strs = value.toString().split("\t");
upFlow += Integer.parseInt(strs[0].toString());
downFlow += Integer.parseInt(strs[1].toString());
}
int sumFlow = upFlow+downFlow;
context.write(key,new Text(upFlow+"\t"+downFlow+"\t"+sumFlow));
}
}
二、自定義一個實現Writable介面的可序列化的物件Flow,包含資料形式如 upFlow downFlow sumFlow
public static class FlowWritableMapper extends Mapper<Object, Text, Text, FlowWritable> {
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
Text phone = new Text(strs[0]);
FlowWritable flow = new FlowWritable(Integer.parseInt(strs[1]),Integer.parseInt(strs[2]));
context.write(phone, flow);
}
}
public static class FlowWritableReducer extends Reducer<Text, FlowWritable, Text, FlowWritable>{
public void reduce(Text key,Iterable<FlowWritable> values,Context context) throws IOException, InterruptedException {
int upFlow = 0;
int downFlow = 0;
for (FlowWritable value : values) {
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
}
context.write(key,new FlowWritable(upFlow,downFlow));
}
}
public static class FlowWritable implements Writable{
private int upFlow,downFlow,sumFlow;
public FlowWritable(int upFlow,int downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow+downFlow;
}
public int getDownFlow() {
return downFlow;
}
public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
public int getUpFlow() {
return upFlow;
}
public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}
public int getSumFlow() {
return sumFlow;
}
public void setSumFlow(int sumFlow) {
this.sumFlow = sumFlow;
}
// writer和readFields方法務必實現,序列化資料的關鍵
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(upFlow);
out.writeInt(downFlow);
out.writeInt(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
upFlow = in.readInt();
downFlow = in.readInt();
sumFlow = in.readInt();
}
@Override
public String toString() {
// TODO Auto-generated method stub
return upFlow+"\t"+downFlow+"\t"+sumFlow;
}
}
注意: 要根據具體情況在job中設定Mapper、Reducer類及輸出的key、value型別
具體見程式碼
2.資料去重
需求分析
需求很簡單,就是把檔案中重複資料去掉。比如說統計類似如下檔案中不包含重複日期資料的日期
2017-02-14 1
2016-02-01 2
2017-07-10 3
2016-02-26 4
2015-01-19 5
2016-04-29 6
2016-05-10 7
2015-11-20 8
2017-05-23 9
2014-02-26 10
解答思路
只要搞清楚了MR的流程這個就很簡單,reducer的輸入類似<key3,[v1,v2,v3…]>,這個地方輸入的key3是沒有重複值的。所以利用這一點,Mapper輸出的key儲存日期資料,value置為空即可 【這裡可以使用NullWritable型別】
還有就是,不一定是日期去重,去重一行資料也是如此,key儲存這一行資料即可
public static class DateDistinctMapper extends Mapper<Object, Text, Text, NullWritable> {
public void map(Object key, Text value, Context context )
throws IOException, InterruptedException {
String[] strs = value.toString().split(" ");
Text date = new Text(strs[0]);//取到日期作為key
context.write(date, NullWritable.get());
}
}
public static class DateDistinctReducer extends Reducer<Text,NullWritable,Text,NullWritable>{
public void reduce(Text key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
3.資料排序、二次排序
需求分析
這一類問題很多,像學生按成績排序,手機使用者流量按上行流量升序,下行流量降序排序等等
-
日期計數升序排序
-
日期計數降序排序
//日期 日期出現的次數 2015-01-27 7 2015-01-28 3 2015-01-29 7 2015-01-30 6 2015-01-31 7 2015-02-01 15 2015-02-02 10 2015-02-03 9 2015-02-04 12 2015-02-05 14
-
手機使用者流量按上行流量升序,下行流量降序排序
解答思路
MapReduce是預設會對key進行升序排序的,可以利用這一點實現某些排序
- 單列排序
- 升序還是降序排序
- 可以利用Shuffle預設對key排序的規則;
- 自定義繼承WritableComparator的排序類,實現compare方法
- 二次排序
- 實現可序列化的比較類WritableComparable,並實現compareTo方法(同樣可指定升序降序)
日期按計數升序排序
public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {
private IntWritable num = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
num.set(Integer.parseInt(strs[1]));
// 將次數作為key進行升序排序
context.write(num, new Text(strs[0]));
System.out.println(num.get()+","+strs[0]);
}
}
public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
// 排序後再次顛倒k-v,將日期作為key
System.out.println(value.toString()+":"+key.get());
context.write(value, key);
}
}
}
日期按計數降序排序
實現自定義的排序比較器,繼承WritableComparator類,並實現其compare方法
public static class MyComparator extends WritableComparator {
public MyComparator() {
// TODO Auto-generated constructor stub
super(IntWritable.class, true);
}
@Override
@SuppressWarnings({ "rawtypes", "unchecked" }) // 不檢查型別
public int compare(WritableComparable a, WritableComparable b) {
// CompareTo方法,返回值為1則降序,-1則升序
// 預設是a.compareTo(b),a比b小返回-1,現在反過來返回1,就變成了降序
return b.compareTo(a);
}
所使用的Mapper、Reducer同上面升序排序的,其次,要在main函式中指定自定義的排序比較器
job.setSortComparatorClass(MyComparator.class);
手機使用者流量按上行流量升序,下行流量降序排序
同第一個例項類似,要自定義物件序列化,同時也要可比較,實現WritableComparable介面,並實現CompareTo方法
我這裡是將之前統計好的使用者流量資料作為輸入資料
public static class MySortKey implements WritableComparable<MySortKey> {
private int upFlow;
private int downFlow;
private int sumFlow;
public void FlowSort(int up, int down) {
upFlow = up;
downFlow = down;
sumFlow = up + down;
}
public int getUpFlow() {
return upFlow;
}
public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}
public int getDownFlow() {
return downFlow;
}
public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
public int getSumFlow() {
return sumFlow;
}
public void setSumFlow(int sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(upFlow);
out.writeInt(downFlow);
out.writeInt(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
upFlow = in.readInt();
downFlow = in.readInt();
sumFlow = in.readInt();
}
@Override
public int compareTo(MySortKey o) {
if ((this.upFlow - o.upFlow) == 0) {// 上行流量相等,比較下行流量
return o.downFlow - this.downFlow;// 按downFlow降序排序
} else {
return this.upFlow - o.upFlow;// 按upFlow升序排
}
}
@Override
public String toString() {
// TODO Auto-generated method stub
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
public static class SortMapper extends Mapper<Object, Text, MySortKey, Text> {
Text phone = new Text();
MySortKey mySortKey = new MySortKey();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] lists = value.toString().split("\t");
phone.set(lists[0]);
mySortKey.setUpFlow(Integer.parseInt(lists[1]));
mySortKey.setDownFlow(Integer.parseInt(lists[2]));
context.write(mySortKey, phone);// 調換手機號和流量計數,後者作為排序鍵
}
}
public static class SortReducer extends Reducer<MySortKey, Text, Text, MySortKey> {
public void reduce(MySortKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
System.out.println(value.toString()+","+key.toString());
context.write(value, key);// 再次把手機號和流量計數調換
}
}
}
4.自定義分割槽
需求分析
還是以上個例子的手機使用者流量日誌為例,在上個例子的統計需要基礎上新增一個新需求:按省份統計,不同省份的手機號放到不同的檔案裡。
例如137表示屬於河北,138屬於河南,那麼在結果輸出時,他們分別在不同的檔案中。
解答思路
挺簡單的,看過我之前結合原始碼解讀MapReduce過程的話,就知道這其實就是一個分割槽的問題。定義自己的分割槽規則,一個分割槽會對應一個reduce,會輸出到一個檔案。
而你需要做的就是基礎partitioner類,並實現getPartition方法,其餘過程同第一個例子
// 自定義分割槽類
public static class PhoneNumberPartitioner extends Partitioner<Text, FlowWritable> {
private static HashMap<String, Integer> numberDict = new HashMap<>();
static {
numberDict.put("133", 0);
numberDict.put("135", 1);
numberDict.put("137", 2);
numberDict.put("138", 3);
}
@Override
public int getPartition(Text key, FlowWritable value,
相關推薦
大資料之Hadoop學習——動手實戰學習MapReduce程式設計例項
文章目錄
一、MapReduce程式設計例項
1.自定義物件序列化
需求分析
報錯:Exception in thread "main" java.lang.IllegalArgumentExcept
大資料之Hadoop學習(環境配置)——Hadoop偽分散式叢集搭建
title: Hadoop偽分散式叢集搭建 date: 2018-11-14 15:17:20 tags: Hadoop categories: 大資料 點選檢視我的部落格: Josonlee’s Blog
文章目錄
前言準備
偽分
大資料之Hadoop學習《一》——認識HDFS
title: 大資料之Hadoop學習<一>————認識HDFS date: 2018-11-12 20:31:36 tags: Hadoop categories: 大資料 toc: true 點選檢視我的部落格:Josonlee’s Blog
文
大資料之hadoop 環境搭建從零開始——實戰訓練
這裡的前提是要先安裝一個乾淨的CentOS系統,我這裡用的是CentOS6.6,安裝教程參考另一篇部落格:https://blog.csdn.net/gaofengyan/article/details/85054337
目錄
ha
大資料之hadoop / hive / hbase 的區別是什麼?有什麼應用場景?
文章目錄
1. hadoop
2. hive
3. hbase
總結
1. hadoop
它是一個分散式計算+分散式檔案系統,前者其實就是 MapReduce,後者是 HDFS 。後者可以獨立執行,前者可以選擇性使用,也
最詳細的大資料之Hadoop分散式系統架構解析!沒有之一!
Hadoop 由許多元素構成。其最底部是 Hadoop Distributed File System(HDFS),它儲存 Hadoop 叢集中所有儲存節點上的檔案。HDFS(對於本文)的上一層是MapReduce引擎,該引擎由 JobTrackers 和 TaskTrack
大資料之hadoop分散式計算框架MapReduce
一、MapReduce構成
MapReduce分為兩部分,即map和reduce。
其中map是入隊(key,value),reduce則是聚合(計算)。
map過程的輸出時reduce過程的輸入。
需要注意的是這裡map中的key是可以重複的,reduce做聚
大資料之hadoop對比spark------資料儲存
1.Hadoop的資料都是來自於哪裡:
答案:磁碟。
2.map與reduce可以同時執行嗎?
答案:不能,由什麼決定的,shuffle過程決定的。
3.spark為什麼比hadoop要快,sprak儘量的避免從磁碟中進行讀取,以及配置資訊和計算資料,對比這些特性,極
大資料之Hadoop生態系統概述
一、什麼是大資料 首先,我們來了解一下,什麼是大資料?大資料(BigData)是指無法在一定時間內用常規軟體工具進行捕捉、管理和處理的資料集合,是需要新處理模式才能具有更強的決策力、洞察發現力和流程優化能力的海量、高增長率、多樣化的資訊資產。由IBM提出的大資料
大資料之hadoop(檔案系統HDFS)
一 HDFS概述1.1 概念HDFS,它是一個檔案系統,用於儲存檔案,通過目錄樹來定位檔案;其次,它是分散式的,由很多伺服器聯合起來實現其功能,叢集中的伺服器有各自的角色。HDFS的設計適合一次寫入,多次讀出的場景,且不支援檔案的修改。適合用來做資料分析,並不適合用來做網盤應
大資料之hadoop機架感知
Hadoop是一個能夠對大量資料進行分散式處理的軟體框架,實現了Google的MapReduce程式設計模型和框架,能夠把應用程式分割成許多的小的工作單元(塊),並把這些單元放到任何叢集節點上執行。在MapReduce中,一個準備提交執行的應用程式稱為“作業(job)”,而從一個作業劃分出得、運行於各個計算節
初探大資料之Hadoop簡介
一、Hadoop的主要作用
Hadoop主要解決海量資料的儲存和海量資料的分析計算。
二、Hadoop框架技術的組成
1、HDFS:
HDFS是一個檔案系統,用來儲存檔案的
大資料之hadoop【hdfs】
目錄
1、HDFS體系結構
2、HDFS Shell操作
3、HDFS Java API
4、HDFS和RPC
5、HDFS High Availability
6、HDFS資料回收和簡單運維
=======
大資料之Hadoop(MapReduce(四))------->企業優化
6.1 MapReduce 跑的慢的原因
Mapreduce 程式效率的瓶頸在於兩點:
1)計算機效能
CPU、記憶體、磁碟健康、網路
2)I/O 操作優化
(1)資料傾斜
(2)map和reduce數設定不合理
(3)reduce等待過久
(4)小檔案過多
大資料之hadoop面試題4
2.23. 我們開發job時,是否可以去掉reduce階段。可以。設定reduce數為0 即可。2.24. datanode在什麼情況下不會備份datanode在強制關閉或者非正常斷電不會備份。2.25. combiner出現在那個過程出現在map階段的map方法後等。2.
大資料之hadoop單機版虛擬機器Vmware安裝教程
為深入學習hadoop,需要在個人電腦中安裝cloudera_centos虛擬機器。本篇文件介紹的就是關於cloudera_centos虛擬機器的安裝教程。(推薦使用virtualbox
大資料(hadoop-小檔案合併、Mapreduce原理)
hadoop-小檔案合併
package com.andy.merge;
import org.apache.hadoo
第127講:Hadoop叢集管理之安全模式解析及動手實戰學習筆記
第127講:Hadoop叢集管理之安全模式解析及動手實戰學習筆記
hadoop在啟動時namenode會把fsimage載入進記憶體,同時和edits內容合併,以此建立整個檔案系統的元資料的映象(記憶體級別),所以客戶端可以通過namenode訪問檔案系統的資訊。完成後變成
《Hadoop 權威指南 - 大資料的儲存與分析》學習筆記
第一章 初識Hadoop
1.2 資料的儲存與分析
對多個硬碟中的資料並行進行讀/寫資料,有以下兩個重要問題:
硬體故障問題。解決方案:複製(replication),系統儲存資料的副本(replica)。
以某種方式結合大部分資料來共同完成分析。MapReduce
大資料之Spark(七)--- Spark機器學習,樸素貝葉斯,酒水評估和分類案例學習,垃圾郵件過濾學習案例,電商商品推薦,電影推薦學習案例
一、Saprk機器學習介紹
------------------------------------------------------------------
1.監督學習
a.有訓練資料集,符合規範的資料
b.根據資料集,產生一個推斷函式