大資料技術學習筆記之Hadoop框架基礎4-MapReduceshuffer過程詳解及zookeeper框架學習
阿新 • • 發佈:2018-12-07
一、MapReduce Shuffle
-》MapReduce執行五個階段
input
fileinputformat.setinputpaths(job,new Path(args[0]))
預設:輸入key是行的偏移量,value是行的內容
job.setinputFormatClass(Textinputformat.class)
map:分片處理,實現map方法
hadoop 1
hadoop 1
hive 1
spark 1
hbase 1
shuffle :
分割槽
排序
分組
hadoop ,{1,1}
hbase ,{1}
hive,{1}
spark,{1}
reduce:合併處理,實現reduce方法
hadoop 2
hbase 1
hive 1
spark 1
output:同input
-》shuffle的功能:
-》分割槽:決定當前的key交給哪個reduce進行處理
預設:按照key的hash值取餘的方式進去區分
reduce的個數=結果檔案的個數
-》排序:預設按照key的字典順序進行排序
-》分組:預設按照key進行分割槽
-》流程
-》input
256M檔案 =》 block1、block2 =》 keyvalue =》 split1,split2
-》map
split1 => map task1 => 呼叫map方法 =》 輸出當前的keyvalue
split2 => map task2
-》shuffle過程
-》第一種版本:
-》map端的shuffle
-》緩衝區:map的輸出會進入一個環形緩衝區100M(記憶體)
-》分割槽:根據hash值對reduce的個數取餘,為keyvalue打標籤
-》對每一條keyvalue進行分割槽
hadoop 1 --- reduce1
hive 1 --- reduce2
hbase 1 --- reduce1
hadoop 1 --- reduce1
spark 1 --- reduce2
-》對每個分割槽的資料會進行排序
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hive 1 --- reduce2
spark 1 --- reduce2
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hive 1 --- reduce2
spark 1 --- reduce2
-》溢寫:當環形緩衝區儲存達到80%,開始將緩衝區中的資料溢寫
到磁碟,變成多個小檔案
-》合併:將每個小檔案合併到一起
-》對每個分割槽的資料進行排序
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
hive 1 --- reduce2
hive 1 --- reduce2
spark 1 --- reduce2
spark 1 --- reduce2
-》reduce端的shuffle
-》reduce1和reduce2分別到每個map shuffle的輸出中去取
取屬於自己分割槽的資料
-》reduce1到map task1和map task2中取屬於自己分割槽的資料
map task1 shuffle :
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
map task2 shuffle :
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
-》合併
-》將獲取到的所有資料進行合併
-》對所有資料進行排序
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
-》分組:
hadoop ,{1,1,1,1,1,1,1,1}
hbase ,{1,1,1,1}
-》第二種版本
-》spill(溢寫)
-》分割槽
-》排序
-》spill
-》merge(合併)
-》合併
-》排序
-》merge(合併)
-》合併
-》排序
-》分組
-》reduce
-》output
-》程式中設定shuffle過程
//shuffle
//partition
job.setPartitionerClass(null);
//sort
job.setSortComparatorClass(null);
//group
job.setGroupingComparatorClass(null);
-》shuffle的優化
-》combiner:相當於Mapper端的聚合reduce
一般情況下,combiner的類與reduce的類是同一個類
不是所有的MapReduce程式都可以使用、
map:
hadoop 1
hadoop 1
hive 1
hbase 1
hive 1
shuffle:
hadoop {1,1}
hbase {1}
hive {1,1}
reduce:
hadoop 2
hbase 1
hive 2
map:
hadoop 1
hadoop 1
hive 1
hbase 1
hive 1
一個maptask執行完的shuffle:combiner
hadoop {2}
hbase {1}
hive {2}
整個shuffle結束
hadoop {2,2,1} => {1,1,1,1,1}
hbase {1,2}
hive {2,3}
reduce:
hadoop 5
hbase 3
hive 5
//set combiner class
job.setCombinerClass(null);
一般情況下,combiner的類與reduce的類是同一個類
當map的輸出型別與reduce的輸出型別一致時,可以使用
map 輸出 = combiner 的輸入
combiner = reduce
combiner輸出 = reduce的輸入
-》compress:壓縮
-》map處理資料交給shuffle
map端shuffle處理以後的資料可以配置壓縮
mapreduce.map.output.compress=true
mapreduce.map.output.compress.codec
-》reduce讀取資料處理以後儲存輸出也可以配置壓縮
輸出結果壓縮
mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DefaultCodec:不壓縮
org.apache.hadoop.io.compress.SnappyCodec:Snappy壓縮
org.apache.hadoop.io.compress.lz4Codec:lz4壓縮
-》程式碼中設定壓縮
conf.set("mapreduce.output.fileoutputformat.compress","true");
conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
-》job處理任務執行過程
-》一般DataNode與NodeManager在同一臺機器
-》處理任務時,優先處理自己機器上的資料塊
-》hadoop執行時配置
-》程式碼開發中:configuration物件中
conf.set("fs.defaultFS","hdfs://hostname:8020");
-》default配置檔案:hadoop的預設配置檔案
core-default.xml
hdfs-default.xml
mapred-default.xml
yarn-default.xml
-》site配置檔案:使用者自定義配置檔案
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
-》整個hadoop在執行過程中:
-》載入所有預設配置選項
-》載入使用者自定義配置
使用者自定義配置替換預設配置
三、分散式叢集部署
-》虛擬機器:三臺
-》記憶體:8G
-》主節點:2.5G
-》從節點:1.5G
-》叢集劃分:
NameNode,DataNode,ResourceManager,NodeManager,JobhistoryServer
Node1:DataNode NodeManager NameNode ResourceManager JobhistoryServer
Node2:DataNode NodeManager
Node3:DataNode NodeManager
-》部署過程
-》Linux部署
-》每臺網絡:防火牆、selinux、ip
Node1:192.168.134.221
Node2:192.168.134.222
Node3:192.168.134.223
-》主機名的修改
-》臨時修改:hostname bigdata-training02.hpsk.com
-》永久修改:vim /etc/sysconfig/network
-》每臺本地域名解析/etc/hosts
192.168.134.221 bigdata-training01.hpsk.com
192.168.134.222 bigdata-training02.hpsk.com
192.168.134.223 bigdata-training03.hpsk.com
-》免金鑰登入:每臺機器之間可以互相登入
ssh-keygen -t rsa
ssh-copy-id bigdata-training01.hpsk.com
ssh-copy-id bigdata-training02.hpsk.com
ssh-copy-id bigdata-training03.hpsk.com
-》ntp時間同步
-》配置ntp:有延遲,十分鐘左右才開始同步成功
server機器:Node1 -》 時鐘伺服器進行同步
Node1:
sudo vim /etc/ntp.conf
--允許誰同步我
restrict 192.168.134.0 mask 255.255. 255.0 nomodify notrap
--我跟誰同步
server 202.112.10.36
--取消註釋
server 127.127.1.0 # local clock
fudge 127.127.1.0 stratum 10
-》啟動ntp服務
sudo service ntpd start
sudo chkconfig ntpd on
-》檢視同步狀態
ntpstat
client機器:Node2,Node3
server 192.168.134.221
-》第一次手動同步
ntpdate –u 192.168.134.221
-》啟動服務
sudo service ntpd start
sudo chkconfig ntpd on
-》準備方法:
sudo date -s "2018-03-20 16:07:00"
sudo vim /etc/ntp.conf
-》jdk
-》Hadoop部署
-》在1上面解壓配置hadoop
-》env:JAVA_HOME
-》core-site
hdfs地址
hadoop臨時目錄
-》hdfs-site
副本數 3
許可權關閉
-》mapred-site
執行框架在yarn
jobhistoryserver
-》yarn-site
mapreduce執行方式
resourcemanager的地址
日誌聚集
-》slaves
-》將hadoop分發到其他兩臺機器
1 to 2 :
scp -r hadoop-2.7.3 [email protected]:/opt/modules/
3 from 1:
scp -r [email protected]:/opt/modules/hadoop-2.7.3 /opt/modules/
-》在1主節點上格式化
bin/hdfs namenode -format
-》啟動測試
-》MapReduce執行五個階段
input
fileinputformat.setinputpaths(job,new Path(args[0]))
預設:輸入key是行的偏移量,value是行的內容
job.setinputFormatClass(Textinputformat.class)
map:分片處理,實現map方法
hadoop 1
hadoop 1
hive 1
spark 1
hbase 1
shuffle :
分割槽
排序
分組
hadoop ,{1,1}
hbase ,{1}
hive,{1}
spark,{1}
reduce:合併處理,實現reduce方法
hadoop 2
hbase 1
hive 1
spark 1
output:同input
-》shuffle的功能:
-》分割槽:決定當前的key交給哪個reduce進行處理
預設:按照key的hash值取餘的方式進去區分
reduce的個數=結果檔案的個數
-》排序:預設按照key的字典順序進行排序
-》分組:預設按照key進行分割槽
-》流程
-》input
256M檔案 =》 block1、block2 =》 keyvalue =》 split1,split2
-》map
split1 => map task1 => 呼叫map方法 =》 輸出當前的keyvalue
split2 => map task2
-》shuffle過程
-》第一種版本:
-》map端的shuffle
-》緩衝區:map的輸出會進入一個環形緩衝區100M(記憶體)
-》分割槽:根據hash值對reduce的個數取餘,為keyvalue打標籤
-》對每一條keyvalue進行分割槽
hadoop 1 --- reduce1
hive 1 --- reduce2
hbase 1 --- reduce1
hadoop 1 --- reduce1
spark 1 --- reduce2
-》對每個分割槽的資料會進行排序
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hive 1 --- reduce2
spark 1 --- reduce2
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hive 1 --- reduce2
spark 1 --- reduce2
-》溢寫:當環形緩衝區儲存達到80%,開始將緩衝區中的資料溢寫
到磁碟,變成多個小檔案
-》合併:將每個小檔案合併到一起
-》對每個分割槽的資料進行排序
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
hive 1 --- reduce2
hive 1 --- reduce2
spark 1 --- reduce2
spark 1 --- reduce2
-》reduce端的shuffle
-》reduce1和reduce2分別到每個map shuffle的輸出中去取
取屬於自己分割槽的資料
-》reduce1到map task1和map task2中取屬於自己分割槽的資料
map task1 shuffle :
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
map task2 shuffle :
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
-》合併
-》將獲取到的所有資料進行合併
-》對所有資料進行排序
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hadoop 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
hbase 1 --- reduce1
-》分組:
hadoop ,{1,1,1,1,1,1,1,1}
hbase ,{1,1,1,1}
-》第二種版本
-》spill(溢寫)
-》分割槽
-》排序
-》spill
-》merge(合併)
-》合併
-》排序
-》merge(合併)
-》合併
-》排序
-》分組
-》reduce
-》output
-》程式中設定shuffle過程
//shuffle
//partition
job.setPartitionerClass(null);
//sort
job.setSortComparatorClass(null);
//group
job.setGroupingComparatorClass(null);
-》shuffle的優化
-》combiner:相當於Mapper端的聚合reduce
一般情況下,combiner的類與reduce的類是同一個類
不是所有的MapReduce程式都可以使用、
map:
hadoop 1
hadoop 1
hive 1
hbase 1
hive 1
shuffle:
hadoop {1,1}
hbase {1}
hive {1,1}
reduce:
hadoop 2
hbase 1
hive 2
map:
hadoop 1
hadoop 1
hive 1
hbase 1
hive 1
一個maptask執行完的shuffle:combiner
hadoop {2}
hbase {1}
hive {2}
整個shuffle結束
hadoop {2,2,1} => {1,1,1,1,1}
hbase {1,2}
hive {2,3}
reduce:
hadoop 5
hbase 3
hive 5
//set combiner class
job.setCombinerClass(null);
一般情況下,combiner的類與reduce的類是同一個類
當map的輸出型別與reduce的輸出型別一致時,可以使用
map 輸出 = combiner 的輸入
combiner = reduce
combiner輸出 = reduce的輸入
-》compress:壓縮
-》map處理資料交給shuffle
map端shuffle處理以後的資料可以配置壓縮
mapreduce.map.output.compress=true
mapreduce.map.output.compress.codec
-》reduce讀取資料處理以後儲存輸出也可以配置壓縮
輸出結果壓縮
mapreduce.output.fileoutputformat.compress=true
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DefaultCodec:不壓縮
org.apache.hadoop.io.compress.SnappyCodec:Snappy壓縮
org.apache.hadoop.io.compress.lz4Codec:lz4壓縮
-》程式碼中設定壓縮
conf.set("mapreduce.output.fileoutputformat.compress","true");
conf.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
-》job處理任務執行過程
-》一般DataNode與NodeManager在同一臺機器
-》處理任務時,優先處理自己機器上的資料塊
-》hadoop執行時配置
-》程式碼開發中:configuration物件中
conf.set("fs.defaultFS","hdfs://hostname:8020");
-》default配置檔案:hadoop的預設配置檔案
core-default.xml
hdfs-default.xml
mapred-default.xml
yarn-default.xml
-》site配置檔案:使用者自定義配置檔案
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
-》整個hadoop在執行過程中:
-》載入所有預設配置選項
-》載入使用者自定義配置
使用者自定義配置替換預設配置
三、分散式叢集部署
-》虛擬機器:三臺
-》記憶體:8G
-》主節點:2.5G
-》從節點:1.5G
-》叢集劃分:
NameNode,DataNode,ResourceManager,NodeManager,JobhistoryServer
Node1:DataNode NodeManager NameNode ResourceManager JobhistoryServer
Node2:DataNode NodeManager
Node3:DataNode NodeManager
-》部署過程
-》Linux部署
-》每臺網絡:防火牆、selinux、ip
Node1:192.168.134.221
Node2:192.168.134.222
Node3:192.168.134.223
-》主機名的修改
-》臨時修改:hostname bigdata-training02.hpsk.com
-》永久修改:vim /etc/sysconfig/network
-》每臺本地域名解析/etc/hosts
192.168.134.221 bigdata-training01.hpsk.com
192.168.134.222 bigdata-training02.hpsk.com
192.168.134.223 bigdata-training03.hpsk.com
-》免金鑰登入:每臺機器之間可以互相登入
ssh-keygen -t rsa
ssh-copy-id bigdata-training01.hpsk.com
ssh-copy-id bigdata-training02.hpsk.com
ssh-copy-id bigdata-training03.hpsk.com
-》ntp時間同步
-》配置ntp:有延遲,十分鐘左右才開始同步成功
server機器:Node1 -》 時鐘伺服器進行同步
Node1:
sudo vim /etc/ntp.conf
--允許誰同步我
restrict 192.168.134.0 mask 255.255. 255.0 nomodify notrap
--我跟誰同步
server 202.112.10.36
--取消註釋
server 127.127.1.0 # local clock
fudge 127.127.1.0 stratum 10
-》啟動ntp服務
sudo service ntpd start
sudo chkconfig ntpd on
-》檢視同步狀態
ntpstat
client機器:Node2,Node3
server 192.168.134.221
-》第一次手動同步
ntpdate –u 192.168.134.221
-》啟動服務
sudo service ntpd start
sudo chkconfig ntpd on
-》準備方法:
sudo date -s "2018-03-20 16:07:00"
sudo vim /etc/ntp.conf
-》jdk
-》Hadoop部署
-》在1上面解壓配置hadoop
-》env:JAVA_HOME
-》core-site
hdfs地址
hadoop臨時目錄
-》hdfs-site
副本數 3
許可權關閉
-》mapred-site
執行框架在yarn
jobhistoryserver
-》yarn-site
mapreduce執行方式
resourcemanager的地址
日誌聚集
-》slaves
-》將hadoop分發到其他兩臺機器
1 to 2 :
scp -r hadoop-2.7.3
3 from 1:
scp -r [email protected]:/opt/modules/hadoop-2.7.3 /opt/modules/
-》在1主節點上格式化
bin/hdfs namenode -format
-》啟動測試