
Spark Streaming: Consuming Kafka Directly and Saving to HDFS, a Hands-on Programming Walkthrough

I have been studying Spark Streaming recently, and this post summarizes what I have learned so far.

The main code is as follows.
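For reference, the imports the snippets below rely on look roughly like this (a sketch assuming the spark-streaming-kafka-0-8 integration; ConfigInfo, SparkUtil, and StreamingDataOutputFormat are the author's own helpers and are not shown):

import kafka.serializer.StringDecoder
import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils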

// Build a new StreamingContext; used as the factory for getOrCreate below
def createStreamingContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("myStreamingText").setMaster(ConfigInfo.MasterConfig)
  sparkConf.set("spark.streaming.kafka.maxRetries", "100")
  // note: the property was misspelled "maxRatePerParititon" in the original; as
  // written it would have been silently ignored
  sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
  val ssc = new StreamingContext(sparkConf, Seconds(ConfigInfo.durationConfig))
  ssc.checkpoint(ConfigInfo.checkpointConfig)
  ssc
}
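A note on the two settings: spark.streaming.kafka.maxRetries controls how many times the driver retries its Kafka offset lookups, and spark.streaming.kafka.maxRatePerPartition caps how many records per second are read from each Kafka partition, which keeps the first batch after a checkpoint recovery from ballooning. On Spark 1.5+ you can alternatively let Spark tune the ingest rate automatically (an option, not part of the original code):

sparkConf.set("spark.streaming.backpressure.enabled", "true")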

// The receiver-less "direct" approach: one RDD partition per Kafka partition,
// with offsets tracked by Spark Streaming itself through the checkpoint
def createKafkaDstream(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}

// Recover the context from the checkpoint if one exists; otherwise build it fresh
val ssc = StreamingContext.getOrCreate(ConfigInfo.checkpointConfig, createStreamingContext)
// "metadata.broker.list" (not "metadata.brokers.list") is the property the direct stream expects
val kafkaParams = Map[String, String]("metadata.broker.list" -> ConfigInfo.brokerListConfig, "group.id" -> ConfigInfo.groupIdConfig)
// the original passed ConfigInfo.groupIdConfig here, which is a mistake: this set
// names the topics to subscribe to (the exact ConfigInfo field name is assumed)
val topics = Set[String](ConfigInfo.topicConfig)

val dStream = createKafkaDstream(ssc, kafkaParams, topics)
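With the direct approach, consumed offsets live in the checkpoint rather than being committed to ZooKeeper. If you also want to record them yourself (for monitoring, or for your own exactly-once bookkeeping), each batch's RDD exposes its offset ranges; a minimal sketch that only logs them:

import org.apache.spark.streaming.kafka.HasOffsetRanges

dStream.foreachRDD { rdd =>
  // the RDDs produced by the direct stream carry their Kafka offset ranges
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
    println(s"${r.topic} partition ${r.partition}: offsets ${r.fromOffset} to ${r.untilOffset}")
  }
}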

// Persist one batch's RDD to HDFS as compressed key/value output
def saveToHDFS(rdd: RDD[String]): Unit = {
  // pair each line with the key extracted from it
  def convertData(line: String): (Text, Text) = {
    val key = SparkUtil.fetchKey(line)
    (new Text(key), new Text(line))
  }

  val dataToSaveHDFS: RDD[(Text, Text)] = rdd.map(convertData)

  // enable block-level Snappy compression on the output files
  val hadoopConf = rdd.context.hadoopConfiguration
  hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
  hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  hadoopConf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

  dataToSaveHDFS.saveAsNewAPIHadoopFile(ConfigInfo.saveHdfsPathConfig, classOf[Text], classOf[Text], classOf[StreamingDataOutputFormat[Text, Text]])
}
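The code above defines saveToHDFS but never attaches it to the stream or starts the context. The missing glue looks roughly like this (a sketch, assuming the Kafka message value is the line to persist; StreamingDataOutputFormat is presumably the author's custom OutputFormat, and any new-API format such as TextOutputFormat would work in its place):

// each element is a (key, value) pair; only the message value is persisted
dStream.map { case (_, value) => value }.foreachRDD(rdd => saveToHDFS(rdd))

ssc.start()
ssc.awaitTermination()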