
Spark Streaming, Part 3

The updateStateByKey operator

Requirement: count the cumulative number of occurrences of each word so far (this requires keeping state across batches).

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}
  1. If you use a stateful operator, you must specify a checkpoint directory so that the old state can be combined with the new values.
  2. In production, it is recommended to point the checkpoint at a directory on HDFS (see the sketch after this list).
  3. The argument passed to updateStateByKey is the update function you define; an implicit conversion is involved.
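
For reference, a minimal sketch of note 2 in code; the HDFS path below is a hypothetical example and must be adapted to your cluster:

// In production, checkpoint to an HDFS directory instead of the local "." used in the code below
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint/stateful-wordcount")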

The official documentation explains it as follows:

The update function will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count.

Note that using updateStateByKey requires the checkpoint directory to be configured, which is discussed in detail in the checkpointing section.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StatefulWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Seconds(5))
    val lines = ssc.socketTextStream("192.168.1.6",1111)
    // In production, point the checkpoint at a directory on HDFS.
    // A stateful operator requires a checkpoint directory so that old and new values can be combined.
    ssc.checkpoint(".")
    val result = lines.flatMap(_.split(" ")).map((_,1))
    // The argument passed in is the update function defined below; an implicit conversion is involved.
    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Update the old (previously accumulated) state with the values from the current batch.
    * @param newValues the values for this key in the current batch
    * @param preValues the previously accumulated count, if any
    * @return the updated cumulative count
    */
  def updateFunction(newValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val newCount = newValues.sum
    val pre = preValues.getOrElse(0)
    Some(newCount+pre)
  }
}

Writing the results to a MySQL database

First, delete the checkpoint files produced by the previous program.

This uses the foreachRDD operator.

The official documentation describes it as follows:

The most generic output operator that applies a function, func, 
to each RDD generated from the stream. 
This function should push the data in each RDD to an external system,
such as saving the RDD to files, or writing it over the network to a database. 
Note that the function func is executed in the driver process running the streaming application, 
and will usually have RDD actions in it that will force the computation of the streaming RDDs.
dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. 
However, it is important to understand how to use this primitive correctly and efficiently.

Common mistakes when using the foreachRDD operator include the following:

Serialization exception: the connection is created on the driver but used on the workers, which would require the connection object to be serialized and shipped to them.

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

High cost: a new connection is created and closed for every single record.

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

Optimized version 1: create one connection per partition.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

Final optimization: reuse connections through a connection pool.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
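
The official snippet above assumes a ConnectionPool helper that it does not show. Below is a minimal, illustrative sketch of such a pool, reusing the JDBC URL and credentials from the MySQL example later in this post; the pool size is an arbitrary assumption, and in real projects a pooling library (HikariCP, DBCP, etc.) is the better choice:

import java.sql.{Connection, DriverManager}
import scala.collection.mutable

// Minimal, illustrative connection pool (one instance per executor JVM); not production grade.
object ConnectionPool {
  private val maxPoolSize = 10
  private val pool = mutable.Queue[Connection]()

  // Load the MySQL JDBC driver once when the object is first used
  Class.forName("com.mysql.jdbc.Driver")

  def getConnection(): Connection = synchronized {
    if (pool.nonEmpty) pool.dequeue()
    else DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark", "root", "root")
  }

  def returnConnection(connection: Connection): Unit = synchronized {
    if (pool.size < maxPoolSize) pool.enqueue(connection)
    else connection.close()
  }
}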

Writing the word-count results to MySQL

Create the table in the database:

create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);

Creating the MySQL connection (note that this is a plain connection per call, not yet a real pool):

import java.sql.{Connection, DriverManager}

  def createConnection(): Connection = {
    // Load the MySQL JDBC driver and open a new connection
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark", "root", "root")
  }

Writing the data into MySQL:

    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createConnection()
        partitionOfRecords.foreach { record =>
          val sql = "insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        }
        connection.close()
      }
    }

Finally, verify the results...

Summary:

The word counts are written to MySQL with this SQL statement:

insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"

Remaining problems:

1) Existing rows are never updated; every record is written with a plain insert.

Possible improvements:

a) Before inserting, check whether the word already exists: update it if it does, insert it otherwise (see the upsert sketch after this list).

b) In practice, use a store such as HBase or Redis instead.

2) A connection is created for every RDD partition; switch to a connection pool.
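
A sketch of improvement a), assuming a unique index exists on the word column (e.g. ALTER TABLE wordcount ADD UNIQUE KEY uk_word(word)), which is not part of the table definition above. It uses MySQL's INSERT ... ON DUPLICATE KEY UPDATE with a PreparedStatement, which also avoids building SQL strings by hand:

    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createConnection()
        // Upsert: insert a new word, or add the per-batch count to the existing row
        val pstmt = connection.prepareStatement(
          "insert into wordcount(word, wordcount) values(?, ?) " +
            "on duplicate key update wordcount = wordcount + values(wordcount)")
        partitionOfRecords.foreach { record =>
          pstmt.setString(1, record._1)
          pstmt.setInt(2, record._2)
          pstmt.addBatch()
        }
        pstmt.executeBatch()
        pstmt.close()
        connection.close()
      }
    }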

Using window operations

window: process the data from a given time range at a regular interval.

window length: the duration of the window.

sliding interval: the interval at which the window operation is performed.

Both parameters must be multiples of the batch interval.

How often do we compute over what range? For example, every 10 seconds compute the word count of the previous 10 minutes ==> every sliding interval, aggregate the data of the previous window length.

The example from the official documentation:

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
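
A minimal sketch that puts this example into a complete job, assuming the socket source on localhost:6789 used elsewhere in this post and a 5-second batch interval; note that the window length (30s) and the sliding interval (10s) are both multiples of the batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)
    val pairs = lines.flatMap(_.split(" ")).map((_, 1))

    // Every 10 seconds, count the words seen in the last 30 seconds
    val windowedWordCounts =
      pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    windowedWordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}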

Blacklist filtering:

  • Convert the access log into a DStream
  • Convert the blacklist into an RDD
  • Left-join the DStream with the RDD and keep only the log entries whose user is not on the blacklist
Access log   ==> DStream
20180808,zs
20180808,ls
20180808,ww
   ==>  (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)

Blacklist  ==> RDD
zs
ls
   ==> (zs: true)(ls: true)

Expected result ==> 20180808,ww

left join results:
(zs: [<20180808,zs>, <true>])   x  (filtered out)
(ls: [<20180808,ls>, <true>])   x  (filtered out)
(ww: [<20180808,ww>, <false>])  ==> kept; emit the first element of the value tuple, i.e. 20180808,ww

The code is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Blacklist filtering
  */
object TransformApp {


  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    /**
      * Creating a StreamingContext requires two parameters: a SparkConf and the batch interval
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))


    /**
      * Build the blacklist RDD
      */
    val blacks = List("zs", "ls")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))

    val lines = ssc.socketTextStream("localhost", 6789)
    val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => !x._2._2.getOrElse(false)) // drop lines whose user appears in the blacklist
        .map(x => x._2._1)                      // keep only the original log line
    })

    clicklog.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Integrating Spark SQL with Spark Streaming

Add the Spark SQL dependency to pom.xml:

    <!--SparkSQL-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
      <!--
      <scope>provided</scope>
      -->
    </dependency>

SparkSession is used as a singleton:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
  * Spark Streaming整合Spark SQL完成詞頻統計操作
  */
object SqlNetworkWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }


    ssc.start()
    ssc.awaitTermination()
  }


  /** Case class for converting RDD to DataFrame */
  case class Record(word: String)


  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {

    @transient  private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}

Verify the results.