1. 程式人生 > >大資料點點滴滴積少成多---進階之路

大資料點點滴滴積少成多---進階之路

1.   HADOOP入門

大資料技術生態體系:

的Hadoop(HDFS,對映精簡,紗線)元老級大資料處理技術框架,擅長離線資料分析

HBase的分散式海量資料庫,離線分析和線上業務通吃

Hive sql資料倉庫工具,使用方便,功能豐富,基於MR延遲大

Sqoop資料匯入匯出工具

水槽資料採集框架

風暴實時流式計算框架,流式處理領域頭牌框架

Spark基於記憶體的分散式運算框架,一站式處理all in one,新秀,發展勢頭迅猛

sparkCore

SparkSQL

SparkStreaming

機器學習:

Mahout的基於MapReduce的的機器學習演算法庫

MLLIB基於火花機器學習演算法庫

  • 理解該工框架的功能和適用場景
  • 怎麼用(部署使用,程式設計規範,API)
  • 執行機制
  • 架構原理
  • 原始碼

BAT ----看運氣,其他,一切皆有可能

薪水---- 15000 <=你的底線期望值== 20k

發展路線----

應用開發 - 高階開發人員

平臺開發 - 架構級別

|

| ----------------- | --------- ---- ---- ------------- |

|     |                |

架構師資料探勘模型設計管理

先來寫一個普通的單詞計數程式

觀察程式處理的速度和能力

1.4.2 hadoop應用場景

(1)的hadoop的是用於處理(運算分析)海量資料的,且是採用分散式叢集的方式;

(2)通俗來說,可以把hadoop的理解為一個程式設計框架(比例結構,彈簧,休眠/ MyBatis的),有著自己特定的API封裝和使用者程式設計規範,使用者可藉助這些API來實現資料處理邏輯;

(3)從另一個角度,hadoop的又可以理解為一個提供服務的軟體(比如資料庫服務預言/ MySQL的,索引服務solr的,快取服務redis的等),使用者程式的功能都是通過客戶端向的hadoop的叢集請求服務來實現;

(4)具體來說,hadoop的的兩個大的功能:海量資料的儲存;海量資料的分析;

(5)的Hadoop的有3大核心元件:

HDFS

---- hadoop分散式檔案系統海量資料的儲存(叢集服務),

MapReduce的的

----運算框架(導jar包寫程式),海量資料運算分析(替代品:torm / spark / tez等)

----資源排程管理叢集(可以理解為一個分散式的作業系統,叢集服務)

(6)的Hadoop的產生的歷史

最早來自於谷歌的三大論文(為什麼谷歌會需要這麼一種技術)

後來經過doug cutting的山寨,出現了java版本的hdfs mapreduce和hbase

以上三個元件整合起來成為阿帕奇的一個頂級專案的Hadoop

經過演化,hadoop的元件又多出一個紗(mapreduce + yarn + hdfs)

而且,hadoop的的外圍產生了越來越多的工具元件,形成一個龐大的的hadoop的生態體系

(學習要求:掌握開發測試級別的的hadoop的叢集部署運維)

  1. 準備linux伺服器(centos 6.4 32位<生產環境的伺服器應該採用64位,可以支援更大的記憶體>)
  1. 解壓的CentOS的映象壓縮包到某個目錄,並用VMware的的開啟
  2. 準備作業系統環境(主機名,IP地址配成靜態,域名和IP的本地對映主機)
  3. 關閉圖形介面的啟動修改/ etc / inittab中的啟動級別為3
  1. 準備的Java的環境,安裝JDK,配置環境變數等
  • 解壓安裝包,修改環境變數:JAVA_HOME PATH
  • 配置防火牆(關閉)
  • 為hadoop軟體準備一個專門的linux使用者(在我們的linux系統映象中,有使用者hadoop,密碼:hadoop),為hadoop使用者設定sudo許可權/ etc / sudoers
  1. 安裝的hadoop ----(解壓,修改配置檔案,分發到叢集,初始化,啟動)

Hadoop的的的目錄結構:

濱#可執行檔案(Hadoop的的功能操作命令)

等#配置檔案

包括    

LIB#本地庫檔案(資料壓縮編解碼,本地檔案系統操作)

的的libexec    

LICENSE.TXT

NOTICE.txt

的的README.txt

sbin目錄#可執行檔案(Hadoop的叢集程序管理的操作命令)

分享#開發所需要的JAR包及使用者幫助文件

  1. 修改配置檔案(參考現成的配置檔案XXX-site.xml中中)

(1)hadoop-env.sh JAVA_HOME = / home / hadoop / app / jdk_7u65

(2)核心-site.xml中    

fs.defaultFS指定的Hadoop所使用的檔案系統

hadoop.tmp.dir指定各節點上的hadoop的程序所在的本地工作目錄(父目錄)

(3)mapred-site.xml mapreduce.framework.name:yarn

(4)yarn-site.xml yarn.resourcemanager.hostname:server01(yarn中的主節點所在主機)

                 yarn.nodemanager.aux服務:mapreduce_shuffle

(5)可選:

如果要讓名稱節點單獨配置一個工作目錄,在HDFS-site.xml中:

 <名稱> dfs.namenode.name.dir </名稱>

 <value> / mnt / driver-meta /,nfs:// </ value>

如果要讓資料節點單獨配置一個工作目錄,在HDFS-site.xml中:

 <名稱> dfs.datanode.data.dir </名稱>

 <value> / mnt / driver-data-a /,/ mnt / driver-data-b /,/ mnt / driver-data-c / </ value>

如果要讓secondary namenode在指定的機器上啟動,則配置:

<名稱> dfs.namenode.secondary.http地址</名稱>

<值> Hadoop的Server02上:50090 </ value>

(6)真實生產中部署一箇中型叢集:

有些公司會藉助一些自動化的網路拷貝工具加快配置速度

有些公司會採用一些商業發行版(CDH - Cloudera的公司的產品; HORTONWORKS; 微軟,IBM,EMC,INTEL)

  1. 啟動的Hadoop的

首先,格式化nameonde bin / hadoop namenode -format

  1. 手動一臺一臺地啟動

在相應伺服器上啟動HDFS的相關程序:

  啟動namenode程序 - sbin / hadoop-daemon.sh start namenode

啟動datanode程序--sbin / hadoop-daemon.sh啟動datanode

然後,驗證HDFS的服務是否能正常提供:

bin / hdfs dfsadmin -report檢視hdfs叢集的統計資訊

  1. 殼牌指令碼批量啟動方式:

在任意一臺伺服器上執行命令:

啟動HDFS服務:sbin目錄/ start-dfs.sh

啟動紗服務:sbin目錄/ start-yarn.sh

或者:直接啟動hdfs + yarn服務:sbin / start-all.sh

6,叢集內部的SSH金鑰認證登陸機制配置(免密登陸)

配置的機制:在登陸方生成密對,然後將公司複製給目標主機,在目標主機上將這個公司加入授權檔案〜/ .ssh / authorized_keys(該檔案的許可權:600)

真實大量配置的時候直接使用SSH工具箱的工具:

1 /在登陸方生成密對,執行命令:ssh-keygen

2 /執行這條指令:

ssh-copy-id hadoop-server03

就可以免密登陸目標主機

總的設計思想:

分而治之 - 將大檔案,大批量檔案,分散式存放在大量獨立的伺服器上,以便於採取分而治之的方式對海量資料進行運算分析;

重點概念:檔案切塊,副本存放,元資料,位置查詢,資料讀寫流

--appendToFile ----追加一個檔案到已經存在的檔案末尾

例如:hadoop fs -appendToFile ./hello.txt hdfs:// hadoop-server01:9000 / hello.txt

可以簡寫為:

Hadoop fs -appendToFile ./hello.txt /hello.txt

-cat ---顯示檔案內容  

例如:hadoop fs -cat /hello.txt

-chgrp

-chmod

-chown

上面三個跟的的Linux中的用法一樣

例如:hadoop fs -chmod 666 /hello.txt

-copyFromLocal#從本地檔案系統中拷貝檔案到HDFS路徑去

例如:hadoop fs -copyFromLocal ./jdk.tar.gz / aaa /

-copyToLocal#從HDFS拷貝到本地

例如:hadoop fs -copyToLocal /aaa/jdk.tar.gz

-count#統計一個指定目錄下的檔案節點數量

例如:hadoop fs -count / aaa /

-cp#從HDFS的一個路徑拷貝HDFS的另一個路徑

hadoop fs -cp /aaa/jdk.tar.gz /bbb/jdk.tar.gz.2

-createSnapshot

-deleteSnapshot

-renameSnapshot

以上三個用來操作HDFS檔案系統目錄資訊快照

例如:hadoop fs -createSnapshot /

-df#統計檔案系統的可用空間資訊

-du

例如:hadoop fs -df -h /

例如:hadoop fs -du -s -h / aaa / *

-get#等同於copyToLocal,就是從HDFS下載檔案到本地

-getmerge#合併下載多個檔案

例如:比如hdfs的目錄/ aaa /下有多個檔案:log.1,log.2,log.3,...

hadoop fs -getmerge /aaa/log.* ./log.sum

-help#輸出這個命令引數手冊

-ls#顯示目錄資訊

例如:hadoop fs -ls hdfs:// hadoop-server01:9000 /

這些引數中,所有的HDFS路徑都可以簡寫

例如:hadoop fs -ls /等同於上一條命令的效果

-mkdir#在HDFS上建立目錄

例如:hadoop fs -mkdir -p / aaa / bbb / cc / dd

-moveFromLocal#從本地剪下貼上到HDFS

-moveToLocal#從HDFS剪下貼上到本地

-mv#在HDFS目錄中移動檔案

-put#等同於copyFromLocal

-rm#刪除檔案或資料夾

例如:hadoop fs -rm -r / aaa / bbb /

-rmdir#刪除空目錄

-setrep#設定HDFS中檔案的副本數量

例如:hadoop fs -setrep 3 /aaa/jdk.tar.gz

-stat#顯示一個檔案或資料夾的元資訊

-tail#顯示一個檔案的末尾

-text#以字元形式列印一個檔案的內容

首先,它是一個檔案系統,有一個統一的名稱空間 - 目錄樹

其次,它是分散式的,由很多伺服器聯合起來實現功能;

  1. HDFS系統-會給檔案客戶端的的提供一個統一的抽象藝術藝術目錄樹,客戶端訪問HDFS檔案時就是通過指定這個抽象目錄中的路徑來訪問
  2. HDFS的中檔案都是分塊(塊)儲存的,塊的大小可以通過配置引數(dfs.blocksize)來規定,預設大小在hadoop2.x版本中是128M,老版本中是64M
  3. 檔案的各個塊由誰來進行真實的儲存呢?----在分佈各個資料節點服務節點上,每而且一個塊可以都儲存多個副本(副本數量也可以通過引數設定dfs.replication)
  4. Hdfs中有一個重要的角色:namenode,負責維護整個hdfs檔案系統的目錄樹,以及每一個路徑(檔案)所對應的塊塊資訊(block的id,及所在的datanode伺服器)
  5. HDFS的設計英文分類照片中翻譯適應一次寫入,多次讀出的場景,並不支援檔案的修改

(HDFS並不適合用來做網盤應用,因為,不便修改,延遲大,網路開銷大,成本太高)

可以隨機定位讀取位置:DFSInputStream.seek()

  1. 搭建開發環境(蝕,HDFS的罐子包----的的hadoop的安裝目錄的份額下)

<依賴性>

    <的groupId> org.apache.hadoop </的groupId>

   <artifactId的> Hadoop的客戶端</ artifactId的>

    <版本> 2.4.1 </版本>

</依賴性>

建議在linux下下下進行客戶端應用的開發,不會存在相容性問題。

如果非要在視窗上做客戶端應用開發,需要設定以下環境:

  1. 在視窗的某個目錄下解壓一個的的Hadoop的安裝包
  2. 將安裝包下的lib中中和斌目錄用對應的視窗版本平臺編譯的本地庫替換
  3. 在視窗系統中配置HADOOP_HOME指向你解壓的安裝包
  4. 在視窗系統的路徑變數中加入的的Hadoop的倉目錄
  1. 在Java的的中操作HDFS,首先要獲得一個客戶端例項

配置conf =新配置()

FileSystem fs = FileSystem.get(conf)

而我們的操作目標是HDFS,所以獲取到的FS物件應該是DistributedFileSystem的例項;

獲得方法是從何處判斷具體例項化那種客戶端類呢?

----從CONF中的一個引數fs.defaultFS的配置值判斷;

如果我們的程式碼中沒有指定並且工程類路徑下也沒有給定相應的配置,conf中的預設值就來自於hadoop的jar包中的core-default.xml,預設值為:file:///

FS所具備的方法:

HDFS叢集分為兩大角色:的的NameNode,DataNode會會

的NameNode會負責管理整個檔案系統的元資料

資料管理部負責管理使用者的檔案資料塊


2.5 namenode工作機制

NameNode會的職責:

負責客戶端請求的響應

元資料的管理(查詢,修改)

---- hdfs元資料是怎麼儲存的?

A,記憶體中有一份完整的元型態型態資料

B,磁碟有一個“準完整”的元資料映象

C,當客戶端對HDFS中的檔案進行新增或者修改操作,的響應記錄首先被記入編輯作業這種記錄日誌中,當客戶端操作成功後,相應的元資料會更新到記憶體中

每隔一段時間,會由secondary namenode將namenode上積累的所有edits和一個最新的fsimage下載到本地,並載入到記憶體進行merge(這個過程稱為檢查點)

d,檢查點操作的觸發條件配置引數:

dfs.namenode.checkpoint.check.period = 60   檢查觸發條件是否滿足的頻率,60秒

dfs.namenode.checkpoint.dir =檔案:// $ {} hadoop.tmp.dir / DFS / namesecondary

以上兩個引數做checkpoint 操作時,secondary namenode 的本地工作目錄

dfs.namenode.checkpoint.edits.dir = $ {} dfs.namenode.checkpoint.dir

dfs.namenode.checkpoint.max-retries = 3   #最大重試次數

dfs.namenode.checkpoint.period = 3600   #兩次檢查點之間的時間間隔3600秒

dfs.namenode.checkpoint.txns = 1000000  #兩次checkpoint之間最大的操作記錄

E,namenode和secondary namenode的工作目錄儲存結構完全相同,所以,當namenode故障退出需要重新恢復時,可以從secondary namenode的工作目錄中將fsimage拷貝到namenode的工作目錄,以恢復namenode的元資料

樓可以通過HDFS的一個工具來檢視編輯中的資訊

bin / hdfs oev -i edits -o edits.xml

某個Datanode的工作職責:

儲存管理使用者的檔案塊資料

定期向名稱節點彙報自身所持有的塊資訊(通過心跳資訊上報)

上傳一個檔案,觀察檔案的塊具體的物理存放情況

在每一臺資料節點機器上的這個目錄:

/home/hadoop/app/hadoop-2.4.1/tmp/dfs/data/current/BP-193442119-192.168.2.120-1432457733977/current/finalized

  1. HDFS的其他訪問方式:

HDFS檔案系統可以通過標準的hdfs shell / rest api / java api來操作,還可以利用fuse這種工具將hdfs掛載為一個unix標準檔案系統,就可以使用標準的linux檔案操作方式來操作hdfs檔案系統

HDFS還可以掛載為一個NFS系統

FileUtil工具類

FileUtil。副本  檔案(C:/test.tar.gz ),檔案系統獲得。(URI 建立“HDFS:// Hadoop的SERVER01:9000 ),CONF,“Hadoop的” ),  路徑(“/ test.tar.gz ),true,conf);

  1. HDFS的垃圾桶配置

HDFS存在回收站機制,進入回收站的檔案可以儲存一段時間,過期後再清除

引數配置:

fs.trash.checkpoint.interval = 0      回收站過期機制檢查頻率(分鐘)

fs.trash.interval = 0      回收站中檔案過期的時間限制(分鐘)

  1. 萬用字元及過濾器選擇檔案
  1. Namenode會會的安全模式

(1)當nameonde發現檔案塊丟失的數量達到一個配置的門限時,就會進入安全模式,它在這個模式下等待資料管理部向它彙報塊資訊。

(2)在安全模式下,名稱節點可以提供元資料查詢的功能,但是不能修改;

可以手動管理的的NameNode的安全模式:

hdfs dfsadmin -safemode <enter | 離開| 得到| 等待]>

1,跟的NameNode的通訊查詢元資料,找到檔案塊所在的資料節點伺服器

2,挑選一臺資料節點(就近原則,然後隨機)伺服器,請求建立插座流

3,資料管理部開始傳送資料(從磁盤裡面讀取資料放入流,以包為單位來做校驗)

如圖4所示,客戶端以分組為單位接收,現在本地快取,然後寫入目標檔案

如圖1所示,根名稱節點通訊請求上傳檔案,名稱節點檢查目標檔案是否已存在,父目錄是否存在

2,名稱節點返回是否可以上傳

3,客戶端請求第一個塊該傳輸到哪些資料節點伺服器上

4,名稱節點返回3個數據節點伺服器ABC

5,客戶端請求3臺DN中的一臺甲上傳資料(本質上是一個RPC呼叫,建立管道),A收到請求會繼續呼叫B,然後乙呼叫C,將真個管道建立完成,逐級返回客戶端

6,客戶端開始往甲上傳第一個塊(先從磁碟讀取資料放到一個本地記憶體快取),以分組為單位,A收到一個數據包就會傳給B,B傳給℃; 一個每傳一個數據包會放入一個應答佇列等待應答

如圖7所示,當一個塊傳輸完成之後,客戶端再次請求名稱節點上傳第二個塊的伺服器。

Hadoop的的中各節點之間存在大量的遠端過程呼叫,Hadoop的的為此封裝了一個RPC基礎框架

使用方法:

(1)定義一個介面,例項如下:

// RCP通訊的兩端共同遵守的協議(本質上就是業務實現類的介面)

public interface ClientNameNodeProtocal {

// RPC通訊雙方一致的版本號

public static final long versionID = 1L;

//業務方法簽名

public String getMetaData(String path);

}

(2)編寫介面的業務實現類

/ **

 *業務的具體實現類,應該執行在遠端伺服器上

 * @author [email protected]

 *

 * /

公共類NamNodeNameSystemImpl實現ClientNameNodeProtocal {

@覆蓋

public String getMetaData(String path){

//許多邏輯程式碼用於在元資料池中查詢元資料

返回“{/aa/bb/bian4.mp4;300M; [BLK_1,BLK_2,BLK_3]; 3;

{[BLK_1:DN-A,DN-B,DN-E],[BLK_2:DN-A,DN-B,DN-C],[BLK_3:DN-A,DN-d,DN-E]}} “;

}

}

(3)使用RPC框架API將業務實現釋出為RPC服務

/ **

 * RCP服務釋出工具

 * @author [email protected]

 *

 * /

公共類PublishServiceTool {

public static void main(String [] args)丟擲HadoopIllegalArgumentException,IOException {

//建立一個RPC服務建設者

Builder builder = new RPC.Builder(new Configuration());

//將要釋出的服務的資訊設定到建設者中

builder.setBindAddress(“spark01”)。setPort(10000).setProtocol(ClientNameNodeProtocal.class).setInstance(new NamNodeNameSystemImpl());

//用建設者構建出一個插座服務

Server server = builder.build();

//將服務啟動,就可以等待客戶端請求

server.start();

}

}

(4)客戶端通過RPC框架API獲取跟RPC服務端通訊的插座代理,呼叫遠端服務

公共類客戶{

public static void main(String [] args)throws Exception {

//首先用RPC框架獲得要呼叫的遠端服務的引用(動態代理物件)

ClientNameNodeProtocal namenodeImpl = RPC.getProxy(ClientNameNodeProtocal.class,1L,new InetSocketAddress(“spark01”,10000),new Configuration());

//因為這個動態代理物件實現了業務類的介面,所以可以直接通過這個引用來呼叫業務類的實現方法(本質上,具體實現在遠端,走的是插座通訊請求)

String metaData = namenodeImpl.getMetaData(“/ aa / bb / bian4.mp4”);

的的System.out.println(元資料);

}

}

3.6遠端除錯跟蹤Hadoop的服務端程式碼

(1)需要在$ HADOOP_HOME的/ etc / hadoop的/ hadoop-env.sh檔案的最後新增你想除錯的程序

#遠端除錯的NameNode的

export HADOOP_NAMENODE_OPTS =“ - agentlib:jdwp = transport = dt_socket,address = 8888,server = y,suspend = y”

#遠端除錯的資料節點

export HADOOP_DATANODE_OPTS =“ - agentlib:jdwp = transport = dt_socket,address = 9888,server = y,suspend = y”

  1. 在本地的日食中開啟的NameNode的或者資料節點類,點選右鍵,新增遠端除錯配置,如圖:

(3)新增一個遠端除錯除錯配置

(4)填寫遠端服務端的除錯地址和埠號

(5)接著在名稱節點類中新增斷點,如圖:

  1. 回到叢集伺服器上啟動HDFS
  1. 回到日食之前配置的遠端除錯配置上,點選除錯開始除錯

(8)成功進入斷點

MapReduce的的是一個分散式的運算程式設計框架,核心功能是將使用者編寫的核心邏輯程式碼分散式地執行在一個叢集的很多伺服器上;

學習要求:掌握MR程式程式設計規範;

   掌握MR程式執行機制

   掌握MR常見需求解決方式

(1)海量資料在單機上處理因為硬體資源限制,無法勝任,因為需要採用分散式叢集的方式來處理。

(2)而一旦將單機版程式擴充套件到叢集來分散式執行,將極大地增加程式的複雜度和開發難度

(3)引入的MapReduce的框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分散式計算中的複雜性交由框架來處理

的的Hadoop的釋出包中內建了一個的Hadoop的MapReduce的的例子-2.4.1.jar,這個罐包中有各種MR示例程式,可以通過以下步驟執行:

啟動HDFS,紗

然後在叢集中的任意一臺伺服器上執行,(比如執行單詞計數的):

hadoop jar hadoop-mapreduce-example-2.4.1.jar wordcount / wordcount / data / wordcount / out

  1. 使用者程式會分成三個部分:對映器,減速機,驅動器
  2. 對映器的輸入資料是KV對的形式,KV的型別可以設定
  3. 對映器的輸出資料是KV對的形式,KV的型別可以設定
  4. 對映中的業務邏輯寫在對映方法中
  5. 地圖方法是每進來一個KV對呼叫一次
  6. 減速機的輸入資料應該對應對映器的輸出資料,也是KV
  7. 減速機的業務邏輯寫在減少方法中
  8. 減少方法是對每一個<鍵,值列表>呼叫一次
  9. 使用者的對映和減速都要繼承各自的父類
  10. 整個程式需要一個Drvier來進行提交,提交的是一個描述了各種必要資訊的工作物件

(1)定義一個對映器類

//首先要定義四個泛型的型別

// keyin:LongWritable valuein:Text

// keyout:Text valueout:IntWritable

公共類WordCountMapper擴充套件Mapper <LongWritable,Text,Text,IntWritable> {

// map方法的生命週期:框架每傳一行資料就被呼叫一次

// key:這一行的起始點在檔案中的偏移量

//值:這一行的內容

@覆蓋

protected void map(LongWritable key,Text value,Context context)丟擲IOException,InterruptedException {

//拿到一行資料轉換為字串

String line = value.toString();

//將這一行切分出各個單詞

String [] words = line.split(“”);

//遍歷陣列,輸出<單詞,1>

for(String word:words){

context.write(new Text(word),new IntWritable(1));

}

}

}

(2)定義一個減速器類

//生命週期:框架每傳遞進來一個千伏組,減少方法被呼叫一次

@覆蓋

protected void reduce(Text key,Iterable <IntWritable> values,Context context)丟擲IOException,InterruptedException {

//定義一個計數器

int count = 0;

//遍歷這一組千伏的所有V,累加到計數中

for(IntWritable value:values){

count + = value.get();

}

context.write(key,new IntWritable(count));

}

}

(3)定義一個主類,用來描述工作並提交工作

公共類WordCountRunner {

//把業務邏輯相關的資訊(哪個是對映器,哪個是減速,要處理的資料在哪裡,輸出的結果放哪裡......)描述成一個工作物件

//把這個描述好的作業提交給叢集去執行

public static void main(String [] args)throws Exception {

配置conf = new Configuration();

Job wcjob = Job.getInstance(conf);

//指定我這個工作所在的JAR包

// wcjob.setJar(“/ home / hadoop / wordcount.jar”);

wcjob.setJarByClass(WordCountRunner.class);

wcjob.setMapperClass(WordCountMapper.class);

wcjob.setReducerClass(WordCountReducer.class);

//設定我們的業務邏輯對映類的輸出鍵和值的資料型別

wcjob.setMapOutputKeyClass(Text.class);

wcjob.setMapOutputValueClass(IntWritable.class);

//設定我們的業務邏輯減速類的輸出鍵和值的資料型別

wcjob.setOutputKeyClass(Text.class);

wcjob.setOutputValueClass(IntWritable.class);

//指定要處理的資料所在的位置

FileInputFormat.setInputPaths(wcjob,“hdfs:// hdp-server01:9000 / wordcount / data / big.txt”);

//指定處理完成之後的結果所儲存的位置

FileOutputFormat.setOutputPath(wcjob,new Path(“hdfs:// hdp-server01:9000 / wordcount / output /”));

//向紗線叢集提交這個工作

boolean res = wcjob.waitForCompletion(true);

System.exit(RES?0:1);

}

4.5.1本地執行模式

  1. MapReduce的的程式是被提交給LocalJobRunner在本地執行
  2. 而處理的資料及輸出結果可以在本地檔案系統,也可以在HDFS上
  3. 怎樣實現本地執行:寫一個程式,不要帶叢集的配置檔案(本質是你的先生程式的CONF中是否有mapreduce.framework.name =本地以及yarn.resourcemanager.hostname引數)

4.5.2叢集執行模式

  1. MapReduce的的程式會提交給紗線叢集的ResourceManager的中,分發到很多的節點上併發執行
  2. 處理的資料和輸出結果應該位於HDFS檔案系統
  3. 怎樣實現叢集執行:

A,將程式打成JAR包,然後在叢集的任意一個節點上用的Hadoop的命令啟動

$ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath

B,直接在Linux的的的蝕中執行主要方法

(專案中要帶引數:mapreduce.framework.name =紗以及紗線的兩個基本配置)

C,如果要在視窗的日食中提交作業給叢集,則要修改YarnRunner類

4.6 MAPREDUCE中的Combiner

  1. 組合是MR程式中對映和減速之外的一種元件
  2. 組合元件的父類就是減速
  3. 合和減速機的區別在於執行的位置:

合是在每一個maptask所在的節點執行

減速是接收全域性所有對映器的輸出結果;

(1)的爪哇的序列化是一個重量級序列化框架(序列化),一個物件被序列化後,會附帶很多額外的資訊(各種校驗資訊,頁首,繼承體系.... ),所以很臃腫,不便於在網路中高效傳輸;

所以,的hadoop的自己開發了一套序列化機制(可寫),精簡,高效

  1. 簡單程式碼驗證兩種序列化機制的差別:

公共類TestSeri {

public static void main(String [] args)throws Exception {

//定義兩個ByteArrayOutputStream,用來接收不同序列化機制的序列化結果

ByteArrayOutputStream ba = new ByteArrayOutputStream();

ByteArrayOutputStream ba2 = new ByteArrayOutputStream();

//定義兩個DataOutputStream類類,用於將普通物件進行JDK標準序列化

DataOutputStream dout = new DataOutputStream(ba);

DataOutputStream dout2 = new DataOutputStream(ba2);

ObjectOutputStream obout = new ObjectOutputStream(dout2);

//定義兩個豆,作為序列化的源物件

ItemBeanSer itemBeanSer = new ItemBeanSer(1000L,89.9f);

ItemBean itemBean = new ItemBean(1000L,89.9f);

//用於比較字串型別和文字型別的序列化差別

文字atext = new Text(“a”);

// atext.write(dout);

itemBean.write(DOUT);

byte [] byteArray = ba.toByteArray();

//比較序列化結果

的的System.out.println(byteArray.length);

for(byte b:byteArray){

是System.out.print是(b)中中;

是System.out.print(“:”);

}

的System.out.println(“-----------------------”);

String astr =“a”;

// dout2.writeUTF(astr);

obout.writeObject(itemBeanSer);

byte [] byteArray2 = ba2.toByteArray();

的的System.out.println(byteArray2.length);

for(byte b:byteArray2){

是System.out.print是(b)中中;

是System.out.print(“:”);

}

}

}

4.8 Mapreduce的排序初步

MR程式在處理資料的過程中會對資料排序,排序的依據是對映器輸出的關鍵

5.1分割槽程式程式設計

Partition就是對地輸出的金鑰進行分組,不同的組可以指定不同的reduce task處理;

分割槽功能由分割槽的實現子類來實現

示例:不同省份流量資料彙總到不同檔案中

5.2 Mapreduce的排序----重點

MR中的常見排序機制:部分/全部/次級排序

MR中排序的基本要素:

排序是在對映階段輸出之後,降低處理之前

(通過無減少的MR程式示例觀察)

只針對重點進行排序

關鍵要實現WritableComparable介面

簡單示例:對流量彙總資料進行倒序排序

5.2.1 partital排序示例,多減少任務自動實現各輸出檔案有序

5.2.3總排序機制

  1. 設定一個reduce task,全域性有序,但是併發度太低,單節點負載太大
  2. 設定分割槽段partitioner,設定相應數量的reduce task,可以實現全域性有序,但難以避免資料分佈不均勻 - 資料傾斜問題,有些減少任務負載過大,而有些則過小;
  3. 可以通過編寫一個職位來統計資料分佈規律,獲取合適的區段劃分,然後用分割槽段分割槽來實現排序,但是這樣需要另外編寫一個工作對整個資料集運算,比較費事
  4. 利用的hadoop的自帶的取樣器,來對資料集取樣並劃分區段,然後利用的hadoop的自帶的TotalOrderPartitioner分割槽來實現全域性排序

示例:

/ **

 *全排序示例

 * @author [email protected]

 *

 * /

公共類TotalSort {

static class TotalSortMapper擴充套件Mapper <Text,Text,Text,Text> {

OrderBean bean = new OrderBean();

@覆蓋

protected void map(文字鍵,文字值,上下文上下文)丟擲IOException,InterruptedException {

// String line = value.toString();

// String [] fields = line.split(“\ t”);

// bean.set(fields [0],Double.parseDouble(fields [1]));

context.write(鍵,值);

}

}

static class TotalSortReducer擴充套件Reducer <Text,Text,Text,Text> {

@覆蓋

protected void reduce(文字鍵,Iterable <Text>值,Context上下文)丟擲IOException,InterruptedException {

for(Text v:values){

context.write(鍵,V);

}

}

}

public static void main(String [] args)throws Exception {

配置conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(TotalSort.class);

job.setMapperClass(TotalSortMapper.class);

job.setReducerClass(TotalSortReducer.class);

// job.setOutputKeyClass(OrderBean.class);

// job.setOutputValueClass(NullWritable.class);

//用來讀取序列原始檔的輸入元件

job.setInputFormatClass(SequenceFileInputFormat.class);

FileInputFormat.setInputPaths(job,new Path(args [0]));

FileOutputFormat.setOutputPath(job,new Path(args [1]));

// job.setPartitionerClass(RangePartitioner.class);

//分割槽的邏輯使用的Hadoop的的自帶的全域性排序分割槽元件

job.setPartitionerClass(TotalOrderPartitioner.class);

//系統自帶的這個抽樣器只能針對sequencefile抽樣

RandomSampler randomSampler

 = new InputSampler.RandomSampler <Text,Text>(0.1,100,10);

InputSampler.writePartitionFile(工作,randomSampler);

//獲取抽樣器所產生的分割槽規劃描述檔案

配置conf2 = job.getConfiguration();

String partitionFile = TotalOrderPartitioner.getPartitionFile(conf2);

//把分割槽描述規劃檔案分發到每一個任務節點的本地

job.addCacheFile(new URI(partitionFile));

//設定若干併發的減少任務

job.setNumReduceTasks(3);

job.waitForCompletion(真);

}

}

5.2.4二級排序機制

----就是讓的MapReduce的幫我們根據價值排序

考慮一個場景,需要取按鍵分組的最大價值條目:

通常,洗牌只是對鍵進行排序

如果需要對值排序,則需要將值放到鑰匙中,但是此時,價值就和原來的鍵形成了一個組合鍵,從而到達減速時,組合鍵是一個一個到達減速,想在減速中輸出最大值的那一個,不好辦,它會一個一個都輸出去,除非自己弄一個快取,將到達的組合鍵全部快取起來然後只取第一個

(或者弄一個訪問標識?但是同一個減速可能會收到多個鍵的組合鍵,無法判斷訪問標識)

此時就可以用到secondary sort,其思路:

  1. 要有對組合鍵排序的比較器
  2. 要有分割槽進行分割槽負載並行減速計算
  3. 要有一個groupingcomparator來重定義值列表聚合策略 - 這是關鍵,其原理就是將相同鍵而不同組合鍵的資料進行聚合,從而把他們聚合成一組,然後在減速中可以一次收到這一組關鍵詞的組合鍵,並且,值最大的也就是在這一組中的第一個組合鍵會被選為迭代器值列表的關鍵,從而可以直接輸出這個組合鍵,就實現了我們的需求

示例:輸出每個專案的訂單金額最大的記錄

(1)定義一個GroupingComparator

/ **

 *用於控制洗牌過程中減少端對KV對的聚合邏輯

 * @author [email protected]

 *

 * /

公共類ItemidGroupingComparator擴充套件WritableComparator {

protected ItemidGroupingComparator(){

超(OrderBean.class,真);

}

@覆蓋

public int compare(WritableComparable a,WritableComparable b){

OrderBean abean =(OrderBean)a;

OrderBean bbean =(OrderBean)b;

//將ITEM_ID相同的豆都視為相同,從而聚合為一組

return abean.getItemid()。compareTo(bbean.getItemid());

}

}

(2)定義訂單資訊豆

/ **

 *訂單資訊豆,實現的Hadoop的序列化機制

 * @author [email protected]

 *

 * /

公共類OrderBean實現WritableComparable <OrderBean> {

私人文字的itemid;

私人DoubleWritable金額;

public OrderBean(){

}

public OrderBean(Text itemid,DoubleWritable amount){

集(商品ID,金額);

}

public void set(Text itemid,DoubleWritable amount){

this.itemid = itemid;

this.amount =金額;

}

public Text getItemid(){

return itemid;

}

public DoubleWritable getAmount(){

退貨金額;

}

@覆蓋

public int compareTo(OrderBean o){

int cmp = this.itemid.compareTo(o.getItemid());

if(cmp == 0){

cmp = -this.amount.compareTo(o.getAmount());

}

返回CMP;

}

@覆蓋

public void write(DataOutput out)丟擲IOException {

out.writeUTF(itemid.toString());

out.writeDouble(amount.get());

}

@覆蓋

public void readFields(DataInput in)throws IOException {

String readUTF = in.readUTF();

double readDouble = in.readDouble();

this.itemid = new Text(readUTF);

this.amount = new DoubleWritable(readDouble);

}

@覆蓋

public String toString(){

return itemid.toString()+“\ t”+ amount.get();

}

}

(3)自定義一個分割槽器,以使相同ID的豆發往相同減少任務

公共類ItemIdPartitioner擴充套件了Partitioner <OrderBean,NullWritable> {

@覆蓋

public int getPartition(OrderBean key,NullWritable value,int numPartitions){

//指定ITEM_ID相同的豆發往相同的減速任務

返回(key.getItemid()的hashCode()&Integer.MAX_VALUE的。)%numPartitions;

}

}

(4)定義了主體先生流程

/ **

 *利用secondarysort機制輸出每種專案訂單金額最大的記錄

 * @author [email protected]

 *

 * /

公共類SecondarySort {

static class SecondarySortMapper擴充套件Mapper <LongWritable,Text,OrderBean,NullWritable> {

OrderBean bean = new OrderBean();

@覆蓋

protected void map(LongWritable key,Text value,Context context)丟擲IOException,InterruptedException {

String line = value.toString();

String [] fields = StringUtils.split(line,“\ t”);

bean.set(new Text(fields [0]),new DoubleWritable(Double.parseDouble(fields [1])));

context.write(豆,NullWritable.get());

}

}

static class SecondarySortReducer擴充套件Reducer <OrderBean,NullWritable,OrderBean,NullWritable> {

//在設定了groupingcomparator以後,這裡收到的kv資料就是:<1001 87.6>,null <1001 76.5>,null ....

//此時,減少方法中的引數key就是上述kv組中的第一個kv的金鑰:<1001 87.6>

//要輸出同一個專案的所有訂單中最大金額的那一個,就只要輸出這個關鍵

@覆蓋

protected void reduce(OrderBean key,Iterable <NullWritable> values,Context context)丟擲IOException,InterruptedException {

context.write(鍵,NullWritable.get());

}

}

public static void main(String [] args)throws Exception {

相關推薦

Pandas使用DataFrame進行資料分析比賽(二):日期資料處理:按日期篩選、顯示及統計資料

首先,表格的資料格式如下: 1、獲取某年某月資料 data_train = pd.read_csv('data/train.csv') # 將資料型別轉換為日期型別 data_train[

php技能樹---神的

進行 think 文本編輯器 性能優化 mysq bootstrap 同步機制 小型 連接 PHP7 迎來巨大的性能提升,又一次回到關註的焦點。根據這些年在開發圈子總結的LNMP程序猿發展軌跡,結合個人經驗體會,總結出很多程序員對未來的迷漫,特別對技術學習的盲目和慌亂,簡單

新人程式設計師的

1.對程式碼花時間解構出來那一塊負責什麼功能,把專案給庖丁解牛成一個個不同功能的模組 2.對每個模組實現什麼瞭解 3.看懂每個模組的程式碼,不懂就google+stackoverflow去問 4.嘗試對某個你感興趣的小模組去重構 5.重構出來的效能不如原來的,分析原因,回到4,迴圈 6.期間惡補相關的知識,特

一個資深Java程式設計師從碼農到牛的

無論是誰,在剛進入某個領域之時,有再大的雄心壯志也敵不過眼前的迷茫:不知道應該怎麼做,不知道應該做什麼。下面是讓年輕程式設計師少走彎路的14個忠告,希望能對大家有所幫助。1.不要害怕在工作中學習。只要有電腦,就可以通過電子閱讀器閱讀報紙和大多數書籍。如果你只是做好自己的本職工

成為1個技術牛的入門到(學習路線圖)

有興趣朋友也可以進一步關注公眾號“架構之道與術”, 獲取原文。 或掃描如下二維碼: 計算機領域技術更迭非常之快,內容博大精深。涉及到分散式架構,更是分支眾多,知識龐雜。很多新人在最初往往找不到頭緒,不知道從何處下手來一步步提升自己的技術水準。 本文

資料分析

      博者本人一路從統計走來,從最初的年幼無知到現在無比的後悔,多多少少也是有點心路旅程的人,再此記錄下,並且推薦一些乾貨,以供學弟學妹們參考。     首先,說說統計學的就業趨勢,筆者親身體驗,近年來難度以指數形勢增加,因為隨著機器學習以及人工智慧的告訴發展,湧現一大

避免五大誤區丨新手資料科學家

**作者 Jan Zawadzki 編譯 Mika 本文為 CDA 資料分析師原創作品,轉載需授權** 你為成為資料科學家做了充分的準備,但實際的工作將於你的預期大不相同。 你為成為資料科學家做好了充分的準備。你參加Kaggle比賽,看了大量的Cour

Java碼農~基本資料型別&運算子&流程控制-分支&三目

基本資料型別    首先先說下位元組:位元組(byte)是計算機資訊科技用於計量儲存容量的一種計量單位    通常情況下一位元組由8個二進位制位表示 0000 0000;一個位元組的取值範圍為-128~127.大家有興趣的可以瞭解下為什麼是-128~127.    一個Byt

CSS

模式 ant 表格 weight mil 比較 標題 根據 amp 下面主要引用http://www.cnblogs.com/wangfupeng1988/tag/css知多少/ CSS進階筆記: 一、學習CSS的三個突破點 1.瀏覽器如何加載和解析CSS——CSS的5個來

(基礎篇) - 011 arduino api基礎手冊

異或 change 可用 算術運算符 chan 程序結構 換算 是否 關閉 arduino 函數 api 程序結構 在Arduino中, 標準的程序入口main函數在內部被定義, 用戶只需要關心以下兩個函數:void setup()void loop()setup() 函數

(基礎篇) - 008 SPI數據傳輸(庫函數方法)

ria att clockd == bus 屏蔽 attach serial out 主機端: 1 /********************************* 2 代碼功能:SPI數據傳輸(主機端) 3 引腳說明: 4 SS/CS:片選(高電平屏

(基礎篇) - 009 通過底層AVR方法實現SPI數據傳輸

lean oop and return false 進階 from setup pie 主機端: /********************************* 代碼功能:通過底層AVR方法實現SPI數據傳輸(主機端) 創作時間:2016*10*17 使用資源:

(基礎篇) - 007 脈沖寬度測量

style 函數 long 最大 void serial 作者 println 電平 1 /********************************* 2 代碼功能:Pulse脈沖寬度測量 3 使用函數: 4 pulseIn(引腳號,脈沖響應電平,

(中級篇) - 018 基於arduino的簡易版智能衣架

檢驗 dig cloc 布線 pin on() -- mage 根據 一. 設備及要求 目的:制作一個可以自動根據事實的天氣的狀況進行對衣架上的衣服進行晾曬。 基礎裝置:可伸縮的晾衣架。 開發環境:Arduino1. 8.1 主控板:Arduino UNO 動力裝置:

【SSH】Hibernate映射——一對一單向關聯映射(五)

技術 iyu 標識 tails for sso 3.0 sdn 例如 【SSH進階之路】Hibernate基本原理(一) ,小編介紹了Hibernate的基本原理以及它的核心,採用對象化的思維操作關系型數據庫。 【SSH進階之路】Hibernate搭建開發環境+簡單實例

【SSH】Struts + Spring + Hibernate 開端(一)

height 一段 ioc 效率 陽光大道 面向對象的思想 text ase 們的 Long Long ago。就聽說過SSH。起初還以為是一個東東,詳細內容更是不詳,總認為高端大氣上檔次,經過學習之後才發現,不不過高大上,更是低調奢華有內涵,經過一段時間的

2017PHP程序員的

通信協議 thinkphp nmp 圖形圖像 update gin io模型 應用場景 單例 又是一年畢業季,可能會有好多畢業生即將進入開發這個圈子,踏上碼農這個不歸路。根據這些年在開發圈子總結的LNMP程序猿發展軌跡,結合個人經驗體會,總結出很多程序員對未來的迷漫,特別對

樹莓派 (012) - 關於Raspberry Pi樹莓派無線網卡配置

linu names interface 文件內容 動態ip down run 表示 無線網絡 Raspberry Pi樹莓派無線網卡配置[多重方法備選] 要想讓樹莓派方便操作,肯定需要配置無線網卡,這樣可以大大增強樹莓派的移動性和便利性,其實配置無線網卡基本就是和普通li

【SSH】Hibernate基本映射(三)

tor res 主動 tran clas oid 支持包 lose 包括 【SSH進階之路】Hibernate基本原理(一) ,小編介紹了Hibernate的基本原理以及它的核心。採用對象化的思維操作關系型數據庫。 【SSH進階之路】Hibernate搭建開發環境+簡單