
Spark Streaming: print all records from the previous 20 seconds, every 10 seconds

A basic introductory Spark Streaming demo. The code is as follows:

 

package com.xj365.bdp

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
           |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create the streaming context with a 2-second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a direct Kafka stream for the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "wuzhanwei")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    // Window the message values: every 10 seconds, emit everything received
    // during the previous 20 seconds, then print each record with a timestamp
    val lines = messages.map(_.value).window(Seconds(20), Seconds(10))
    lines.foreachRDD { rdd =>
      rdd.foreachPartition { it =>
        it.foreach { msg =>
          println(System.currentTimeMillis() + " msg:" + msg)
        }
      }
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

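A note on the window parameters: both the window length (20 seconds) and the slide interval (10 seconds) must be multiples of the batch interval (2 seconds here), otherwise Spark Streaming rejects the window when the context starts. Because the window is twice as long as the slide, each record falls into two consecutive windows and is therefore printed twice. The sketch below is a hypothetical way to observe this locally without a Kafka cluster: it swaps the Kafka source for a queueStream and feeds made-up records (record-1, record-2, ...) one per batch. The object name WindowDemoLocal and the record names are illustrative, not part of the original demo.

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowDemoLocal {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowDemoLocal").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Queue-backed input stream: one queued RDD is consumed per 2-second batch
    val queue = new mutable.Queue[RDD[String]]()
    val stream = ssc.queueStream(queue)

    // Same windowing as the Kafka demo: every 10 seconds,
    // print everything received during the previous 20 seconds
    stream.window(Seconds(20), Seconds(10)).foreachRDD { rdd =>
      println("--- window at " + System.currentTimeMillis() + " ---")
      rdd.collect().foreach(println)
    }

    ssc.start()

    // Feed one record every 2 seconds; each record should show up
    // in two consecutive windowed outputs
    for (i <- 1 to 30) {
      queue += ssc.sparkContext.parallelize(Seq("record-" + i))
      Thread.sleep(2000)
    }

    ssc.awaitTermination()
  }
}

Under the hood, window() unions the RDDs of the batches it covers, so collecting and printing on the driver as above is only reasonable for small demo volumes; for real workloads the per-partition foreach pattern from the Kafka demo is the safer choice.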