1. 程式人生 > >Spark如何在一個SparkContext中提交多個任務

Spark如何在一個SparkContext中提交多個任務

在使用spark處理資料的時候,大多數都是提交一個job執行,然後job內部會根據具體的任務,生成task任務,執行在多個程序中,比如讀取的HDFS檔案的資料,spark會載入所有的資料,然後根據block個數生成task數目,多個task執行中不同的程序中,是並行的,如果在同一個程序中一個JVM裡面有多個task,那麼多個task也可以並行,這是常見的使用方式。

考慮下面一種場景,在HDFS上某個目錄下面有10個檔案,我想要同時並行的去統計每個檔案的數量,應該怎麼做? 其實spark是支援在一個spark context中可以通過多執行緒同時提交多個任務執行,然後spark context接到這所有的任務之後,通過中央排程,在來分配執行各個task,最終任務完成程式退出。

下面就來看下如何使用多執行緒提交任務,可以直接使用new Thread來建立執行緒提交,但是不建議這麼做,推薦的做法是通過Executors執行緒池來非同步管理執行緒,尤其是在提交的任務比較多的時候用這個會更加方便。

核心程式碼如下:

def main(args: Array[String]): Unit = {

   val sparkConf=new SparkConf()
   //例項化spark context
   val sc=new SparkContext(sparkConf)
   sparkConf.setAppName("multi task submit ")
   //儲存任務返回值
val list=new util.ArrayList[Future[String]]() //並行任務讀取的path val task_paths=new util.ArrayList[String]() task_paths.add("/tmp/data/path1/") task_paths.add("/tmp/data/path2/") task_paths.add("/tmp/data/path3/") //執行緒數等於path的數量 val nums_threads=task_paths.size() //構建執行緒池 val executors=Executors.newFixedThreadPool(nums_threads) for
(i<-0 until nums_threads){ val task= executors.submit(new Callable[String] { override def call(): String ={ val count=sc.textFile(task_paths.get(i)).count()//獲取統計檔案數量 return task_paths.get(i)+" 檔案數量: "+count } }) list.add(task)//新增集合裡面 } //遍歷獲取結果 list.asScala.foreach(result=>{ log.info(result.get()) }) //停止spark sc.stop() }

可以看到使用scala寫的程式碼比較精簡,這樣就完成了一個並行task提交的spark任務,最後我們打包完畢後,上傳到linux上進行提交,命令如下:

/opt/bigdata/spark/bin/spark-submit  
--class  MultiTaskSubmit  
--master yarn  
--deploy-mode cluster 
--executor-cores 3 
--driver-memory 1g 
--executor-memory 1g
--num-executors 10
--jars  $jars   task.jar

最後需要注意一點,線上程裡面呼叫的方法如果包含一些全域性載入的屬性,最好放線上程的成員變數裡面進行初始化,否則多個執行緒去更改全域性屬性,有可能會造成一些未知的問題。