1. 程式人生 > >將Streaming拉取的資料存入redis中

將Streaming拉取的資料存入redis中

啟動redis:   ./redis-cli -h hadoop01

package utils

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

/**
  * 建立jedis連線池
  */
object Jpools {
   private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5)//最大的空閒連線數
  poolConfig.setMaxTotal(2000)//支援最大的連線數
  //連線池不需要對外提供訪問
  private lazy val jedisPool = new JedisPool(poolConfig,"hadoop01")

  /**
    * 對外提供一個可以從池子裡面獲取連線的方法
    * @return
    */
  def getJedis :Jedis={
    val jedis = jedisPool.getResource
    jedis.select(1)
    jedis
  }
}
package shujuku

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import utils.Jpools

/**
  * 將實時統計的詞頻寫入到redis裡面
  */
object WordCountRedis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCountRedis")
    
    //每2秒鐘取樣一次資料
    //第二個引數是批次時間間隔,多長時間的資料集作為一個批次,這個時間不能隨意設定,必須是科學合理的設定,只有這樣才能穩定執行
    val ssc = new StreamingContext(conf,Seconds(3))
    //接收資料
    val words: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01",1235)
    
    words.foreachRDD(rdd=>{
      //計算當前批次結果
      val current_batch_result: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

      //把計算好的當前批次結果寫到redis
      current_batch_result.foreachPartition(partition=>{
        //每個分割槽從池子裡獲取一個連線物件
       val jedis= Jpools.getJedis
        partition.foreach(tp=>{
        //redis的特性hincrby
          jedis.hincrBy("wordcount",tp._1,tp._2)
        })
        //用完之後,記得文明
        jedis.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}