1. 程式人生 > >streaming流式讀取hdfs採坑記

streaming流式讀取hdfs採坑記

package rockerMQ


import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Auther: sss
  * @Date: 2018/11/26 10:05
  * @Description:Streaming接收hdfsDemo
  */
object Demo02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val session = SparkSession.builder().config(conf).getOrCreate()
    import session.implicits._
    val value = ssc.textFileStream("hdfs://192.168.xx.xx:9000/tmp/cxbtest")

    value.foreachRDD(rdd => {
     val df= rdd.map(row => {
        val name = row.split(" ", row.length)(0)
        val name1 = row.split(" ", row.length)(1)
        val name2 = row.split(" ", row.length)(2)
        Test(name, name1, name2)
      }).toDF()
      df.createOrReplaceTempView("tmp")
      session.sql("select name,name1,name2 from tmp where name='a' ").show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

case class Test(name: String, name1: String, name2: String)

程式碼如上,自己測試寫的

發現流式讀取hdfs有個大坑,公司是用rocketMQ作為訊息中介軟體,實時將資料接到hdfs上,因為不會

用sparkStreaming整合mq,就用這種方式來接,發現死活接收不到hdfs的流資料,自己在hdfs上建立目錄然後再往裡面上傳文字發現程式碼是好使的,最後猜想原因是

自己往hdfs上傳的檔案或是文字都是幾KB,而hdfs接收mq的資料都是幾GB為一個文字,感覺流式讀取hdfs是以

一個個文字為讀取的批次??  不是沒有讀到文字,而是一個文字形成的時間太久了,hdfs還沒形成一個文字,我這邊就給kill了