Running a Python Job on a Spark Cluster

Today I tried running a Python job on the newly built Spark cluster. I hit a few problems, and working through those pitfalls gave me a first, admittedly shallow, understanding of how a Spark cluster operates and how it is configured; it no longer feels like a total unknown. The process is recorded below. There is still much more of the Hadoop ecosystem and the Spark parallel-computing framework left to explore.

First, the environment: the cluster has five nodes and was set up with Cloudera Manager, with Cloudera's CDH as the Hadoop distribution. The way I understand the relationship between CDH and Hadoop is like that between CentOS and Linux. The other components, such as HBase and Hive, were also installed through Cloudera. A labmate had already downloaded a Spark parcel onto the server, so I used it without checking the version, only to discover afterwards that it was 0.9, which was a little embarrassing: Spark was already at 1.4 by then, and 0.9 has neither spark-submit nor a native R API. That doesn't matter for this experiment, though; the 0.9 documentation covers everything needed, and if the newer features ever become necessary the cluster can simply be redeployed.

First, a record of Spark's four run modes (a sketch of the corresponding master URLs follows the list):

  • local: single-process local mode, for developing and testing Spark code on one machine
  • standalone: distributed cluster mode with a Master-Worker architecture; the Master handles scheduling while the Workers execute the individual tasks
  • on YARN/Mesos: runs on top of a resource-management framework such as YARN or Mesos; the framework provides resource management, Spark provides computation and scheduling, and other computing frameworks (e.g. MapReduce, MPI, Storm) can share the same cluster (a cluster built with Cloudera, like this one, falls into this category)
  • on cloud (EC2): runs on AWS EC2.
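
As a minimal sketch of how these modes are selected in code, the master URL handed to SparkConf determines the run mode (the standalone address is the one used later in this post; the Mesos address is purely hypothetical):

from pyspark import SparkConf

conf_local = SparkConf().setMaster("local[4]")  # local mode with 4 worker threads
conf_standalone = SparkConf().setMaster("spark://192.168.2.241:7077")  # standalone Master
conf_mesos = SparkConf().setMaster("mesos://192.168.2.1:5050")  # hypothetical Mesos master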

Below, a simple Python job, SimpleApp.py, serves as an example to record how the script is executed.

from pyspark import SparkContext, SparkConf

conf = SparkConf()
conf.setMaster("spark://192.168.2.241:7077")  # standalone master; the spark:// prefix is required
conf.setAppName("test application")


logFile = "hdfs://hadoop241:8020/user/root/testfile"  # input file on HDFS
sc = SparkContext(conf=conf)
logData = sc.textFile(logFile).cache()  # cache the RDD: both counts reuse it


numAs = logData.filter(lambda s: 'a' in s).count()  # lines containing 'a'
numBs = logData.filter(lambda s: 'b' in s).count()  # lines containing 'b'
print "Lines with a:%i,lines with b:%i" % (numAs, numBs)
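
Note that filter() is lazy: nothing actually runs on the cluster until count() forces evaluation, which is why the log below shows two separate jobs, one at SimpleApp.py:13 and one at SimpleApp.py:14. The cache() call keeps the RDD in executor memory after the first count, so the second job does not have to re-read the file from HDFS.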

The main issue here is the configuration for connecting to the cluster, i.e. the conf part of the code above. The first step is connecting to the cluster's master node; note the exact form of the master URL:

spark://192.168.2.241:7077

The spark:// prefix must not be omitted, otherwise the job fails with a "could not parse master URL" error. The port number can be looked up via the $SPARK_MASTER_PORT environment variable in /etc/spark/conf (the location of the config files varies with the installation method, so check your own setup).
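
For example, on this CDH installation the variable can be located with a quick recursive grep (the path will differ for other installation methods):

$ grep -r SPARK_MASTER_PORT /etc/spark/conf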

Also note:

logFile = "hdfs://hadoop241:8020/user/root/testfile"

I noticed that input is read from the HDFS file system by default, so the file to be processed must first be put onto HDFS. Again, note how the path is written: here it is an absolute HDFS path, though a relative path also works.
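
Assuming testfile sits in the current local directory, the upload can be done with the standard HDFS shell, roughly like this:

$ hadoop fs -put testfile /user/root/testfile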
The testfile used here contains only two lines, just enough to verify that the job runs correctly:

stay hungery,stay foolish
steve jobs

Then run:

$ pyspark SimpleApp.py
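
Since Spark 0.9 predates spark-submit, the script is launched through pyspark directly; on Spark 1.x and later the equivalent invocation would presumably be the following, with the master and app name still coming from the SparkConf inside the script:

$ spark-submit SimpleApp.py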

The output is pasted below; the allocation and scheduling of the computation across the cluster can be traced through it.

[root@hadoop241 workspace]# pyspark SimpleApp.py
15/07/31 16:22:27 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/31 16:22:27 INFO Remoting: Starting remoting
15/07/31 16:22:27 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@hadoop241:34248]
15/07/31 16:22:27 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@hadoop241:34248]
15/07/31 16:22:27 INFO spark.SparkEnv: Registering BlockManagerMaster
15/07/31 16:22:27 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150731162227-804b
15/07/31 16:22:27 INFO storage.MemoryStore: MemoryStore started with capacity 294.9 MB.
15/07/31 16:22:27 INFO network.ConnectionManager: Bound socket to port 42522 with id = ConnectionManagerId(hadoop241,42522)
15/07/31 16:22:27 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/07/31 16:22:27 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop241:42522 with 294.9 MB RAM
15/07/31 16:22:27 INFO storage.BlockManagerMaster: Registered BlockManager
15/07/31 16:22:27 INFO spark.HttpServer: Starting HTTP Server
15/07/31 16:22:27 INFO server.Server: jetty-7.6.8.v20121106
15/07/31 16:22:27 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:42944
15/07/31 16:22:27 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.2.241:42944
15/07/31 16:22:27 INFO spark.SparkEnv: Registering MapOutputTracker
15/07/31 16:22:27 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c6a0d067-075c-493b-81c8-754f569a91b5
15/07/31 16:22:27 INFO spark.HttpServer: Starting HTTP Server
15/07/31 16:22:27 INFO server.Server: jetty-7.6.8.v20121106
15/07/31 16:22:27 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33063
15/07/31 16:22:27 INFO server.Server: jetty-7.6.8.v20121106
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
15/07/31 16:22:27 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/07/31 16:22:27 INFO ui.SparkUI: Started Spark Web UI at http://hadoop241:4040
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Connecting to master spark://192.168.2.241:7077...
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150731162228-0018
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/0 on worker-20150728175302-hadoop246-7078 (hadoop246:7078) with 16 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/0 on hostPort hadoop246:7078 with 16 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/1 on worker-20150728175303-hadoop245-7078 (hadoop245:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/1 on hostPort hadoop245:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/2 on worker-20150728175303-hadoop254-7078 (hadoop254:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/2 on hostPort hadoop254:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/3 on worker-20150728175302-hadoop241-7078 (hadoop241:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/3 on hostPort hadoop241:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/4 on worker-20150728175302-hadoop217-7078 (hadoop217:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/4 on hostPort hadoop217:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/3 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/2 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/1 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/0 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/4 is now RUNNING
15/07/31 16:22:28 INFO storage.MemoryStore: ensureFreeSpace(125687) called with curMem=0, maxMem=309225062
15/07/31 16:22:28 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 122.7 KB, free 294.8 MB)
15/07/31 16:22:29 INFO mapred.FileInputFormat: Total input paths to process : 1
15/07/31 16:22:29 INFO spark.SparkContext: Starting job: count at SimpleApp.py:13
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Got job 0 (count at SimpleApp.py:13) with 2 output partitions (allowLocal=false)
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Final stage: Stage 0 (count at SimpleApp.py:13)
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Missing parents: List()
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[2] at count at SimpleApp.py:13), which has no missing parents
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[2] at count at SimpleApp.py:13)
15/07/31 16:22:29 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/07/31 16:22:29 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop246:48226/user/Executor#-1281030996] with ID 0
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 3119 bytes in 6 ms
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 3119 bytes in 1 ms
15/07/31 16:22:29 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop217:34233/user/Executor#-994522395] with ID 4
15/07/31 16:22:30 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop241:53345/user/Executor#1663802475] with ID 3
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop246:34291 with 294.9 MB RAM
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop217:35324 with 294.9 MB RAM
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop241:54770 with 294.9 MB RAM
15/07/31 16:22:31 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop254:49492/user/Executor#-967494826] with ID 2
15/07/31 16:22:31 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop245:34383/user/Executor#266145334] with ID 1
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop254:52135 with 294.9 MB RAM
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop245:16696 with 294.9 MB RAM
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_0 in memory on hadoop246:34291 (size: 208.0 B, free: 294.9 MB)
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_1 in memory on hadoop246:34291 (size: 176.0 B, free: 294.9 MB)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 1 in 2128 ms on hadoop246 (progress: 0/2)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 0 in 2142 ms on hadoop246 (progress: 1/2)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Stage 0 (count at SimpleApp.py:13) finished in 2.476 s
15/07/31 16:22:32 INFO spark.SparkContext: Job finished: count at SimpleApp.py:13, took 2.550761544 s
15/07/31 16:22:32 INFO spark.SparkContext: Starting job: count at SimpleApp.py:14
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Got job 1 (count at SimpleApp.py:14) with 2 output partitions (allowLocal=false)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Final stage: Stage 1 (count at SimpleApp.py:14)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Missing parents: List()
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Submitting Stage 1 (PythonRDD[3] at count at SimpleApp.py:14), which has no missing parents
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (PythonRDD[3] at count at SimpleApp.py:14)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 3121 bytes in 0 ms
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 3121 bytes in 0 ms
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 3 in 27 ms on hadoop246 (progress: 0/2)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 2 in 32 ms on hadoop246 (progress: 1/2)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Stage 1 (count at SimpleApp.py:14) finished in 0.034 s
15/07/31 16:22:32 INFO spark.SparkContext: Job finished: count at SimpleApp.py:14, took 0.04234127 s
Lines with a:1,lines with b:1