將Streaming拉取的資料存入redis中
阿新 • • 發佈:2018-12-27
啟動redis: ./redis-cli -h hadoop01
package utils import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.{Jedis, JedisPool} /** * 建立jedis連線池 */ object Jpools { private val poolConfig = new GenericObjectPoolConfig() poolConfig.setMaxIdle(5)//最大的空閒連線數 poolConfig.setMaxTotal(2000)//支援最大的連線數 //連線池不需要對外提供訪問 private lazy val jedisPool = new JedisPool(poolConfig,"hadoop01") /** * 對外提供一個可以從池子裡面獲取連線的方法 * @return */ def getJedis :Jedis={ val jedis = jedisPool.getResource jedis.select(1) jedis } }
package shujuku import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} import utils.Jpools /** * 將實時統計的詞頻寫入到redis裡面 */ object WordCountRedis { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("WordCountRedis") //每2秒鐘取樣一次資料 //第二個引數是批次時間間隔,多長時間的資料集作為一個批次,這個時間不能隨意設定,必須是科學合理的設定,只有這樣才能穩定執行 val ssc = new StreamingContext(conf,Seconds(3)) //接收資料 val words: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01",1235) words.foreachRDD(rdd=>{ //計算當前批次結果 val current_batch_result: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) //把計算好的當前批次結果寫到redis current_batch_result.foreachPartition(partition=>{ //每個分割槽從池子裡獲取一個連線物件 val jedis= Jpools.getJedis partition.foreach(tp=>{ //redis的特性hincrby jedis.hincrBy("wordcount",tp._1,tp._2) }) //用完之後,記得文明 jedis.close() }) }) ssc.start() ssc.awaitTermination() } }