1. 程式人生 > >kafka資料快取到redis的全路徑操作流程

kafka資料快取到redis的全路徑操作流程

第一步:配置redis客戶端

spark中配置redis客戶端的程式碼參考:

 

import org.apache.commons.pool2.impl.GenericObjectPoolConfig

import redis.clients.jedis.JedisPool

 

object RedisClient extends Serializable {

  val redisHost = "192.168.16.100"

  val redisPort = 6379

  val redisTimeout = 30000

  lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)

  lazy val hook = new Thread {

    override def run = {

      println("Execute hook thread: " + this)

      pool.destroy()

    }

  }

  sys.addShutdownHook(hook.run)

}

 

若出錯可能缺少jar包,需要引入common-pool2-2.2.jar 和 jedis-2.6.jar

 

第二步:資料輸入到kafka中,本列使用sparkstream

①Kafka生產資料

package Traffic

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import org.apache.spark.{SparkConf, SparkContext}

import org.codehaus.jettison.json.JSONObject

 

/**

  * Created by Administrator on 2017/10/14.

  * 功能:SparkStream作為kafka的生產者,將制定檔案資料打到kafka中

  *

  */

object KafkaEventProducer {

  def main(args: Array[String]): Unit = {

    //建立topic

    val topic="car_event"

    val brokers="192.168.17.108:9092"

    val props=new Properties()

    //把broker put進去

    props.put("metadata.broker.list",brokers)

    //把kafka編譯器放進去

    props.put("serializer.class","kafka.serializer.StringEncoder")

   //配置kafka的config(配置)

    val kafkaconfig=new ProducerConfig(props)

    val producer=new Producer[String,String](kafkaconfig)

   //配置spark的config

    val conf=new SparkConf().setAppName("KafkaEventProducer").setMaster("local[2]")

    val sc=new SparkContext(conf)

    //從path中載入資料

// val filePath="data/shuju.txt"

    val filePath="c://test//shuju.txt"

    //載入資料並進行切分

    val records=sc.textFile(filePath)

       .filter(!_.startsWith(";"))

           .map(_.split(",")).collect()

    //對資料進行預處理形成Json形式

    for(temp <-records)

      {

        val event=new JSONObject()

        //因為要put很多資料,這樣看起來很規範

        event

          .put("camer_id",temp(0))    //相機編號

          .put("car_id",temp(2))      //車牌號

          .put("event_time",temp(4))  //時間

          .put("car_speed",temp(6))   //速度

          .put("car_speed",temp(13))  //車道編號

        //生產event資訊 topic 是往哪個topic中生產資料 event.toString是生產的真正的內容

        producer.send(new KeyedMessage[String,String](topic,event.toString))

        println("Message Sent:  "+event)

        Thread.sleep(200) //休息200微秒

      }

  sc.stop()

  }

}

需要commons-pool2-2.2.jar,jedis-2.6.1.jar和json-lib-2.3-jdk15.jar 

 ②啟動kafka  建立car_event 和 topic

start-kafka.sh

kafka-topics.sh --create --zookeeper hadoop:2181 --topic car_event --partitions 1 --replication-factor 1

Created topic "car_event".

第三小步:啟動car_event的topic的消費者,此步僅僅是為了驗證資料的

Kafka-console-consumer.sh --topic car_event --zookeeper hadoop:2181

第三步:idea中部署kafka打入redis的程式碼,如下所示:

package Traffic

 

import java.text.SimpleDateFormat

import java.util.Calendar

import kafka.serializer.{StringDecoder, StringEncoder}

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.{SparkConf, SparkContext}

import net.sf.json.JSONObject

 

/**

  * Created by Administrator on 2017/10/14.

  * 功能: 從kafka中獲取資料寫入到redis中

  *

  */

object CarEventAnalysis {

  def main(args: Array[String]): Unit = {

   //配置SparkStrteaming

    val conf=new SparkConf().setAppName("CarEventAnalysis").setMaster("local[2]")

    val sc=new SparkContext(conf)

    val ssc=new StreamingContext(sc,Seconds(5))

    val dbindex=1 //指定是用哪個資料庫進行連線

    //從kafka中讀取資料(用直連的方法)

    val topics=Set("car_event")

    // 只要和brokers相關的都要寫全

    val brokers="192.168.17.108:9092"

    //配置kafka引數

    val kafkaParams=Map[String,String](

      "metadata.broker.list"->brokers,

    "serializer.class"->"kafka.serializer.StringEncoder"

    )

    //建立一個流  這是一個模板程式碼  引數中的兩個String代表的是kafka的鍵值對的資料,及key和value

    val kafkaStream=KafkaUtils.createDirectStream[String,String,

                    StringDecoder,StringDecoder](ssc,kafkaParams,topics)

    //從kafka中將資料讀出

    val events=kafkaStream.flatMap(line=>{

      //轉換為object

      val data=JSONObject.fromObject(line._2) // ._2是真正的資料

//      println(data)

      //必須用Some修飾data option有兩個子類 none 代表無值  some代表有值

      // 加上some表示一定有值,後面有x.getString和x.getInt,保證程式能知道有值

      Some(data)

    })

    //從kafka中取出卡口編號和速度資料

    val carspeed=events.map(x=>(x.getString("camer_id"),x.getInt("car_speed")))

    //把資料變成(camer_id,(car_speed,1))

        .mapValues((x:Int)=>(x,1.toInt))

      //每隔10秒計算一次前20秒的速度(4個rdd) Tuple2表示兩個引數

      // (速度,數量)  (速度,數量)

      .reduceByKeyAndWindow((a:Tuple2[Int,Int], b:Tuple2[Int,Int]) =>

      {(a._1 + b._1,a._2 + b._2)},Seconds(20),Seconds(10))

    // carspeed  速度之和  數量之和

//    carspeed.map{case(key,value)=>(key,value._1/value._2.toFloat)}

     carspeed.foreachRDD(rdd=>{

       rdd.foreachPartition(partitionofRecords=>{

         //得到連線池的一個資源

         val jedis=RedisClient.pool.getResource

         // camer_id 卡口以及總的速度

         partitionofRecords.foreach(pair=>{

           val camer_id=pair._1  //卡口

           val total_speed=pair._2._1  //總的速度

           val count=pair._2._2  //總的數量

           val now=Calendar.getInstance().getTime() //獲取當前的時間

           val minuteFormat=new SimpleDateFormat("HHmm") //獲取分鐘格式

           val dayFormat=new SimpleDateFormat("yyyyMMdd") //獲取天格式

           val time = minuteFormat.format(now) //獲取分鐘

           val day = dayFormat.format(now)     //獲取天

 

           //開始往redis中插入資料

           if(count!=0){

             jedis.select(dbindex)   //用選擇的資料庫

             // set進去一個map

             jedis.hset(day + "_" + camer_id, time ,total_speed + "_" + count)

             // 從redis中取資料

             val foreachdata=jedis.hget(day + "_" + camer_id, time)

             println(foreachdata)

           }

         })

         RedisClient.pool.returnResource(jedis)

       })

     })

    println("----------計算開始---------------------------")

    ssc.start()

    ssc.awaitTermination()

  }

}

第四步: idea中執行第步部署好的kafka打入redis的程式碼

記得要引入ezmorph-1.0.6.jar, commons-collections-3.2.jar ,

commons-lang-2.3.jar,commons-pool2-2.2.jar, 共四個jar 

第五步:執行第步的往kafka中打資料的程式

第六步:登入到redis的客戶端,驗證資料是否存入redis中

redis-cli -p 12002

Select 資料庫名稱