1. 程式人生 > >SparkStreaming(15):DStream轉換為RDD的Transform運算元

SparkStreaming(15):DStream轉換為RDD的Transform運算元

1.實現功能

    DStream中還是缺少某些API的,比如sortByKey之類的。所以使用Transform直接操作DStream中的當前job/批次對應的RDD,來替換DStream的操作(可以直接使用RDD的api),比較方便。

2.程式碼

package _0809kafka

import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
 * 
 */
object DStream2RddAPI {
  def main(args: Array[String]) {
    //1、建立sparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("DStream2RddAPI")
      .setMaster("local[2]")
    //2、建立sparkContext
    val sc = new SparkContext(sparkConf)

    val ssc = new StreamingContext(sc,Seconds(10))

    val socketDStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata.ibeifeng.com",9999)
    //dstream 當中有一些api是沒有的(例如:sortbyKey等)
    //將DStream轉換成RDD進行操作
    val resultDStream: DStream[((String, String), Int)] = socketDStream.transform((rdd,timestamp) =>{
      val sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
      val ts: String =sdf.format(timestamp.milliseconds)

      rdd.flatMap(_.split(" "))
        .filter(word =>word.nonEmpty)
        .map(word =>((word,ts),1))
        .reduceByKey(_ + _)
        //指定按照第二個位置上的資料型別排序,並且倒敘
        .sortBy(t =>t._2,ascending = false)
    })

    resultDStream.print()
    ssc.start()
    ssc.awaitTermination()


  }

}

3.測試

(1)開啟nc

nc -lt 9999

(2)然後再執行程式,否則會報錯!說9999埠無法連線
        
(3)輸入測試

(成功~)