Spark Streaming之foreachRDD效能優化
首先我們來對官網的描述瞭解一下。
DStream中的foreachRDD是一個非常強大函式,它允許你把資料傳送給外部系統。因為輸出操作實際上是允許外部系統消費轉換後的資料,它們觸發的實際操作是DStream轉換。所以要掌握它,對它要有深入瞭解。下面有一些常用的錯誤需要理解。經常寫資料到外部系統需要建立一個連線的object(eg:根據TCP協議連線到遠端的伺服器,我們連線外部資料庫需要自己的控制代碼)和傳送資料到遠端的系統為此,開發者需要在Spark的driver建立一個object用於連線。
為了達到這個目的,開發人員可能不經意的在Spark驅動中建立一個連線物件,但是在Spark worker中 嘗試呼叫這個連線物件儲存記錄到RDD中,如下
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
這是不正確的,因為這需要先序列化連線物件,然後將它從driver傳送到worker中。這樣的連線物件在機器之間不能
傳送。它可能表現為序列化錯誤(連線物件不可序列化)或者初始化錯誤(連線物件應該 在worker中初始化)等
等。正確的解決辦法是在worker中建立連線物件。
然而,這會造成另外一個常見的錯誤-為每一個記錄建立了一個連線物件。例如:dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
通常,建立一個連線物件有資源和時間的開支。因此,為每個記錄建立和銷燬連線物件會導致非常高的開支,明顯
的減少系統的整體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition方法。 為RDD的partition建立一個連線對
象,用這個兩件物件傳送partition中的所有記錄。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
最後,可以通過在多個RDD或者批資料間重用連線物件做更進一步的優化。開發者可以保有一個靜態的連線物件
池,重複使用池中的物件將多批次的RDD推送到外部系統,以進一步節省開支
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
需要注意的是,池中的連線物件應該根據需要延遲建立,並且在空閒一段時間後自動超時。這樣就獲取了最有效的
方式發生資料到外部系統。
其它需要注意的地方:(1)輸出操作通過懶執行的方式操作DStreams,正如RDD action通過懶執行的方式操作RDD。具體地看,RDD
actions和DStreams輸出操作接收資料的處理。因此,如果你的應用程式沒有任何輸出操作或者 用於輸出操作
dstream.foreachRDD(),但是沒有任何RDD action操作在dstream.foreachRDD()裡面,那麼什麼也不會執行。系統
僅僅會接收輸入,然後丟棄它們。
(2)預設情況下,DStreams輸出操作是分時執行的,它們按照應用程式的定義順序按序執行。
實驗1:把SparkStreaming的內部資料存入Mysql
(1)在mysql中建立一個表用於存放資料
- mysql> create database sparkStreaming;
- Query OK, 1 row affected (0.01 sec)
- mysql> use sparkStreaming;
- Database changed
- mysql> show tables;
- Empty set (0.01 sec)
- mysql> create table searchKeyWord(insert_time date,keyword varchar(30),search_count integer);
- Query OK, 0 rows affected (0.05 sec)
(2)用scala編寫連線Mysql的連線池
- import java.sql.Connection
- import java.sql.PreparedStatement
- import java.sql.ResultSet
- import org.apache.commons.dbcp.BasicDataSource
- import org.apache.log4j.Logger
- object scalaConnectPool {
- val log = Logger.getLogger(scalaConnectPool.this.getClass)
- var ds:BasicDataSource = null
- def getDataSource={
- if(ds == null){
- ds = new BasicDataSource()
- ds.setUsername("root")
- ds.setPassword("iamhaoren")
- ds.setUrl("jdbc:mysql://localhost:3306/sparkStreaming")
- ds.setDriverClassName("com.mysql.jdbc.Driver")
- ds.setInitialSize(20)
- ds.setMaxActive(100)
- ds.setMinIdle(50)
- ds.setMaxIdle(100)
- ds.setMaxWait(1000)
- ds.setMinEvictableIdleTimeMillis(5*60*1000)
- ds.setTimeBetweenEvictionRunsMillis(10*60*1000)
- ds.setTestOnBorrow(true)
- }
- ds
- }
- def getConnection : Connection= {
- var connect:Connection = null
- try {
- if(ds != null){
- connect = ds.getConnection
- }else{
- connect = getDataSource.getConnection
- }
- }
- connect
- }
- def shutDownDataSource: Unit=if (ds !=null){ds.close()}
- def closeConnection(rs:ResultSet,ps:PreparedStatement,connect:Connection): Unit ={
- if(rs != null){rs.close}
- if(ps != null){ps.close}
- if(connect != null){connect.close}
- }
- }
(3)編寫SparkStreaming程式
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- object dataToMySQL {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("use the foreachRDD write data to mysql").setMaster("local[2]")
- val ssc = new StreamingContext(conf,Seconds(10))
- val streamData = ssc.socketTextStream("master",9999)
- val wordCount = streamData.map(line =>(line.split(",")(0),1)).reduceByKeyAndWindow(_+_,Seconds(60))
- val hottestWord = wordCount.transform(itemRDD => {
- val top3 = itemRDD.map(pair => (pair._2, pair._1))
- .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
- ssc.sparkContext.makeRDD(top3)
- })
- hottestWord.foreachRDD( rdd =>{
- rdd.foreachPartition(partitionOfRecords =>{
- val connect = scalaConnectPool.getConnection
- connect.setAutoCommit(false)
- val stmt = connect.createStatement()
- partitionOfRecords.foreach(record =>{
- stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')")
- })
- stmt.executeBatch()
- connect.commit()
- }
- )
- }
- )
- ssc.start()
- ssc.awaitTermination()
- ssc.stop()
- }
- }
- import java.io.{PrintWriter}
- import java.net.ServerSocket
- import scala.io.Source
- object streamingSimulation {
- def index(n: Int) = scala.util.Random.nextInt(n)
- def main(args: Array[String]) {
- // 呼叫該模擬器需要三個引數,分為為檔案路徑、埠號和間隔時間(單位:毫秒)
- if (args.length != 3) {
- System.err.println("Usage: <filename> <port> <millisecond>")
- System.exit(1)
- }
- // 獲取指定檔案總的行數
- val filename = args(0)
- val lines = Source.fromFile(filename).getLines.toList
- val filerow = lines.length
- // 指定監聽某埠,當外部程式請求時建立連線
- val listener = new ServerSocket(args(1).toInt)
- while (true) {
- val socket = listener.accept()
- new Thread() {
- override def run = {
- println("Got client connected from: " + socket.getInetAddress)
- val out = new PrintWriter(socket.getOutputStream(), true)
- while (true) {
- Thread.sleep(args(2).toLong)
- // 當該埠接受請求時,隨機獲取某行資料傳送給對方
- val content = lines(index(filerow))
- println("-------------------------------------------")
- println(s"Time: ${System.currentTimeMillis()}")
- println("-------------------------------------------")
- println(content)
- out.write(content + '\n')
- out.flush()
- }
- socket.close()
- }
- }.start()
- }
- }
- }
spark
Streaming
better
than
storm
you
need
it
yes
do
it
(5)實驗啟動
在客戶端啟動資料流模擬
對socket端的資料模擬器程式進行 jar檔案的打包,並放入到叢集目錄中
啟動程式如下:
- java -cp DataSimulation.jar streamingSimulation /root/application/upload/Information 99991000
結果如下: