1. 程式人生 > >Spark Streaming之foreachRDD效能優化

Spark Streaming之foreachRDD效能優化

首先我們來對官網的描述瞭解一下。

DStream中的foreachRDD是一個非常強大函式,它允許你把資料傳送給外部系統。因為輸出操作實際上是允許外部系統消費轉換後的資料,它們觸發的實際操作是DStream轉換。所以要掌握它,對它要有深入瞭解。下面有一些常用的錯誤需要理解。經常寫資料到外部系統需要建立一個連線的object(eg:根據TCP協議連線到遠端的伺服器,我們連線外部資料庫需要自己的控制代碼)和傳送資料到遠端的系統為此,開發者需要在Spark的driver建立一個object用於連線。

為了達到這個目的,開發人員可能不經意的在Spark驅動中建立一個連線物件,但是在Spark worker中 嘗試呼叫這個連線物件儲存記錄到RDD中,如下

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

這是不正確的,因為這需要先序列化連線物件,然後將它從driver傳送到worker中。這樣的連線物件在機器之間不能

傳送。它可能表現為序列化錯誤(連線物件不可序列化)或者初始化錯誤(連線物件應該 在worker中初始化)等

等。正確的解決辦法是在worker中建立連線物件。

然而,這會造成另外一個常見的錯誤-為每一個記錄建立了一個連線物件。例如:
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常,建立一個連線物件有資源和時間的開支。因此,為每個記錄建立和銷燬連線物件會導致非常高的開支,明顯

的減少系統的整體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition方法。 為RDD的partition建立一個連線對

象,用這個兩件物件傳送partition中的所有記錄。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

最後,可以通過在多個RDD或者批資料間重用連線物件做更進一步的優化。開發者可以保有一個靜態的連線物件

池,重複使用池中的物件將多批次的RDD推送到外部系統,以進一步節省開支

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

需要注意的是,池中的連線物件應該根據需要延遲建立,並且在空閒一段時間後自動超時。這樣就獲取了最有效的

方式發生資料到外部系統。

其它需要注意的地方:

(1)輸出操作通過懶執行的方式操作DStreams,正如RDD action通過懶執行的方式操作RDD。具體地看,RDD 

actions和DStreams輸出操作接收資料的處理。因此,如果你的應用程式沒有任何輸出操作或者 用於輸出操作

dstream.foreachRDD(),但是沒有任何RDD action操作在dstream.foreachRDD()裡面,那麼什麼也不會執行。系統

僅僅會接收輸入,然後丟棄它們。

(2)預設情況下,DStreams輸出操作是分時執行的,它們按照應用程式的定義順序按序執行。

實驗1:把SparkStreaming的內部資料存入Mysql

(1)在mysql中建立一個表用於存放資料

  1. mysql> create database sparkStreaming;  
  2. Query OK, 1 row affected (0.01 sec)  
  3. mysql> use sparkStreaming;  
  4. Database changed  
  5. mysql> show tables;  
  6. Empty set (0.01 sec)  
  7. mysql> create table searchKeyWord(insert_time date,keyword varchar(30),search_count integer);  
  8. Query OK, 0 rows affected (0.05 sec)  

(2)用scala編寫連線Mysql的連線池

  1. import java.sql.Connection  
  2. import java.sql.PreparedStatement  
  3. import java.sql.ResultSet  
  4. import org.apache.commons.dbcp.BasicDataSource  
  5. import org.apache.log4j.Logger  
  6. object scalaConnectPool {  
  7.   val  log = Logger.getLogger(scalaConnectPool.this.getClass)  
  8.   var ds:BasicDataSource = null
  9.   def getDataSource={  
  10.     if(ds == null){  
  11.       ds = new BasicDataSource()  
  12.       ds.setUsername("root")  
  13.       ds.setPassword("iamhaoren")  
  14.       ds.setUrl("jdbc:mysql://localhost:3306/sparkStreaming")  
  15.       ds.setDriverClassName("com.mysql.jdbc.Driver")  
  16.       ds.setInitialSize(20)  
  17.       ds.setMaxActive(100)  
  18.       ds.setMinIdle(50)  
  19.       ds.setMaxIdle(100)  
  20.       ds.setMaxWait(1000)  
  21.       ds.setMinEvictableIdleTimeMillis(5*60*1000)  
  22.       ds.setTimeBetweenEvictionRunsMillis(10*60*1000)  
  23.       ds.setTestOnBorrow(true)  
  24.     }  
  25.     ds  
  26.   }  
  27.   def getConnection : Connection= {  
  28.     var connect:Connection = null
  29.     try {  
  30.       if(ds != null){  
  31.         connect = ds.getConnection  
  32.       }else{  
  33.         connect = getDataSource.getConnection  
  34.       }  
  35.     }  
  36.     connect  
  37.   }  
  38.   def shutDownDataSource: Unit=if (ds !=null){ds.close()}  
  39.   def closeConnection(rs:ResultSet,ps:PreparedStatement,connect:Connection): Unit ={  
  40.     if(rs != null){rs.close}  
  41.     if(ps != null){ps.close}  
  42.     if(connect != null){connect.close}  
  43.   }  
  44. }  

(3)編寫SparkStreaming程式
  1. import org.apache.spark.SparkConf  
  2. import org.apache.spark.streaming.{Seconds, StreamingContext}  
  3. object dataToMySQL {  
  4.   def main(args: Array[String]) {  
  5.     val conf = new SparkConf().setAppName("use the foreachRDD write data to mysql").setMaster("local[2]")  
  6.     val ssc = new StreamingContext(conf,Seconds(10))  
  7.     val streamData = ssc.socketTextStream("master",9999)  
  8.     val wordCount = streamData.map(line =>(line.split(",")(0),1)).reduceByKeyAndWindow(_+_,Seconds(60))  
  9.     val hottestWord = wordCount.transform(itemRDD => {  
  10.       val top3 = itemRDD.map(pair => (pair._2, pair._1))  
  11.         .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)  
  12.       ssc.sparkContext.makeRDD(top3)  
  13.     })  
  14.     hottestWord.foreachRDD( rdd =>{  
  15.       rdd.foreachPartition(partitionOfRecords =>{  
  16.         val connect = scalaConnectPool.getConnection  
  17.         connect.setAutoCommit(false)  
  18.         val stmt = connect.createStatement()  
  19.         partitionOfRecords.foreach(record =>{  
  20.           stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')")  
  21.         })  
  22.         stmt.executeBatch()  
  23.         connect.commit()  
  24.       }  
  25.       )  
  26.     }  
  27.     )  
  28.     ssc.start()  
  29.     ssc.awaitTermination()  
  30.     ssc.stop()  
  31.   }  
  32. }  
(4)編寫一個socket端的資料模擬器
  1. import java.io.{PrintWriter}    
  2. import java.net.ServerSocket    
  3. import scala.io.Source    
  4. object streamingSimulation {    
  5.   def index(n: Int) = scala.util.Random.nextInt(n)    
  6.   def main(args: Array[String]) {    
  7.     // 呼叫該模擬器需要三個引數,分為為檔案路徑、埠號和間隔時間(單位:毫秒)  
  8.     if (args.length != 3) {    
  9.       System.err.println("Usage: <filename> <port> <millisecond>")    
  10.       System.exit(1)    
  11.     }    
  12.     // 獲取指定檔案總的行數  
  13.     val filename = args(0)    
  14.     val lines = Source.fromFile(filename).getLines.toList    
  15.     val filerow = lines.length    
  16.     // 指定監聽某埠,當外部程式請求時建立連線  
  17.     val listener = new ServerSocket(args(1).toInt)    
  18.     while (true) {    
  19.       val socket = listener.accept()    
  20.       new Thread() {    
  21.         override def run = {    
  22.           println("Got client connected from: " + socket.getInetAddress)    
  23.           val out = new PrintWriter(socket.getOutputStream(), true)    
  24.           while (true) {    
  25.             Thread.sleep(args(2).toLong)    
  26.             // 當該埠接受請求時,隨機獲取某行資料傳送給對方  
  27.             val content = lines(index(filerow))    
  28.             println("-------------------------------------------")    
  29.             println(s"Time: ${System.currentTimeMillis()}")    
  30.             println("-------------------------------------------")    
  31.             println(content)    
  32.             out.write(content + '\n')    
  33.             out.flush()    
  34.           }    
  35.           socket.close()    
  36.         }    
  37.       }.start()    
  38.     }    
  39.   }    
  40. }    
實驗資料為:

spark
Streaming
better
than
storm
you
need
it
yes
do
it

(5)實驗啟動

在客戶端啟動資料流模擬

對socket端的資料模擬器程式進行 jar檔案的打包,並放入到叢集目錄中

啟動程式如下:

  1. java -cp DataSimulation.jar streamingSimulation /root/application/upload/Information 99991000
啟動SparkStreaming程式

結果如下: