spark中map與mapPartitions區別
阿新 • • 發佈:2018-10-25
part print map spark 偽代碼 一個 數據 最終 partition
在spark中,map與mapPartitions兩個函數都是比較常用,這裏使用代碼來解釋一下兩者區別
import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer object MapAndPartitions { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("map_mapPartitions_demo").setMaster("local")) val arrayRDD=sc.parallelize(Array(1,2,3,4,5,6,7,8,9)) //map函數每次處理一個/行數據 arrayRDD.map(element=>{ element }).foreach(println) //mapPartitions每次處理一批數據 //將 arrayRDD分成x批數據進行處理 //elements是其中一批數據 //mapPartitions返回一批數據(iterator) arrayRDD.mapPartitions(elements=>{ var result = newArrayBuffer[Int]() elements.foreach(element=>{ result.+=(element) }) result.iterator }).foreach(println) } }
兩個函數最終處理得到的結果是一樣的
mapPartitions比較適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支,偽代碼如下:
arrayRDD.mapPartitions(datas=>{ dbConnect = getDbConnect() //獲取數據庫連接 datas.foreach(data=>{ dbConnect.insert(data) //循環插入數據 }) dbConnect.commit() //提交數據庫事務 dbConnect.close() //關閉數據庫連接 })
spark中map與mapPartitions區別