1. 程式人生 > >拉開大變革序幕(下):分散式計算框架與大資料

拉開大變革序幕(下):分散式計算框架與大資料

因為對大資料處理的需求,使得我們不斷擴充套件計算能力,叢集計算的要求導致分散式計算框架的誕生,用廉價的叢集計算資源在短短的時間內完成以往數週甚至數月的執行等待,有人說誰掌握了龐大的資料,誰就主導了需求。雖然在十幾年間,通過過去幾十年的積澱,誕生了mapreduce,誕生了分散式檔案系統,誕生了霸主級別的Spark,不知道這是不是分散式計算框架的終點,如果還有下一代的處理框架,必然來自更大規模的資料,我想那個量級已經不是今天可以想象的。先研究好當前的,走得越深,看得越深,所以誕生Spark的地方是在AMPLab,而不是一家網際網路電商巨頭。

不可變基礎設施

乾貨連結

我們在本地用Eclipse Scala或者IntelliJ IDEA編寫好Spark程式後,需要對其進行測試,在測試環境下,我們部署好了執行Spark所需的 Software Stack,並特別注意各個Software的版本。那如果有人想用這個程式,在他的環境下,是不同的Software Stack,那麼程式就有可能失敗。假如我們想要在任何機器上不費力氣的部署和執行我們所開發的Spark程式,我們使用Docker將作業系統和Software Stack打成一個映象包,讓這個映象包成為一個不可變單元,那麼在任何機器上我們只要部署好這個映象的示例,所開發的Spark應用程式便可成功執行。

下面的圖出自上面第二個連結,第二個圖就表述了不可變基礎設施,第一個是傳統的DevOps環境。

這裡寫圖片描述

這裡寫圖片描述

Tachyon

Spark on Yarn

下面是阿里雲梯給出的Spark on YARN架構圖

這裡寫圖片描述

“基於YARN的Spark作業首先由客戶端生成作業資訊,提交給ResourceManager,ResourceManager在某一 NodeManager彙報時把AppMaster分配給NodeManager,NodeManager啟動 SparkAppMaster,SparkAppMaster啟動後初始化作業,然後向ResourceManager申請資源,申請到相應資源後 SparkAppMaster通過RPC讓NodeManager啟動相應的SparkExecutor,SparkExecutor向 SparkAppMaster彙報並完成相應的任務。此外,SparkClient會通過AppMaster獲取作業執行狀態。”

1) 配置Hadoop Yarn叢集時出現的問題及修復:

在每一臺機器上(master和各個slave),都要對hadoop-env.sh和yarn-env.sh兩個檔案末尾新增(export)JAVA_HOME這個環境變數(根據具體機器上JAVA_HOME的不同而不同)。
在經過

cd ~/hadoop-2.7.1     #進入hadoop目錄
bin/hadoop namenode -format     #格式化namenode
sbin/start-dfs.sh               #啟動dfs 
sbin/start-yarn.sh              #啟動yarn

之後,登入 http://master:8088,發現有slave節點是unhealthy狀態,再進行一下配置,在每臺機器(master和各個slave)上,修改yarn-site.xml檔案,新增如下:(不推薦!

name=yarn.nodemanager.disk-health-checker.enable
value=false

然後在master上stop-all.sh後,重新啟動叢集:

sbin/start-dfs.sh               #啟動dfs 
sbin/start-yarn.sh              #啟動yarn

就會發現恢復正常。

2) 配置spark的spark-env.sh時

注意master上SPARK_LOCAL_DIRS的值和各個slave上應當一樣,即spark放在各個機器的同一路徑下。

3) 目前來看在REHL 7.1上編譯成的hadoop並不能在SUSE上跑起來

4) 各種slaves檔案中不新增localhost這一項,如果不想讓master機器也成為worker參與叢集運算的話。

Hadoop 編譯

乾貨推薦

【2】安裝JAVA,設定環境變數

以 root 使用者  vi   /etc/profile 或者 vi ~/.bashrc
根據自己路徑在最後新增以下三行
#jdk
export JAVA_HOME=/usr/java/jdk1.7.0_67                               
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib        
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin       
新增之後執行   
source  /etc/profile    
或者
source  ~/.bashrc

檢查是否安裝成功 protoc –version

【4】下載原始碼編譯

編譯命令

mvn clean package -Pdist,native -DskipTests -Dtar

Hadoop 編譯出錯

我是在IBM JAVA環境下進行hadoop的編譯。列出編譯過程中的錯誤和解決方法,供大家參考。

1) Antrun

Failed to execute goal
org.apache.maven.plugins:maven-antrun-plugin:1.6:run (create-testdirs)

chown -R username parent-directory
( 如 chown -R root ../ )
mvn install -DskipTests

2) Build failed with JVM IBM JAVA on TestSecureLogins

package com.sun.security.auth.module does not exist

這是專門為在IBM JAVA環境下打的patch。

3) 經過上面兩個fix後如果很快顯示BUILD SUCCESS,並且在(假設下載的原始碼資料夾名為hadoop-release-2.7.1)hadoop-release-2.7.1/hadoop-dist/target/目錄下沒有名為hadoop-2.7.1.tar.gz的tar包,說明沒有編譯成功,返回到hadoop-release-2.7.1這個根目錄下,繼續執行:

mvn package -Pdist -DskipTests -Dtar

這之後編譯的時間明顯變長,各位在這段驚心動魄的時間裡度過吧:)

YARN叢集執行SparkPi出錯

在 yarn-cluster 模式下

 WARN hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-xxx:blk_1073741947_1123
java.io.IOException: Bad response ERROR_CHECKSUM for block BP-xxx:blk_1073741947_1123 from datanode xxxxx:50010

Exception in thread “main” java.io.IOException: All datanodes
xxxxx:50010 are bad. Aborting…
at
org.apache.hadoop.hdfs.DFSOutputStream DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1206)
at
org.apache.hadoop.hdfs.DFSOutputStream DataStreamer.processDatanodeError(DFSOutputStream.java:1004)
at
org.apache.hadoop.hdfs.DFSOutputStream DataStreamer.run(DFSOutputStream.java:548)

上述錯誤是因為IBM大型機上大小端的問題,需要一個patch
或者通過組合異構平臺解決(引入x86機器)。不過如果要讓大機作為worker進行運算就要打上patch

執行成功顯示

這裡寫圖片描述

Spark standalone master單節點執行時出錯

如果分配給spark的driver記憶體 SPARK_DRIVER_MEMORY(在spark-env.sh中設定)不足,比如只設置了1G,很有可能出現下面的錯誤,我改成20G後就可以避免。不足的話,GC會做大量的清掃工作,不僅極大的消耗CPU,而且會出現執行失敗。

16/01/24 09:59:36 WARN spark.HeartbeatReceiver: Removing executor driver with no recent heartbeats: 436450 ms exceeds timeout 120000 ms
16/01/24 09:59:40 WARN akka.AkkaRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@3570311d,BlockManagerId(driver, localhost, 54709))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
16/01/24 09:59:40 ERROR scheduler.TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 436450 ms
Exception in thread "main" 16/01/24 10:06:49 INFO storage.BlockManagerMaster: Trying to register BlockManager
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 6, localhost): ExecutorLostFailure (executor driver lost)

Spark對記憶體的消耗主要分為三部分(即取決於你的應用程式的需求):

  1. 資料集中物件的大小
  2. 訪問這些物件的記憶體消耗
  3. 垃圾回收GC的消耗

由網路或者gc引起,worker或executor沒有接收到executor或task的心跳反饋,導致 Executor&Task Lost,這時要提高 spark.network.timeout 的值,根據情況改成300(5min)或更高。

yarn-cluster模式執行出錯

ScalienDB: Designing and Implementing a Distributed Database using Paxos

由於所執行的程式需要第三方jar,而沒有引入導致。
解決:
使用 –jars 和 –archives新增應用程式所依賴的第三方jar包,如

spark-submit --class spark-helloworld --master yarn-cluster --jars scopt.jar spark-helloworld.jar

同時檢查資源分配引數的設定,以防因為資源分配不夠導致執行失敗。

MapReduce:大資料理論基石

如今的阿里飛天系統可以超越一臺超算。超大規模分散式計算浪潮不可阻擋,大資料處理剛剛開始。一些東西在達到一定規模後就會出現前所未有的難題,在理論上也許無法預見,複雜度和不可預知性在增加,技術進步就會出現。

MapReduce程式設計模型由map和reduce兩個核心函式構成。

Map:將原始輸入轉換成一個個key/value的鍵值對形式的描述,同一key下可能有多個value,然後傳給Reduce。

Reduce:將同一key下的value進行合併,讓value的集合縮小。

我們經常在linux上使用grep命令來幫我們找出一個輸出中我們想要的欄位,如果它也變成分散式的話,我們就可以利用多臺機器從龐大的輸出或檔案中找到我們想要的欄位,那麼這個map函式就是將我們想要的欄位利用grep從輸出中找出來,而reduce負責將每臺機器上這樣的grep後的輸出進行彙總就是所有的我們想要的欄位了。

下面是MapReduce論文裡的架構圖:

這裡寫圖片描述

User program會建立很多worker,這些worker中有一個是master,負責將map任務分配給哪些worker,將reduce任務分配給另外一些worker。負責map的worker會讀取原始檔案(已經被分成一份份的)進行map操作,生成的中間鍵值對結果會從記憶體中轉移到硬碟上,在master的通知下,負責reduce的worker就會知道從硬碟的某個地方讀取到這些中間結果從而進行reduce操作生成最終的output。