1. 程式人生 > >Hadoop 框架基礎(四)

Hadoop 框架基礎(四)

釋放 top gem orien 系統啟動 -s blog 希望 記錄

** Hadoop 框架基礎(四)

上一節雖然大概了解了一下 mapreduce,徒手抓了海膽,不對,徒手寫了 mapreduce 代碼,也運行了出來。但是沒有做更深入的理解和探討。

那麽……

本節目標:

* 深入了解 mapreduce 過程

* 成功部署 Hadoop 集群

** mapreduce 原理

想要了解 mapreduce 原理,我們必須搞清楚處理數據時的每一個重要階段,首先,貼上一張官方的圖:

技術分享圖片

我們依次討論每一個過程以及該過程對應的作用:

我先在這裏假設一個情景,我現在有一個 10G 大小的 words.txt,裏面存放的是 N 多個英文單詞。

這 10G 大小的文件分為若幹個 128M 的文件塊 block 分布存儲於若幹個服務器。

好,現在我要統計這 10G 文件中的單詞出現頻率。

** input split

一個 split 會對應一個 map 任務。

一般來講,split 的個數和 block 的個數相同,當然也可以多個 block 屬於同一個 split,但是後者會產生大量的網絡和磁盤 IO,原因在於一個 split 對應一個 map 任務,一個 map 任務肯定跑在某一臺機器上,如果某個 split 所包含的多個 block 分布於不同的機器,首先需要做的操作就是把其他機器的 block 拷貝到運行 map 任務的機器上,這會耗費一定時間,所以,默認情況下,一個 block 對應一個 split,源碼中設定如下:

mapreduce.input.fileinputformat.split.minsize == 0

mapreduce.input.fileinputformat.split.maxsize == 10000

splitSize=max(minSize,min(maxSize, blockSize)),此為默認 split 大小

如果要修改,則如下方式:

recordSize 表示一個記錄的大小,分塊要保證數據的完整性,所以:

int blockSize = Integer.parseInt(x); //x 表示你希望的 split 大小

int splitSize = blockSize / recordSize * recordSize;

conf.setLong("mapred.max.split.size",splitSize);

conf.setLong("mapred.min.split.size",splitSize);

** map

此時輸入的到 map 中的數據形式大致為:

<0, cat one hadoop element...> ---> 調用一次 map

<30, dog two one hadoop....> ---> 調用一次 map

……

省略號表示後邊還有,其中 0,30 表示的是偏移量,每次從當前 split 中讀取 1 行數據,比如第一次讀取第一行,偏移量為 0~29,第二次是第二行數據,偏移量是 30~?,以此類推。每次讀取都執行一次 map 任務,並調用一次 map 方法。map 階段結束,輸出結果形式大致為:

<cat , 1> <one, 1> <hadoop, 1> <element, 1> …… 等等


從此進入 shuffle 階段

** buffer in memory

這是一個狀態描述,表明此刻在內存中開始操作,buffer 在這裏是內存中的一個環形數組。

之所以用環形數組來存放數據,是因為這樣可以最大化的利用存儲空間。

這個環形數組中存放數據分為兩個類別:

1、元數據區(Kvmeta):

裏面存放的每組數據都包含:

** value 的起始位置

** key 的起始位置

** partition 值

** value 的長度

2、數據區(Kvbuffer):

裏面存放的每組數據都包含:

** key 值,例如 <cat ,1> 中的 cat

** value 值,例如 <cat, 1> 中的 1

註意:

* 以上兩個區域的分界點為 0,即 0 以上存儲數據區內容,0 以下存儲元數據區的內容。

* 這個環形 buffer 雖然實際為一個字節數組,但抽象為一個 IntBuffer,無論哪個區域中的數據,每組數據中的每個元素都占用 4 個字節,也就是每組中的每個元素的存儲,數組下標都將移動 4 位 (因為一個 int 為 4 個字節)。

* partition

分區的意義在於把一系列相似的單詞分為同一個區。即單詞歸類處理,這樣不同機器上的不同 map 任務輸出的單詞可以依據分區遞交給相同的 reduce 做處理。

註意:

* 相關類: HashPartitioner

* 這裏的 “相似”,指的是:鍵(此例中為單詞)的 hash 值在某一個範圍內

* sort

map 排序階段,在 buffer 中把數據按照 partion 和 key 兩個關鍵字做升序排序,這個排序只需要移動 “元數據區” 中的每組數據順序即可。排序結果是 “元數據區” 中的每組數據按照 partition 分區聚集在一起,同一個 partition 分區內的 key 按照字典順序排序。

* combine(可選)

結合階段,可以在 map 階段簡化數據輸出,減少後邊 spill 溢寫過程中,spill 溢寫文件的大小,例如:可以將 <cat, 1> <cat, 1 > 這樣的數據在 map 階段合並為 < cat, 2 > 這樣的數據作為 map 輸出,默認沒有開啟。

* spill

溢寫階段,當內存中的環形存儲結構占用率達到一定程度(默認占用 80% 時,則開始溢寫),則將環形數據區中的所有內容,刷入到當前本地硬盤能夠存的下這些數據的目錄中,以使內容騰出空間供後邊繼續使用。

相同的 partition 分區的數據寫入到同一個文件中,類似:“spill10.out”,“spill11.out”這樣的文件,每一個 partition 分區所產生的文件的存放位置和一些相關信息,存放在另一個 “元數據” 文件中,類似“spill10.out.index”,“spill11.out.index”(註意,這個元數據文件和剛才說的元數據區不是一碼事)。

這個元數據文件包含:

** 起始位置

** 原始數據長度

** 壓縮之後的數據長度

** crc32 的校驗數據

該文件的作用是:標記某個 partition 對應的文件在哪個目錄,哪個索引中存放。

註意:

* spill10.out.index 這樣的文件不一定會產生,如果內存中放得下(針對這個文件數據的存放,內存只提供 1M 空間可用),就放在內存中。

* 內存占用達到 80%,開始溢寫,那麽此時 map 任務還在進行,還在往內存裏添加數據,新的數據的起始點(0 點)為剩余空間的中間部分,然後數據區和元數據區分別往兩邊遞增即可,溢寫後釋放內存後也不必改變什麽,繼續寫入即可。

** map merge

map 融合階段,將溢寫階段產生的多個文件,根據所屬分區,把具有相同 partition 分區的 “元數據(從 spill10.out.index 這樣的文件中讀取的)” 放置於同一個 segment 列表中,最後根據 segment 列表,把數據從 spill 溢寫出來的文件一個一個中讀取出來,寫入到 file.out 文件中,同時將這一批段的數據索引(元數據分區等)寫入到 file.out.index 文件中,最終生成兩個文件,file.out 和 file.out.index,其中包含了多段數據,每段數據對應一個分區。

** compress (可選)

map 壓縮階段,對 map merge 階段產生的文件進行壓縮處理,以便於在後邊的網絡傳輸過程中減少網絡 IO 壓力,提升效率。

至此,map 端的 shuffle 過程結束。

** sort merge

reduce 任務會根據分區數據段拉取每個 map 任務產生的數據,拉取後,因為可能涉及到多個 map 產生的數據,所以要進行排序,一邊 copy 一邊排序,最後把多個 map 產生的具有相同分區的數據合並為一個分區數據段,這個 merge 過程和 map 的 merge 算法過程一樣。

在此完成 shuffle 階段


** reduce

對於本例而言,此時產生的某個分區中的某個單詞形式大概如下:

<cat, [1, 1, 1, 1, 1, 1]>,在調用 reduce 方法時,進行 values 各個元素的疊加操作即可。

** output

統計完成後,輸出數據到文件目錄,文件格式為 part-r-00000 這樣形式的文件,存放於 HDFS 中。文件中 key 和 value 默認的分隔符為:\t

** Hadoop 集群部署

之前我們在 yarn 框架中運行 mapreduce 任務,或者操作 hdfs,其中的各種節點都是運行在一臺虛擬機上的,現在我們要將 hadoop 部署在一個多臺虛擬機構成的完全分布式集群中(全部都在一個機器節點上的叫做偽分布式,比如之前的方式)。部署前,我們先勾畫一下各個節點的部署結構,如下圖所示:

技術分享圖片

描述:

3 臺機器共有進程:HDFS 的 datanode,yarn 的 nodemanager

其中,HDFS 的 namenode 開在 z01 這臺機器上,secondarynamenode 開在 z03 這臺機器上

YARN 的 resourcemanager 開在 z02 這臺機器上。

註:SecondaryNameNode 是用來協助 NameNode 整合 fsimage 和 edits 的。

一、準備系統環境

1、修改主機名

# vi /etc/hostname

2、主機名和 ip 地址的映射

# vi /etc/hosts,我的機器修改如圖,註意,三臺機器都要這麽設置:

技術分享圖片

3、關閉防火墻和 selinux

請跳轉至 Linux 基礎 04 查看相關方法。

4、創建普通用戶

# useradd 用戶名,如果已經有普通用戶,則無需再次創建

# echo 666666 | passwd --stdin 用戶名

5、配置靜態 IP 和 DNS

請參看 Linux 基礎 01 內容

6、把後面兩個虛擬機的系統啟動級別改成 “字符模式”(就是沒有桌面,這樣可以減少虛擬機負擔,加速系統啟動和運行)

# cat /etc/inittab,內容如圖所示:

技術分享圖片

根據文件中的提示,可以使用命令:

systemctl set-default multi-user.target,來設置無界面啟動 linux

systemctl set-default graphical.target,來設置有界面啟動 linux

7、卸載服務器 JDK

請參看 Linux 基礎 02 中的內容

二、配置 NTP 時間服務器

對於我們當前這種案例,主要目標是把 z01 這臺服務器設置為時間服務器,剩下的 z02,z03 這兩臺機器同步 z01 的時間,我們需要這樣做的原因是因為,整個集群架構中的時間,要保持一致。

** 檢查當前系統時區,使用命令:

# date -R,如圖:

技術分享圖片

註意這裏,如果顯示的時區不是 + 0800,你可以刪除 localtime 文件夾後,再關聯一個正確時區的鏈接過去,命令如下:

# rm -rf /etc/localtime

# ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

** 同步時間

# ntpdate pool.ntp.org

** 檢查軟件包

查看 ntp 軟件包是否已安裝,使用命令:

# rpm -qa | grep ntp,如圖,紅框中的內容:

技術分享圖片

如果沒有紅框中的內容,則可以使用命令:

# yum -y install ntp,來進行安裝

** 修改 ntp 配置文件

# vi /etc/ntp.conf

去掉下面這行前面的# , 並把網段修改成自己的網段:

restrict 192.168.122.0 mask 255.255.255.0 nomodify notrap

註釋掉以下幾行:

#server 0.centos.pool.ntp.org

#server 1.centos.pool.ntp.org

#server 2.centos.pool.ntp.org

把下面兩行前面的 #號去掉, 如果沒有這兩行內容, 需要手動添加

server 127.127.1.0 # local clock

fudge 127.127.1.0 stratum 10

最後,如圖所示:

技術分享圖片

** 重啟 ntp 服務

# systemctl start ntpd.service,註意,如果是 centOS7 以下的版本,使用命令:service ntpd start

# systemctl enable ntpd.service,註意,如果是 centOS7 以下的版本,使用命令:chkconfig ntpd on

** z03,z03 去同步 z01 這臺時間服務器時間

首先需要關閉這兩臺計算機的 ntp 服務

# systemctl stop ntpd.service,centOS7 以下,則:service ntpd stop

# systemctl disable ntpd.service,centOS7 以下,則:chkconfig ntpd off

# systemctl status ntpd,查看 ntp 服務狀態

# pgrep ntpd,查看 ntp 服務進程 id

同步第一臺服務器 z01 的時間:

# ntpdate z01,如圖:

技術分享圖片

** 制定計劃任務, 周期性同步時間

# crontab -e

*/10 * * * * /usr/sbin/ntpdate z01,如圖所示:

技術分享圖片

重啟定時任務:

# systemctl restart crond.service,centOS7 以下使用:service crond restart,z03 這臺機器的配置同理

三、配置無密鑰登錄

配置 hadoop 集群,首先需要配置集群中的各個主機的 ssh 無密鑰訪問

在 z01 上,通過如下命令,生成一對公私鑰對

$ ssh-keygen -t rsa,一頓回車操作,這條命令執行完畢後(註意使用普通用戶執行該命令),會在 / home/z/.ssh / 目錄下生成兩個文件:id_rsa 和 id_rsa.pub,如圖所示:

技術分享圖片 技術分享圖片

生成之後呢,把 z01 生成的公鑰拷貝給 z01,z02,z03 這三臺機器,對,沒錯,包含當前機器。

$ ssh-copy-id z01

$ ssh-copy-id z02

$ ssh-copy-id z03

完成後,z02 機器如圖(z03 同理):

技術分享圖片

以上完成了 z01 生成私鑰,公鑰並把公鑰拷貝給 z01,z02,z03 三臺機器的過程,z02,z03 這兩臺機器也需要進行如上操作。全部完成後,我們可以在任意一臺機器上,無密鑰的連接到另外一臺機器,比如,我們在 z01 連接到 z02 這臺機器,使用命令:

$ ssh z02,如圖:

技術分享圖片

這樣就成功的在 z01 的機器登錄到 z02 機器了。

四、安裝配置 JDK

使用 root 用戶,在後面兩臺機器上創建 / opt/modules 文件夾,並使該文件夾的所屬改為普通用戶。

接著便可以使用遠程命令 scp,把已經在 z01 中安裝好的 jdk 目錄拷貝給另外兩臺機器。

$ scp -r /opt/modules/jdk1.7.0_67/ z02:/opt/modules/

$ scp -r /opt/modules/jdk1.7.0_67/ z03:/opt/modules/

註意中間有空格分開。配置完成後,記得去 z02,z03 修改 / etc/profile 環境變量

五、安裝配置 Hadoop

** 首先,需要先刪除 z01 中的 / opt/modules/hadoop-2.5.0/data 目錄,執行命令:

$ rm -rf /opt/modules/hadoop-2.5.0/data

** 在如下文件中,修改 JAVA_HOME

hadoop-env.sh yarn-env.sh mapred-env.sh

export JAVA_HOME=/opt/modules/jdk1.8.0_121

** 修改 HDFS 默認地址、HDFS 臨時存儲路徑

涉及文件:core-site.xml

fs.defaultFS:hdfs://z01:8020

hadoop.tmp.dir:/opt/modules/hadoop-2.5.0/data

如圖:

技術分享圖片

** 聲明哪些服務器是 datanode

涉及文件:slaves

z01

z02

z03

如圖:

技術分享圖片

** 修改數據存放的副本數,SecondaryNameNode 節點地址

涉及文件:hdfs-site.xml

dfs.replication:3

dfs.namenode.secondary.http-address:z03:50090

dfs.namenode.http-address:z01:50070

dfs.permissions.enabled:false

如圖:

技術分享圖片

**resourcemanager 節點配置,以及一些其他配置

涉及文件:yarn-site.xml

yarn.resourcemanager.hostname:z02

yarn.nodemanager.aux-services:mapreduce_shuffle

yarn.log-aggregation-enable:true

yarn.log-aggregation.retain-seconds:86400

如圖:

技術分享圖片

** jobhistory 服務以及其他設置

涉及文件:mapred-site.xml

mapreduce.framework.name:yarn

mapreduce.jobhistory.address:z01:10020

mapreduce.jobhistory.webapp.address:z01:19888

如圖:

技術分享圖片

** 配置好後,拷貝 hadoop 安裝目錄給其他服務器

$ rm -rf /opt/modules/hadoop-2.5.0/share/doc/,刪除該文檔目錄,以減少遠程拷貝的體積

$ scp -r /opt/modules/hadoop-2.5.0/ z02:/opt/modules/

$ scp -r/opt/modules/ hadoop-2.5.0/ z03:/opt/modules/

全部搞定後,接下來我們就可以啟動這個分布式系統了

六、啟動 Hadoop

** 在 z01 需要先格式化 hdfs 的 namenode:

$ bin/hdfs namenode -format

** 使用 start 的腳本啟動集群中所有的 hdfs 服務,包含 namenode 和 datanode 節點

$ sbin/start-dfs.sh

** 在 z02 中啟動 yarn 服務,包含 resourcemanager 和 nodemanager,註意,如果 resourcemanger 和 namenode 服務不在同一臺機器上,那麽啟動 resourcemanager 服務必須在所在的機器啟動,這裏參看我們之前設定的集群配置圖,所以需要在 z02 機器上執行如下命令:

$ sbin/start-yarn.sh

啟動完成後,分別查看 z01,z02,z03 機器的 jps,如下圖:

z01:

技術分享圖片

z02:

技術分享圖片

z03:

技術分享圖片

在對比一下之前的集群配置圖,是符合我們的期望的。

** 總結

本節主要深入討論 mapreduce 的運算原理及過程,以及如何配置一個 hadoop 完全分布式集群。


個人微博:http://weibo.com/seal13

QQ 大數據技術交流群(廣告勿入):476966007



作者:Z盡際
鏈接:https://www.jianshu.com/p/0ad52ec23309
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請註明出處。

Hadoop 框架基礎(四)