Spark Streaming--3: Integrating Spark with Kafka
阿新 · Published 2019-01-05
- Add the Maven dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
- Write the Scala code
//Stream2Kafka
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Stream2Kafka extends App {
  // Create the Spark configuration
  val conf = new SparkConf().setAppName("kafka").setMaster("local[3]")
  // Create the StreamingContext with 5-second batches
  val ssc = new StreamingContext(conf, Seconds(5))

  // Topics to connect to Kafka with: the input topic
  val fromTopic = "source"
  // and the output topic
  val toTopic = "target"

  // Broker address list
  val brokers = "master:9092,slave1:9092,slave3:9092,slave2:9092"

  // Kafka consumer configuration
  val kafkaParams = Map[String, Object](
    // Initial addresses used to connect to the cluster
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
    // Key and value deserializer classes
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    // Identifies which consumer group this consumer belongs to
    ConsumerConfig.GROUP_ID_CONFIG -> "kafka",
    // Used when there is no initial offset, or the current offset no longer
    // exists on any server; "latest" resets the offset to the newest record
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
    // If true, this consumer's offsets are committed automatically in the background
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
  )

  // Create a DStream connected to Kafka that returns the received input data
  val inputStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    // Location strategy: distribute partitions evenly across available executors
    LocationStrategies.PreferConsistent,
    // Consumer strategy: subscribe to a fixed set of topics
    ConsumerStrategies.Subscribe[String, String](Array(fromTopic), kafkaParams))

  inputStream.map { record => "hehe--" + record.value }.foreachRDD { rdd =>
    // Write the RDD back to Kafka here, using the Kafka connection pool
    rdd.foreachPartition { items =>
      val kafkaProxyPool = KafkaPool(brokers)
      val kafkaProxy = kafkaProxyPool.borrowObject()
      for (item <- items) {
        // Send through the pooled producer
        kafkaProxy.kafkaClient.send(new ProducerRecord[String, String](toTopic, item))
      }
      kafkaProxyPool.returnObject(kafkaProxy)
    }
  }

  ssc.start()
  ssc.awaitTermination()
}

//Kafka connection pool
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer
// Needed to convert the Scala Map into the Java Map expected by KafkaProducer
import scala.collection.JavaConversions._

class KafkaProxy(broker: String) {
  val conf = Map(
    // Initial addresses used to connect to the cluster
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> broker,
    // Key and value serializer classes
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer],
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer]
  )
  val kafkaClient = new KafkaProducer[String, String](conf)
}

// Factory that creates KafkaProxy instances for the pool
class KafkaProxyFactory(broker: String) extends BasePooledObjectFactory[KafkaProxy] {
  // Create an instance
  override def create(): KafkaProxy = new KafkaProxy(broker)
  // Wrap an instance
  override def wrap(t: KafkaProxy): PooledObject[KafkaProxy] = new DefaultPooledObject[KafkaProxy](t)
}

object KafkaPool {
  private var kafkaPool: GenericObjectPool[KafkaProxy] = null

  def apply(broker: String): GenericObjectPool[KafkaProxy] = {
    if (kafkaPool == null) {
      this.kafkaPool = new GenericObjectPool[KafkaProxy](new KafkaProxyFactory(broker))
    }
    kafkaPool
  }
}
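To run the job outside an IDE, here is a minimal launch sketch. It assumes the project is packaged as a fat jar named stream2kafka-assembly.jar (a hypothetical name) that bundles the Kafka integration and commons-pool2 dependencies; the master is already set to local[3] in the code:

spark-submit --class Stream2Kafka stream2kafka-assembly.jar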
- Start ZooKeeper
zkServer.sh start
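Optionally, confirm the quorum is healthy on each node before starting Kafka; the same control script supports a status subcommand:

zkServer.sh status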
- Start Kafka on each node
kafka-server-start.sh /opt/apps/Kafka/kafka_2.11_2.0.0/config/server.properties &
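As an alternative to backgrounding with &, kafka-server-start.sh also accepts a -daemon flag that detaches the broker process for you:

kafka-server-start.sh -daemon /opt/apps/Kafka/kafka_2.11_2.0.0/config/server.properties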
- Create the two topics
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --replication-factor 2 --partitions 2 --topic source
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --replication-factor 2 --partitions 2 --topic target
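To verify that both topics exist with the expected partition and replica counts:

kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --topic source
kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --topic target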
- Start a producer and write data to source
kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092,slave3:9092,slave4:9092 --topic source
- Start a consumer to listen for data on target
kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092,slave3:9092,slave4:9092 --topic target
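By default the console consumer only shows records produced after it starts; to replay everything already sitting in target, add the --from-beginning flag:

kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092,slave3:9092,slave4:9092 --topic target --from-beginning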
The end-to-end flow is: create the two topics, source and target; feed data into source from the Kafka console producer; the Spark Streaming direct stream, configured via ConsumerConfig, deserializes and consumes those records; the processed results are then serialized and written back to Kafka through the pooled producer configured via ProducerConfig; finally, the console consumer listening on target prints whatever arrives.
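As a concrete check of the pipeline (the map step above prepends "hehe--" to every record value), typing hello into the source producer should appear on the target consumer within one 5-second batch:

> hello
hehe--hello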