An example of Spark Streaming reading Kafka data, deserializing it, and optimizing with mapPartitions
val monitorWrappedMessage1 = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)
  .mapPartitions(partitions => {
    // Build the Zookeeper-backed schema store once per partition, not once per record.
    val zookeeperAvroSchemaPersister = new ZookeeperAvroSchemaPersister
    zookeeperAvroSchemaPersister.setServers("10.194.1.2:2181")
    zookeeperAvroSchemaPersister.setConnectionTimeout(10000)
    zookeeperAvroSchemaPersister.init()

    // The Avro decoder resolves schemas through the persister above.
    val avroMessageDecoder = new AvroMessageDecoder
    avroMessageDecoder.setAvroMessageEntityPackageToScan(
      "com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own")
    avroMessageDecoder.setAvroSchemaPersister(zookeeperAvroSchemaPersister)

    // Decode the value of every (key, value) pair in the partition; toList forces
    // the decoding to happen while the decoder is still in scope.
    val mWMessage = partitions.map(line =>
      avroMessageDecoder.decode(line._2).asInstanceOf[MonitorWrappedMessage]).toList
    mWMessage.toIterator
  })
This snippet combines two things: Avro deserialization of the Kafka payload, and the mapPartitions optimization, which builds the expensive schema-registry client and decoder once per partition instead of once per record.
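To make the pattern concrete, here is a minimal, self-contained sketch of the difference between per-record and per-partition setup. Everything in it (ExpensiveClient, the local master, the sample data) is a hypothetical stand-in for the schema-registry client and decoder above, not code from the original job:

import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsSketch {
  // Hypothetical stand-in for an expensive, non-serializable resource
  // (plays the role of ZookeeperAvroSchemaPersister + AvroMessageDecoder).
  class ExpensiveClient {
    def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("MapPartitionsSketch").setMaster("local[2]"))
    val raw = sc.parallelize(Seq("a", "b", "c").map(_.getBytes("UTF-8")))

    // Anti-pattern: constructs one client for EVERY record.
    val slow = raw.map { bytes => new ExpensiveClient().decode(bytes) }

    // mapPartitions: constructs one client per partition and reuses it.
    val fast = raw.mapPartitions { records =>
      val client = new ExpensiveClient() // once per partition, on the executor
      records.map(client.decode)
    }

    fast.collect().foreach(println)
    sc.stop()
  }
}

With map, the construction cost is paid for every record; hoisting the client out to the driver instead would require it to be serializable. mapPartitions avoids both problems, because each task builds the client locally and reuses it across the whole partition.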
=============================== mapPartitions optimization example ===================================
val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection // creates one db connection per partition
  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, so readMatchingFromDB actually runs here
  connection.close() // safe to close only after the iterator has been consumed
  newPartition.iterator // return a fresh iterator over the materialized results
})
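One caveat about this example: .toList exists only so the connection can be closed before the iterator is handed back, but it materializes the entire partition in memory. For large partitions, a wrapping iterator keeps processing lazy and still closes the connection once the last record is read. A minimal sketch, reusing the hypothetical DbConnection and readMatchingFromDB from the example above (MatchResult stands in for whatever readMatchingFromDB returns):

val newRd = myRdd.mapPartitions { partition =>
  val connection = new DbConnection // still one connection per partition
  new Iterator[MatchResult] {
    def hasNext: Boolean = {
      val more = partition.hasNext
      if (!more) connection.close() // close once the underlying iterator is exhausted
      more
    }
    def next(): MatchResult = readMatchingFromDB(partition.next(), connection)
  }
}

The trade-off: if a downstream operation stops consuming early (for example take), hasNext may never observe exhaustion and the connection leaks, so the eager toList version remains the simpler choice when partitions are known to be small.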
References:
http://www.hongyusu.com/amt/spark-streaming-kafka-avro-and-registry.html
https://github.com/cpbaranwal/Avro-SparkStreaming-Kafka
https://community.hortonworks.com/articles/33275/receiving-avro-messages-through-kafka-in-a-spark-s.html
Attached code:
package com.sparkdeveloper.receiver

import java.io.{ByteArrayOutputStream, File, IOException}
import java.util.HashMap

import kafka.serializer._
import org.apache.avro._
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io._
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import com.databricks.spark.avro._
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

case class HashtagEntities(text: String, start: Double, end: Double)

case class User(id: Double, name: String, screenName: String, location: String,
                description: String, url: String, statusesCount: Double)

case class Tweet(text: String, createdAt: String, lang: String, source: String,
                 expandedURL: String, url: String, screenName: String, description: String,
                 name: String, retweetCount: Double, timestamp: Long, favoriteCount: Double,
                 user: Option[User], hashtags: HashtagEntities)

/**
 * Created by timothyspann
 */
object KafkaConsumer {
  val tweetSchema = SchemaBuilder
    .record("tweet")
    .fields
    .name("tweet").`type`().stringType().noDefault()
    .name("timestamp").`type`().longType().noDefault()
    .endRecord

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.sparkdeveloper.receiver.KafkaConsumer")

    val sparkConf = new SparkConf().setAppName("Avro to Kafka Consumer")
    sparkConf.set("spark.cores.max", "24") // for my sandbox
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "KafkaConsumer") // want to know your app in the UI
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")
    sparkConf.set("spark.sql.parquet.compression.codec", "snappy")
    sparkConf.set("spark.sql.parquet.mergeSchema", "true")
    sparkConf.set("spark.sql.parquet.binaryAsString", "true")

    val sc = new SparkContext(sparkConf)
    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
    val ssc = new StreamingContext(sc, Seconds(2))

    try {
      val kafkaConf = Map(
        "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
        "zookeeper.connect" -> "sandbox.hortonworks.com:2181", // default zookeeper location
        "group.id" -> "KafkaConsumer",
        "zookeeper.connection.timeout.ms" -> "1000")

      val topicMaps = Map("meetup" -> 1)
      // Create a new stream which can decode byte arrays.
      val tweets = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](
        ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

      try {
        tweets.foreachRDD((rdd, time) => {
          if (rdd != null) {
            try {
              val sqlContext = new SQLContext(sc)
              import sqlContext.implicits._

              val rdd2 = rdd.map { case (k, v) => parseAVROToString(v) }
              try {
                val result = rdd2.mapPartitions(records => {
                  // One ObjectMapper per partition: it is costly to create and not serializable.
                  val mapper = new ObjectMapper()
                  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                  mapper.registerModule(DefaultScalaModule)
                  records.flatMap(record => {
                    try {
                      Some(mapper.readValue(record, classOf[Tweet]))
                    } catch {
                      case e: Exception => None // drop records that fail to parse
                    }
                  })
                }, true)

                val df1 = result.toDF()
                logger.error("Registered tweets: " + df1.count())
                df1.registerTempTable("tweets")

                // To show how easy it is to write multiple formats
                df1.write.format("orc").mode(org.apache.spark.sql.SaveMode.Append).orc("orcresults")
                df1.write.format("avro").mode(org.apache.spark.sql.SaveMode.Append).avro("avroresults")
                df1.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).parquet("parquetresults")
                df1.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).json("jsonresults")
              } catch {
                case e: Exception => None
              }
            } catch {
              case e: Exception => None
            }
          }
        })
      } catch {
        case e: Exception =>
          println("Writing files after job. Exception: " + e.getMessage)
          e.printStackTrace()
      }
    } catch {
      case e: Exception =>
        println("Kafka Stream. Writing files after job. Exception: " + e.getMessage)
        e.printStackTrace()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def parseAVROToString(rawTweet: Array[Byte]): String = {
    try {
      if (rawTweet.isEmpty) {
        println("Rejected Tweet")
        "Empty"
      } else {
        deserializeTwitter(rawTweet).get("tweet").toString
      }
    } catch {
      case e: Exception =>
        println("Exception: " + e.getMessage)
        "Empty"
    }
  }

  def deserializeTwitter(tweet: Array[Byte]): GenericRecord = {
    try {
      val reader = new GenericDatumReader[GenericRecord](tweetSchema)
      val decoder = DecoderFactory.get.binaryDecoder(tweet, null)
      reader.read(null, decoder)
    } catch {
      case e: Exception => null // caller treats a null record as "Empty"
    }
  }
}

build.sbt:

name := "KafkaConsumer"

version := "1.0"

scalaVersion := "2.10.6"

jarName in assembly := "kafkaconsumer.jar"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided"
libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"
libraryDependencies += "org.apache.avro" % "avro" % "1.7.6" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3"

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}