1. 程式人生 > >Spark自定義類在Driver和Executor端傳輸問題

Spark自定義類在Driver和Executor端傳輸問題

方式一:自定義一個類,並且這個類需要實現Serializable介面

1.首先寫一個class自定義類

class Rules extends Serializable {

  val rulesMap = Map("hadoop" -> 2.7, "spark" -> 2.2)

  //val hostname = InetAddress.getLocalHost.getHostName

  //println(hostname + "@@@@@@@@@@@@@@@@")

}

2.寫一個spark程式

val conf = new SparkConf().setAppName("SerTest")
val sc = new SparkContext(conf)

val lines: RDD[String] = sc.textFile(args(0))

val r = lines.map(word => {
  val rules = new Rules
  val hostname = InetAddress.getLocalHost.getHostName
  val threadName = Thread.currentThread().getName
  (hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString)
})

總結:如果採用這種方式的話,會導致rules物件被多次建立,因為map方法是對RDD中的每一條資料都執行那裡面的演算法的,所以說這種方式會造成資源的浪費。自定義的類要想在運算元中使用的話,就必須將自定義的類進行序列化,也就是基層Serializable介面,不然會拋異常Rules物件應該在運算元外面進行new,如下:

val conf = new SparkConf().setAppName("SerTest")
val sc = new SparkContext(conf)

val lines: RDD[String] = sc.textFile(args(0))
  val rules = new Rules
val r = lines.map(word => {

  val hostname = InetAddress.getLocalHost.getHostName
  val threadName = Thread.currentThread().getName
  (hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString)
})

因為運算元中的變數都會被髮送到Executor端進行執行,所以Driver端new出來的物件會被髮送到Executor端進行執行的,如果Executor端有多個task的話,那麼這個例項就會被分發到多個task中,也就是每個task會儲存一個例項,這樣做會造成資源的浪費,具體細節如下圖:
在這裡插入圖片描述

方式二:自定義一個object物件,這個物件同樣要實現Serializable介面

1.自定義一個object物件,使用object修飾的話,這個類相當於java中的單例物件,只會被初始化一次。

在這裡插入程式碼片object Rules extends Serializable {

  val rulesMap = Map("hadoop" -> 2.7, "spark" -> 2.2)

  val hostname = InetAddress.getLocalHost.getHostName

  println(hostname + "@@@@@@@@@@@@@@@@!!!!")

}

2.例項化這個單例物件

val conf = new SparkConf().setAppName("SerTest")
val sc = new SparkContext(conf)

val lines: RDD[String] = sc.textFile(args(0))
val rules = Rules
val r = lines.map(word => {
  val hostname = InetAddress.getLocalHost.getHostName
  val threadName = Thread.currentThread().getName
  //rules的實際是在Executor中使用的
  (hostname, threadName, rules.rulesMap.getOrElse(word, 0), rules.toString)
})

總結:這裡的Rules的object物件是需要被序列化的,在scala中使用object修飾的類是單例的
在類載入的時候時候就會回被初始化,也就是說,因為這裡rules是在map外面定義的,所以這個類只會被初始化一次,然後再task任務提交的時候,會見這個物件傳送到對應的Executor上,並且每個worker上的Executor上多個task公用一個rules物件,這樣做的好處是在同一個Executor中多個task可以公用一個rules物件。在這裡插入圖片描述

方式三:前兩種方式都是在Driver端例項化這個物件的,在後期程式執行的時候會見這個例項化過的物件隨著task物件的傳送而傳送到Executor端執行,如果資料量特別大的情況下,會造成網路頻寬的消耗,最理想的方式就是直接在Executor端建立 這個例項化的物件,因為scala中object修飾的物件是單例的,所以最好的方式就是在Executor端直接呼叫這個單例物件:這裡不進行序列化的原因是因為,不需要將物件從Driver端傳送到Executor端,而是直接在Executor端生成

1.////第三種方式,希望Rules在EXecutor中被初始化(不走網路了,就不必實現序列化介面)
object Rules  {

  val rulesMap = Map("hadoop" -> 2.7, "spark" -> 2.2)

  val hostname = InetAddress.getLocalHost.getHostName

  println(hostname + "@@@@@@@@@@@@@@@@!!!!")

}

2.編寫spark程式

val conf = new SparkConf().setAppName("SerTest")
val sc = new SparkContext(conf)

val lines: RDD[String] = sc.textFile(args(0))

val r = lines.map(word => {
  //函式的執行是在Executor執行的(Task中執行的)
  val hostname = InetAddress.getLocalHost.getHostName
  val threadName = Thread.currentThread().getName
  (hostname, threadName, Rules.rulesMap.getOrElse(word, 0), Rules.toString)
})

這裡直接呼叫Rules單例物件,然後呼叫裡面的靜態方法,因為單例物件中的方法都是單例的,不需要再進行new obj ,而是直接呼叫的。這樣做的好處就是最終 Rules會隨著Task直接傳送到Executor上,初始化的時候直接在Executor端進行了。可以將規則資訊使用這種方式進行傳送到對應的Executor中。
在這裡插入圖片描述