kafka生產者消費者API 與sparkStreaming 整合(scala版)
阿新 • 發佈:2019-02-15
maven配置檔案
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
1. kafka生產者
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.io.Source
import scala.reflect.io.Path
/**
 * Runnable that repeatedly reads every file under DIR and publishes each line
 * to the Kafka topic, 10 copies per line, pausing 3 seconds between lines.
 */
class KafkaProduceMsg extends Runnable {
  private val BROKER_LIST = "slave6:9092,slave7:9092"
  private val TOPIC = "kafka"
  // NOTE(review): despite the name this is used as a directory root for
  // Path(...).walkFilter below — confirm it points at a directory, not a file.
  private val DIR = "C:\\Users\\admin\\Desktop\\kafka-data.txt"

  /**
   * Producer configuration (new-client config keys):
   *   bootstrap.servers      : the Kafka brokers to bootstrap from
   *   key/value.serializer   : wire serialization for record keys/values
   *   acks                   : "1" = leader must acknowledge each record
   */
  private val props = new Properties()
  props.put("bootstrap.servers", BROKER_LIST)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // FIX: "request.required.acks" and "producer.type" are legacy (Scala producer)
  // keys that the new KafkaProducer logs as unknown and ignores. The new-client
  // equivalent of request.required.acks is "acks"; the new producer is always
  // asynchronous, so "producer.type" has no replacement.
  props.put("acks", "1")

  private val producer = new KafkaProducer[String, String](props)

  def run(): Unit = {
    println("開始生產訊息!!!!!!!!!!")
    try {
      while (true) {
        // Re-scan the directory on every pass so newly added files are picked up.
        val files = Path(this.DIR).walkFilter(p => p.isFile)
        try {
          for (file <- files) {
            val reader = Source.fromFile(file.toString(), "UTF-8")
            try {
              for (line <- reader.getLines()) {
                // Send each line 10 times to generate volume for the consumer demo.
                var m = 0
                while (m < 10) {
                  val record = new ProducerRecord[String, String](this.TOPIC, "key", line)
                  m = m + 1
                  println(m + "" + record)
                  producer.send(record)
                }
                // Throttle: 3 seconds between source lines.
                try {
                  Thread.sleep(3000)
                } catch {
                  case e: Exception => println(e)
                }
              }
            } finally {
              // FIX: the Source was never closed — leaked a file handle per file per pass.
              reader.close()
            }
          }
        } catch {
          case e: Exception => println(e)
        }
      }
    } finally {
      // FIX: flush buffered records and release the producer's network resources
      // if the loop ever terminates abnormally.
      producer.close()
    }
  }
}
生產者執行程式:
/** Entry point that launches the Kafka producer on a background thread. */
object Msg {
  def main(args: Array[String]): Unit = {
    val producerThread = new Thread(new KafkaProduceMsg())
    producerThread.start()
  }
}
2. 消費者sparkStreaming
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 2.spark-streaming消費資料,匹配應用層是否含有指定關鍵字,
 * 如果包含就儲存下來,不包含就丟棄
 */
/**
 * Spark Streaming consumer: reads the "kafka" topic via the direct (receiver-less)
 * Kafka API in 5-second micro-batches and prints each message value.
 */
object KafkaConsumer {
  def main(args: Array[String]): Unit = {
    // Build a StreamingContext with a 5-second batch interval.
    // (FIX: the original comment said this creates a "sparksession" — it does not;
    // it is a SparkConf + StreamingContext.)
    val conf = new SparkConf().setAppName("Consumer")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Optional checkpoint directory, required only for stateful/windowed operations.
    // ssc.checkpoint("hdfs://master:9000/xxx")

    // Direct-stream parameters: broker list only; the direct API manages offsets
    // itself rather than using a ZooKeeper-based receiver.
    val kafkaParam = Map("metadata.broker.list" -> "slave6:9092,slave7:9092")
    val topic = "kafka".split(",").toSet

    // Each record arrives as a (key, value) pair; keep only the message value.
    val logDStream: DStream[String] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topic).map(_._2)
    logDStream.print()

    ssc.start()
    // Blocks until the context is stopped (externally or by an error).
    ssc.awaitTermination()
    // FIX: removed the trailing ssc.stop() — awaitTermination() only returns
    // after the context has already been stopped, so the extra call was redundant.
  }
}