1. 程式人生 > >Spark學習(二)---kafka+SparkStreaming的搭建與連線

Spark學習(二)---kafka+SparkStreaming的搭建與連線

環境說明:

三臺機器(Centos 6.5):

Master 192.168.203.148

Slave1 192.168.203.149

Slave2 192.168.203.150

第一步:環境

spark環境配置: spark安裝很簡單,可以參考網上教程,說下spark的配置: 主要是${SPARK_HOME}/conf/slaves中配置如下:
Master
Slave1
Slave2
${SPARK_HOME}/conf/spark-env.sh配置如下:
export JAVA_HOME=/usr/local/java/jdk1.8.0_65
export SCALA_HOME=/usr/local/scala/scala-2.10.5
export SPARK_MASTER_IP=192.168.203.148
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

zookeeper搭建

kafka搭建:

參照_zhenwei部落格:

http://blog.csdn.net/wang_zhenwei/article/details/48346327

http://blog.csdn.net/wang_zhenwei/article/details/48357131

安裝完成,傳送訊息為:


第二步:jar包下載

由於SparkStreaming讀取kafka是依賴於其他jar包所以還要下載jar包對應分別為:

spark-streaming-kafka_2.11-0.8.2.2.jar

下載地址:http://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11/0.8.2.2

kafka_2.10-0.8.1.jar,

metrics-core-2.2.0.jar,

zkclient-0.4.jar
(下面的這三個在上面網址中搜索可以找到)

第三步:建立scala程式:

建立myKafkaStreaming.scala:

import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils


val ssc = new StreamingContext(sc,Seconds(2))
val zkQuorum = "Slave3:2181,Slave1:2181,Master:2181"
val group = "test-consumer-group"
val topics = "kafkatopic"
val numThreads = 1
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(2),2)
wordCounts.print
ssc.checkpoint("/words/checkpoint")
ssc.start
ssc.awaitTermination


第四步:執行程式

兩種方法:

第一種將上面的程式通過spark-shell形式提交執行;

${SPARK_HOME}/bin/spark-shell --jars kafka_2.10-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.5.0.jar,zkclient-0.4.jar --master  spark://192.168.203.148:7077

說明:我將對應的jar包拷到了${SPARK_HOME}中,--master這需要注意下 後面跟的ip是需要通過WebUi中檢視到的:如左上角:spark://192.168.203.148:7077,如果寫的不對可能會提示如下錯誤cluster.ClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory


上圖中可以看到我已經提交的app任務,但是slave1節點掉了,可以看到任務執行ok了。

第二種方法:把上面的檔案打成jar包通過spark-submit來進行提交,由於一直嘗試用sbt但一直下載不成功所以暫時還沒試,明天試試,或者換用maven來用。

四、新技能

如果想在spark-shell互動環境中一下子輸入很多條語句在一起執行而不是一條一條輸入。

scala> :paste
//put your code

//Then if you finish,ctrl+d組合鍵進行執行上面輸入的所有程式碼

五、遇到的其他錯誤:

spark在提交任務時,出現如下錯誤:

15/03/26 22:29:36 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 15/03/26 22:29:51 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 15/03/26 22:30:06 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 15/03/26 22:30:21 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

從警告資訊上看,初始化job時沒有獲取到任何資源;提示檢查叢集,確保workers可以被註冊並有足夠的記憶體資源。

如上問題產生的原因是多方面的,可能原因如下:

1.因為提交任務的節點不能和spark工作節點互動,因為提交完任務後提交任務節點上會起一個程序,展示任務進度,大多埠為4044,工作節點需要反饋進度給該該埠,所以如果主機名或者IP在hosts中配置不正確。所以檢查下主機名和ip是否配置正確

2.有可能是記憶體不足

  檢查記憶體

   conf.set("spark.executor.memory", "3000m")

  Make sure to set SPARK_LOCAL_IP andSPARK_MASTER_IP.

  檢視8080埠,確保一些workers保持Alive狀態,確保 some cores 是可利用的。

最後自己通過WebUi來檢視,竟然是所有節點都掉了,為什麼會這樣?待研究,於是重啟了下叢集Make it!

六、Spark配置說明

參照:http://blog.csdn.net/chenxingzhen001/article/details/11835399

一、環境變數spark-env.sh配置項
SCALA_HOME              #指向你的scala安裝路徑
MESOS_NATIVE_LIBRARY   #如果你要在Mesos上執行叢集的話
SPARK_WORKER_MEMORY  #作業可使用的記憶體容量,預設格式1000M或者 2G (預設:  所有RAM去掉給作業系統用的1 GB);每個作業獨立的記憶體空間由SPARK_MEM決定。
SPARK_JAVA_OPTS   #新增JVM選項。你可以通過-D來獲取任何系統屬性 eg: SPARK_JAVA_OPTS+="-Dspark.kryoserializer.buffer.mb=1024"
SPARK_MEM     #設定每個節點所能使用的記憶體總量。他們應該和JVM‘s -Xmx選項的格式保持一致(e.g.300m或1g)。注意:這個選項將很快被棄用支援系統屬性spark.executor.memory,所以我們推薦將它使用在新程式碼中。
SPARK_DAEMON_MEMORY   #分配給Spark master和worker守護程序的記憶體空間(預設512M)
SPARK_DAEMON_JAVA_OPTS  #Spark master和worker守護程序的JVM選項(預設:none)
二、System Properties
Property NameDefaultMeaning
spark.executor.memory512mAmount of memory to use per executor process, in the same format as JVM memory strings (e.g. `512m`, `2g`).
spark.akka.frameSize10mMaximum message size to allow in "control plane" communication (for serialized tasks and task results), in MB. Increase this if your tasks need to send back large results to the driver (e.g. using collect() on a large dataset).
spark.default.parallelism8Default number of tasks to use for distributed shuffle operations (groupByKey, reduceByKey, etc) when not set by user. 
  
spark.akka.frameSize: 控制Spark中通訊訊息的最大容量 (如 task 的輸出結果),預設為10M。當處理大資料時,task 的輸出可能會大於這個值,需要根據實際資料設定一個更高的值。如果是這個值不夠大而產生的錯誤,可以從 worker的日誌 中進行排查。通常 worker 上的任務失敗後,master 的執行日誌上出現”Lost TID: “的提示,可通過檢視失敗的 worker 的日誌檔案($SPARK_HOME/worker/下面的log檔案) 中記錄的任務的 Serialized size of result 是否超過10M來確定。
spark.default.parallelism: 控制Spark中的分散式shuffle過程預設使用的task數量,預設為8個。如果不做調整,資料量大時,就容易執行時間很長,甚至是出Exception,因為8個task無法handle那麼多的資料。 注意這個值也不是說設定得越大越好。
spark.local.dir:Spark 執行時的臨時目錄,例如 map 的輸出檔案,儲存在磁碟的 RDD 等都儲存在這裡。預設是 /tmp 這個目錄,而一開始我們搭建的小叢集上 /tmp 這個目錄的空間只有2G,大資料量跑起來就出 Exception (”No space left on device”)了。