1. 程式人生 > >SparkStreaming整合kafka直連模式direct方式

SparkStreaming整合kafka直連模式direct方式

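<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>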

// Spark Streaming driver (Spark 1.6.x, Kafka direct API).
//
// Every 5-second batch it counts ad clicks per (date, user, ad) and adds a user to
// the blacklist when the batch count plus the count returned by the click-count DAO
// exceeds 10.
//
// NOTE(review): this snippet carries no import list. It presumably needs java.util,
// kafka.serializer.StringDecoder, org.apache.spark.{SparkConf, SparkContext},
// org.apache.spark.storage.StorageLevel,
// org.apache.spark.streaming.{Seconds, StreamingContext},
// org.apache.spark.streaming.dstream.DStream and
// org.apache.spark.streaming.kafka.KafkaUtils, plus the project's own DateUtils,
// DAO and entity classes -- confirm against the project.

val checkpoint = "hdfs://bdha/checkpoint"
val conf = new SparkConf().setMaster("local").setAppName("AdRealStatJob")
val sc = new SparkContext(conf)

/**
 * Builds the StreamingContext and wires up the whole processing graph.
 *
 * Passed to `StreamingContext.getOrCreate` below as the factory for a new context.
 *
 * @return a configured, not yet started StreamingContext with a 5 second batch
 *         interval and checkpointing enabled on `checkpoint`
 */
def createFunction(): StreamingContext = {
  // Build on the SparkContext that already exists. Constructing the streaming
  // context from `conf` would try to start a second SparkContext in this JVM.
  val ssc = new StreamingContext(sc, Seconds(5))
  ssc.checkpoint(checkpoint)

  val topics = Set("Test")
  val kafkaParams = Map(
    "metadata.broker.list" -> "centos01:9092,centos02:9092,centos03:9092", // Kafka broker list
    "auto.offset.reset" -> "largest"
  )

  // The direct stream yields (key, value) pairs; only the message value is used.
  val messages: DStream[String] =
    KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      .map(_._2)
  messages.persist(StorageLevel.MEMORY_ONLY)

  // Record layout (whitespace separated): timestamp province city userId adId.
  // NOTE(review): a record with fewer than five fields or a non-numeric timestamp
  // throws here and fails the batch -- decide whether such records should be skipped.
  val dateUserAd2CountsDStream: DStream[(String, Int)] = messages.map { message =>
    val fields = message.split("\\s+")
    val timestamp = fields(0).toLong
    val userId = fields(3)
    val adId = fields(4)
    val date = DateUtils.formatDate(timestamp)
    // Composite key "date_userId_adId"; it is split on "_" again further down.
    val key = date + "_" + userId + "_" + adId
    (key, 1)
  }.reduceByKey(_ + _)

  dateUserAd2CountsDStream.foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      rdd.foreachPartition(partition => {
        if (!partition.isEmpty) {
          // The DAOs are created inside the partition closure, once per non-empty partition.
          val auccDao = new AdUserClickCountDaoImpl
          val blacklistDao = new AdBlacklistDaoImpl
          val blacklists = new util.ArrayList[AdBlacklist]()
          // val auccList = new util.ArrayList[AdUserClickCount]()
          partition.foreach { case (dateUserAd, count) =>
            val fields = dateUserAd.split("_")
            val date = fields(0)
            val userId = fields(1).toInt
            val adId = fields(2).toInt
            val historyCount = auccDao.getClickCountByDateUserAd(date, userId, adId)
            val clickCount = count + historyCount
            if (clickCount > 10) { // blacklisted user
              val blacklist = new AdBlacklist
              blacklist.setUser_id(userId)
              blacklists.add(blacklist)
            } /* else { // if user A was already blacklisted yesterday but today's activity
                        // stays below the threshold, this branch would write that user's
                        // data to the database, which is wrong
              val aucc = new AdUserClickCount
              aucc.setAd_id(adId)
              aucc.setDate(date)
              aucc.setUser_id(userId)
              aucc.setClick_count(count)
              auccList.add(aucc)
            } */
            // NOTE(review): with the branch above disabled, nothing in this snippet
            // writes click counts back, so historyCount only changes if another
            // component updates that table -- confirm.
          }
          blacklistDao.insertBatch(blacklists)
        }
      })
    }
  })

  ssc
}

// Recover the context from checkpoint data when present, otherwise build a new one.
val ssc = StreamingContext.getOrCreate(checkpoint, createFunction)
ssc.start()
ssc.awaitTermination()
// ssc.stop()