
Integrating the Kafka producer/consumer API with Spark Streaming (Scala version)

Maven configuration (pom.xml dependencies)

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
            <scope>provided</scope>
        </dependency>
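
If the project is built with sbt rather than Maven, the same four dependencies can be declared roughly as follows (a sketch assuming sbt; coordinates and versions are taken from the POM snippet above):

// build.sbt (sketch) -- same artifacts and versions as the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka_2.11"                     % "1.0.0",
  "org.apache.kafka" % "kafka-clients"                  % "1.0.0",
  "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.1" % "provided",
  "org.apache.spark" % "spark-streaming_2.11"           % "2.1.1" % "provided"
)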
1. Kafka producer
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.io.Source
import scala.reflect.io.Path

class KafkaProduceMsg extends Runnable {

  private val BROKER_LIST = "slave6:9092,slave7:9092"
  private val TOPIC = "kafka"
  private val DIR = "C:\\Users\\admin\\Desktop\\kafka-data.txt"

  /**
    * 1. Producer configuration
    * bootstrap.servers : the Kafka brokers to connect to
    * key.serializer / value.serializer : how the message key and value are serialized
    * acks : "1" means the leader broker must acknowledge the record before the send counts as successful
    * (the new producer API always sends asynchronously, so the old producer.type setting is not needed)
    */
  private val props = new Properties()
  props.put("bootstrap.servers",BROKER_LIST)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("request.required.acks", "1")
  props.put("producer.type", "async")

  private val producer = new KafkaProducer[String,String](props)

  def run(): Unit = {
    println("開始生產訊息!!!!!!!!!!")
    while(true){
      val files = Path(this.DIR).walkFilter(p => p.isFile)
      try {
        for(file <- files){
          val reader = Source.fromFile(file.toString(),"UTF-8")
          for(line <- reader.getLines()){
            var m = 0
            while(m < 10){
              val record = new ProducerRecord[String,String](this.TOPIC,"key",line)
              m = m + 1
              println(m + "" + record)
              producer.send(record)
            }
            try{
              Thread.sleep(3000)
            }catch {
              case e : Exception => println(e)
            }
          }
          reader.close()
        }
      }catch{
        case e : Exception => println(e)
      }
    }
  }
}

Producer driver program:

object Msg {
  def main(args: Array[String]): Unit = {
    new Thread(new KafkaProduceMsg()).start()
  }

}
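
The run() loop above sends records without waiting for the broker's response. If delivery needs to be confirmed or failures logged, send() also accepts a Callback, and the producer should be closed once no more records will be sent. A minimal standalone sketch (the ProduceWithCallback object name and the message text are only illustrative; brokers, topic and properties mirror the class above):

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object ProduceWithCallback {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "slave6:9092,slave7:9092") // same brokers as above
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "1")

    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("kafka", "key", "hello from callback example")

    // the callback is invoked once the broker has accepted (or rejected) the record
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception != null) exception.printStackTrace()
        else println(s"sent to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
      }
    })

    producer.close() // flushes any pending records before shutting down
  }
}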
2. Spark Streaming consumer
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 2. Spark Streaming consumes the data and checks whether the payload contains the
  *    specified keywords; matching records are kept and everything else is discarded.
  */
object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    //    create the SparkConf and StreamingContext (5-second batch interval)
    val conf = new SparkConf().setAppName("Consumer")
    val ssc = new StreamingContext(conf,Seconds(5))
    //    set a checkpoint directory for intermediate state, so stateful (cumulative) computations are possible
//    ssc.checkpoint("hdfs://master:9000/xxx")
    //    read data from Kafka
    val kafkaParam = Map("metadata.broker.list" -> "slave6:9092,slave7:9092")
    val topic = "kafka".split(",").toSet
    //    get the log data (keep only the message value)
    val logDStream: DStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParam,topic).map(_._2)
    logDStream.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
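
The header comment says the job should keep only records containing specified keywords and discard the rest, but the example above just prints the stream. A minimal sketch of that filtering step, which could replace the logDStream.print() call (the keyword list and the output path are hypothetical placeholders):

// hypothetical keyword list and output prefix -- adjust to the real requirements
val keywords = Seq("ERROR", "WARN")
val matched: DStream[String] = logDStream.filter(line => keywords.exists(line.contains))

// keep matching lines, discard the rest; each non-empty batch is written out as text files
matched.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile("hdfs://master:9000/xxx/matched-" + System.currentTimeMillis())
  }
}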