1. 程式人生 > >Spark效能優化:基於分割槽進行操作

Spark效能優化:基於分割槽進行操作

前言(摘自Spark快速大資料分析)

基於分割槽對資料進行操作可以讓我們避免為每個資料元素進行重複的配置工作。諸如開啟資料庫連線或建立隨機數生成器等操作,都是我們應當儘量避免為每個元素都配置一次的工作。Spark 提供基於分割槽的map 和foreach,讓你的部分程式碼只對RDD 的每個分割槽執行一次,這樣可以幫助降低這些操作的代價。
當基於分割槽操作RDD 時,Spark 會為函式提供該分割槽中的元素的迭代器。返回值方面,也返回一個迭代器。除mapPartitions() 外,Spark 還有一些別的基於分割槽的操作符,見下表:

函式名 呼叫所提供的 返回的 對於RDD[T]的函式簽名
mapPartitions() 該分割槽中元素的迭代器 返回的元素的迭代器 f: (Iterator[T]) → Iterator[U]
mapPartitionsWithIndex() 分割槽序號,以及每個分割槽中的元素的迭代器 返回的元素的迭代器 f: (Int, Iterator[T]) → Iterator[U]
foreachPartitions() 元素迭代器 f: (Iterator[T]) → Unit

首先給出上面三個運算元的具體程式碼示例。

1、mapPartitions

與map類似,不同點是map是對RDD的裡的每一個元素進行操作,而mapPartitions是對每一個分割槽的資料(迭代器)進行操作,具體可以看上面的表格。
下面同時用map和mapPartitions實現WordCount,看一下mapPartitions的用法以及與map的區別

package com.dkl.leanring.spark.test

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("WordCount").getOrCreate()
    val sc = spark.sparkContext

    val input = sc.parallelize(Seq("Spark Hive Kafka"
, "Hadoop Kafka Hive Hbase", "Java Scala Spark")) val words = input.flatMap(line => line.split(" ")) val counts = words.map(word => (word, 1)).reduceByKey { (x, y) => x + y } println(counts.collect().mkString(",")) val counts1 = words.mapPartitions(it => it.map(word => (word, 1))).reduceByKey { (x, y) => x + y } println(counts1.collect().mkString(",")) spark.stop() } }

2、mapPartitionsWithIndex

和mapPartitions一樣,只是多了一個分割槽的序號,下面的程式碼實現了將Rdd的元素數字n變為(分割槽序號,n*n)

val rdd = sc.parallelize(1 to 10, 5)
val res = rdd.mapPartitionsWithIndex((index, it) => {
  it.map(n => (index, n * n))
})
println(res.collect().mkString(" "))

3、foreachPartitions

foreachPartitions和foreach類似,不同點也是foreachPartitions基於分割槽進行操作的

rdd.foreachPartition(it => it.foreach(println))

4、關於如何避免重複配置

下面以開啟資料庫連線舉例,需求是這樣的:
讀取mysql表裡的資料,做了一系列資料處理得到結果之後,需要修改我們mysql表裡的每一條資料的狀態,代表程式已經處理過了,下次不需要處理了。

4.1 表

以最簡單表結構示例

欄位名 註釋
ID 主鍵、唯一標識
ISDEAL 程式是否處理過

建表語句

CREATE TABLE test (
    id INTEGER NOT NULL AUTO_INCREMENT,
    isdeal INTEGER DEFAULT 0 NOT NULL,
    primary key(id) 
)
ENGINE=InnoDB
DEFAULT CHARSET=utf8
COLLATE=utf8_general_ci;

4.2 不基於分割槽操作

一共用兩種方法

4.2.1 第一種

package com.dkl.leanring.spark.sql.mysql

import org.apache.spark.sql.SparkSession

object UpdateMysqlDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UpdateMysqlDemo").master("local").getOrCreate()

    val database_url = "jdbc:mysql://192.168.44.128:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    val user = "root"
    val password = "Root-123456"
    val df = spark.read
      .format("jdbc")
      .option("url", database_url)
      .option("dbtable", "(select * from test where isDeal=0 limit 5)a")
      .option("user", user)
      .option("password", password)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("numPartitions", "5")
      .option("partitionColumn", "ID")
      .option("lowerBound", "1")
      .option("upperBound", "10")
      .load()

    import java.sql.{ Connection, DriverManager, ResultSet };
    df.rdd.foreach(row => {
      val conn = DriverManager.getConnection(database_url, user, password)
      try {
        // Configure to be Read Only
        val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
        val prep = conn.prepareStatement(s"update test set isDeal=1 where id=?")

        val id = row.getAs[Int]("id")
        prep.setInt(1, id)
        prep.executeUpdate

      } catch {
        case e: Exception => e.printStackTrace
      } finally {
        conn.close()
      }

    })

    spark.stop()
  }
}

  • 上面的程式碼,取isDeal=0的前五條,因為造的資料量少,所以只取了前五條,然後指定了五個分割槽,這裡只是一個程式碼示例,實際工作中應該資料量很大,每個分割槽肯定不止一條資料

根據上面的程式碼,看到用這種方式的缺點是每一個元素都要建立一個數據庫連線,這樣頻繁建立連線、關閉連線,在資料量很大的情況下,勢必會對效能產生影響,但是優點是不用擔心記憶體不夠。

4.2.2 第二種

val conn = DriverManager.getConnection(database_url, user, password)
try {
  val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
  val prep = conn.prepareStatement(s"update test set isDeal=1 where id=?")

  df.select("id").collect().foreach(row => {
    val id = row.getAs[Int]("id")
    prep.setInt(1, id)
    prep.executeUpdate

 })

} catch {
  case e: Exception => e.printStackTrace
}

這種方式的缺點是把要操作的資料全部轉成scala陣列,僅在Driver端執行,但是如果資料量很大的話,可能因為Driver記憶體不夠大而丟擲異常,優點是隻建立一次資料庫連線,在資料量不是特別大,且確定Driver的記憶體足夠的時候,可以採取這種方式。

4.3 基於分割槽的方式

df.rdd.foreachPartition(it => {
  val conn = DriverManager.getConnection(database_url, user, password)
  try {
    val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    val prep = conn.prepareStatement(s"update test set isDeal=1 where id=?")
    it.foreach(row => {
      val id = row.getAs[Int]("id")
      prep.setInt(1, id)
      prep.executeUpdate
    })

  } catch {
    case e: Exception => e.printStackTrace
  } finally {
    conn.close()
  }

})

這種方式就結合了上面兩種方式的優點,基於分割槽的方式使得建立連線的次數不會那麼多,然後每個分割槽的資料也可以平均分到每個節點的executor上,避免了記憶體不足產生的異常,當然前提是要合理的分配分割槽數,既不能讓分割槽數太多,也不能讓每個分割槽的資料太多,還有要注意資料傾斜的問題,因為當資料傾斜造成某個分割槽資料量太大同樣造成OOM(記憶體溢位)。

4.4 其他

上面只是列舉了一個例子,且只是在foreach這樣的action運算元裡體現的,當然肯定也有需求需是在transformation裡進行如資料庫的連線這樣的操作,大家可類比的使用mapPartitions即可

5、其他優點(未證實)

網上有很多部落格提到mapPartitions還有其他優點,就是mapPartitions比map快,效能高,原因是因為map的function會執行rdd.count次,而mapPartitions的function則執行rdd.numPartitions次。
但我並這麼認為,因mapPartitions的function和map的function是不一樣的,mapPartitions裡的迭代器的每個元素還是都要執行一遍的,實際上也是執行rdd.count次。
下面以其中一篇部落格舉例(只列出優點,大部分部落格上的寫的都一樣的,應該出自同一篇部落格吧~)

部落格地址:Spark—運算元調優之MapPartitions提升Map類操作效能

  • 至於mapPartitions是否真的比map處理速度快,如果我有時間驗證得到結果的話,我再更新一下這個地方~

相關閱讀