
Consuming Kafka Data from a Specified Offset with Spark Streaming


2017-06-13 15:19

Original article: http://blog.csdn.net/high2011/article/details/53706446

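First of all, many thanks to the original author; reading this article saved me a lot of detours. I am reposting it to keep a copy for my own review. Please support the original author by reading the article at the link above. Thank you.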


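I. Scenario: When a Spark Streaming program exits unexpectedly, data is still being pushed to Kafka. Because consumption by default starts from the latest offset, the messages that arrived while the program was down are skipped on restart, which amounts to data loss. To avoid this, we need to record the offsets consumed on each run, so that the next run can check them and start reading from the specified offset.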
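As a rough illustration of "recording the offsets consumed on each run", the sketch below saves and reloads offsets using only the Scala and Java standard libraries. The `OffsetFileStore` object, the local file location, and the `topic,partition,offset` line format are assumptions made for this example and do not come from the original post; a real job would more likely keep offsets in ZooKeeper, HDFS, or a database.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

// Hypothetical helper (not from the original post): stores the next offset to
// read for each (topic, partition) as one "topic,partition,offset" line.
object OffsetFileStore {
  type Offsets = Map[(String, Int), Long]

  // Overwrite the file with the current offsets, sorted for readability.
  def save(path: Path, offsets: Offsets): Unit = {
    val lines = offsets.toSeq.sortBy(_._1).map {
      case ((topic, partition), offset) => s"$topic,$partition,$offset"
    }
    Files.write(path, lines.asJava, StandardCharsets.UTF_8)
  }

  // Read the offsets back; a missing file means nothing was recorded yet.
  def load(path: Path): Offsets =
    if (!Files.exists(path)) Map.empty
    else Files.readAllLines(path, StandardCharsets.UTF_8).asScala
      .filter(_.trim.nonEmpty)
      .map { line =>
        val Array(topic, partition, offset) = line.split(",")
        (topic, partition.toInt) -> offset.toLong
      }
      .toMap
}
```

On startup the job could call `OffsetFileStore.load(path)` and, if the result is non-empty, use it as the starting position; after each batch it could call `save` with the end offsets of that batch.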
II. Environment: kafka-0.9.0, Spark-1.6.0, jdk-1.7, Scala-2.10.5, IDEA 16

III. Implementation:

1. Add the Spark and Kafka dependencies

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.ngaa</groupId>
  <artifactId>test-my</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!-- add maven release -->
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <!-- Scala version -->
    <scala.version>2.10.5</scala.version>
    <!-- Scala version on the test machine -->
    <test.scala.version>2.11.7</test.scala.version>
    <jackson.version>2.3.0</jackson.version>
    <!-- slf4j version -->
    <slf4j-version>1.7.20</slf4j-version>
    <!-- cdh-spark -->
    <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>
    <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>
    <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>
    <!-- cdh-hadoop -->
    <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>
    <!-- the http client must be compatible with the Hadoop version in CDH (cd /opt/cloudera/parcels/CDH/lib/hadoop/lib) -->
    <httpclient.version>4.2.5</httpclient.version>
    <!-- http core -->
    <httpcore.version>4.2.5</httpcore.version>
    <!-- fastjson -->
    <fastjson.version>1.1.39</fastjson.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <!-- repository used to load the jars that CDH depends on -->
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <!-- fastjson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
    </dependency>
    <!-- httpclient -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>${httpclient.version}</version>
    </dependency>
    <!-- http core -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>${httpcore.version}</version>
    </dependency>
    <!-- slf4j -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j-version}</version>
    </dependency>
    <!-- hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.cdh.version}</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.cdh.version}</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.cdh.version}</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- spark scala -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <!-- Spark Streaming and Kafka packages -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.streaming.cdh.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${kafka.spark.cdh.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!-- Spark assembly jar from the local Windows library -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-assembly_2.10</artifactId>
      <version>${spark.cdh.version}</version>
      <scope>system</scope>
      <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>
    </dependency>
    <!-- Spark jar from the local Linux library of the test environment -->
    <!--<dependency>-->
    <!--<groupId>org.apache.spark</groupId>-->
    <!--<artifactId>spark-assembly_2.10</artifactId>-->
    <!--<version>${spark.cdh.version}</version>-->
    <!--<scope>system</scope>-->
    <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->
    <!--</systemPath>-->
    <!--</dependency>-->
    <!-- Spark jar from the central repository -->
    <!--<dependency>-->
    <!--<groupId>org.apache.spark</groupId>-->
    <!--<artifactId>spark-assembly_2.10</artifactId>-->
    <!--<version>${spark.cdh.version}</version>-->
    <!--</dependency>-->
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-web-proxy</artifactId>
      <version>2.6.0-cdh5.8.0</version>
    </dependency>
  </dependencies>

  <!-- maven packaging -->
  <build>
    <finalName>test-my</finalName>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.7</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
```

2. Create the test class

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * Created by yangjf on 2016/12/18
  * Update date:
  * Time: 11:10
  * Description: read Kafka data starting from specified offsets
  * Result of Test:
  * Command:
  * Email: [email protected]
  */
object ReadBySureOffsetTest {
  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)

  def main(args: Array[String]): Unit = {
    // Set the log levels
    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    logger.info("Starting the test program that consumes Kafka from specified offsets")
    if (args.length < 1) {
      System.err.println("Your arguments were " + args.mkString(","))
      // Log before exiting: nothing placed after System.exit is ever executed
      logger.info("Main program exited unexpectedly")
      System.exit(1)
    }
    // e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
    val checkpointDirectory = args(0)
    logger.info("checkpoint directory: " + checkpointDirectory)
    // If a checkpoint already exists in this directory, the context is restored
    // from it and createContext (including its fromOffsets) is not called.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory)
      })
    logger.info("Starting streaming")
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    // Configuration
    val brokers = "hadoop3:9092,hadoop4:9092"
    val topics = "20161218a"
    // Batch interval in seconds (default was 5)
    val split_rdd_time = 8
    // Create the context
    val sparkConf = new SparkConf()
      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
      .set("spark.app.id", "streaming_kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))
    ssc.checkpoint(checkpointDirectory)
    // Topic set for a direct Kafka stream (not needed by the fromOffsets overload used below)
    val topicsSet: Set[String] = topics.split(",").toSet
    // Kafka configuration parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "apple_sample",
      "serializer.class" -> "kafka.serializer.StringEncoder"
      // "auto.offset.reset" -> "largest"   // reset to the latest offset (default)
      // "auto.offset.reset" -> "smallest"  // reset to the earliest offset
      // ("earliest" and "none" are values of the newer Kafka consumer API; this
      //  integration accepts "largest" or "smallest")
    )

    /**
      * Start reading Kafka data from the specified position.
      * Note: the direct stream tracks offsets itself, so each offset range is read once per batch;
      * end-to-end exactly-once additionally depends on how the output is written.
      * If the offsets given here are the ones recorded when the previous streaming run stopped,
      * reading resumes from that point.
      */
    val offsetList = List((topics, 0, 22753623L), (topics, 1, 327041L)) // topic, partition_no, offset
    val fromOffsets = setFromOffsets(offsetList) // build the parameter
    // Build the handler that maps MessageAndMetadata to (topic, message)
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message())
    // Use the direct stream API to start consuming from the specified offsets. For details see
    // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
    val messages: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    // Process the data
    messages.foreachRDD(mess => {
      // Get the offset ranges of this batch
      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
      mess.foreachPartition(lines => {
        // Look up the range once per partition instead of once per record
        val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
        logger.info("++++++++++++++++++++++++++++++ record the offset here +++++++++++++++++++++++++++++++++++++++")
        logger.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        lines.foreach(line => {
          logger.info("+++++++++++++++++++++++++++++++ consume the data here ++++++++++++++++++++++++++++++++++++++")
          logger.info("The kafka line is " + line)
        })
      })
    })
    ssc
  }

  // Build the Map of starting offsets
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] =
    list.map { case (topic, partition, offset) =>
      TopicAndPartition(topic, partition) -> offset // (topic, partition number) -> offset position
    }.toMap
}
```

IV. References:

1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
2. Official Kafka configuration reference: http://kafka.apache.org/documentation.html#configuration
3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
4. Spark Streaming notes on iterating over consumed offsets: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
5. Official Kafka API reference: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Note: the code above has been tested and works; modify it as needed. If you have any questions, please leave a comment!
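The test class hard-codes its `offsetList`. As a hedged sketch of how that list could instead be derived from the ranges logged on a previous run, the code below computes the next starting offset per partition. `RecordedRange` and `NextOffsets` are hypothetical names introduced here; `RecordedRange` only mirrors the four `OffsetRange` fields that the test class logs, so the example runs without Spark on the classpath.

```scala
// Stand-in for the OffsetRange fields logged by the test class
// (hypothetical type, not part of Spark or of the original post).
final case class RecordedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object NextOffsets {
  // untilOffset is exclusive, so it is also the first offset the next run should read.
  // When a partition appears in several recorded batches, keep the largest untilOffset.
  def nextStartOffsets(ranges: Seq[RecordedRange]): List[(String, Int, Long)] =
    ranges
      .groupBy(r => (r.topic, r.partition))
      .map { case ((topic, partition), rs) => (topic, partition, rs.map(_.untilOffset).max) }
      .toList
      .sortBy { case (topic, partition, _) => (topic, partition) }
}
```

The result has the same `List[(String, Int, Long)]` shape that `setFromOffsets` accepts, so it could replace the literal list, assuming the recorded ranges were persisted somewhere that survives a restart.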
