Spark Streaming reads Kafka data and records the offsets
The pom.xml file is as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.demo</groupId>
  <artifactId>spark-streaming-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark-streaming-demo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>1.6.2</spark.version>
    <mysql-connector.version>5.1.35</mysql-connector.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql-connector.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.31</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- Note: this artifact targets Scala 2.11 while the Spark artifacts above are _2.10;
         it is not used by the code below and can be dropped. -->
    <dependency>
      <groupId>com.stratio.datasource</groupId>
      <artifactId>spark-mongodb_2.11</artifactId>
      <version>0.12.0</version>
    </dependency>
  </dependencies>
</project>
The code is as follows:
package com.fosun.spark_streaming_demo;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.sql.DataSource;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkstreamingOnDirectKafka {

    public static JavaStreamingContext createContext() throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
        // jsc.checkpoint("/user/tenglq/checkpoint");

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
        kafkaParams.put("auto.offset.reset", "smallest");

        Set<String> topics = new HashSet<String>();
        topics.add("tlqtest3");

        final Map<String, String> params = new HashMap<String, String>();
        params.put("driverClassName", "com.mysql.jdbc.Driver");
        params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
        params.put("username", "root");
        params.put("password", "root123456");

        // Load any previously saved offsets for the topic from MySQL.
        Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
        DataSource ds = DruidDataSourceFactory.createDataSource(params);
        Connection conn = ds.getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt
                .executeQuery("SELECT topic,partition,offset FROM kafka_offsets WHERE topic = 'tlqtest3'");
        while (rs.next()) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(rs.getString(1), rs.getInt(2));
            offsets.put(topicAndPartition, rs.getLong(3));
        }

        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

        JavaDStream<String> lines = null;
        if (offsets.isEmpty()) {
            // First run: no saved offsets, start from the position given by auto.offset.reset.
            JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc, String.class,
                    String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
            lines = pairDstream
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                        private static final long serialVersionUID = 1L;

                        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                            offsetRanges.set(offsets);
                            return rdd;
                        }
                    }).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                            return Arrays.asList(t._2);
                        }
                    });
        } else {
            // Subsequent runs: resume from the offsets stored in MySQL.
            JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        private static final long serialVersionUID = 1L;

                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            return v1.message();
                        }
                    });
            lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                private static final long serialVersionUID = 1L;

                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    offsetRanges.set(offsets);
                    return rdd;
                }
            });
        }

        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            public void call(JavaRDD<String> rdd) throws Exception {
                // Process the RDD (here: collect, sort and print the messages).
                List<String> map = rdd.collect();
                String[] array = new String[map.size()];
                System.arraycopy(map.toArray(new String[map.size()]), 0, array, 0, map.size());
                List<String> l = Arrays.asList(array);
                Collections.sort(l);
                for (String value : l) {
                    System.out.println(value);
                }

                // Save the offsets of this batch back to MySQL.
                DataSource ds = DruidDataSourceFactory.createDataSource(params);
                Connection conn = ds.getConnection();
                Statement stmt = conn.createStatement();
                for (OffsetRange offsetRange : offsetRanges.get()) {
                    ResultSet rs = stmt.executeQuery("select count(1) from kafka_offsets where topic='"
                            + offsetRange.topic() + "' and partition='" + offsetRange.partition() + "'");
                    if (rs.next()) {
                        int count = rs.getInt(1);
                        if (count > 0) {
                            stmt.executeUpdate("update kafka_offsets set offset ='" + offsetRange.untilOffset()
                                    + "' where topic='" + offsetRange.topic() + "' and partition='"
                                    + offsetRange.partition() + "'");
                        } else {
                            stmt.execute("insert into kafka_offsets(topic,partition,offset) values('"
                                    + offsetRange.topic() + "','" + offsetRange.partition() + "','"
                                    + offsetRange.untilOffset() + "')");
                        }
                    }
                    rs.close();
                }
                stmt.close();
                conn.close();
            }
        });

        return jsc;
    }

    public static void main(String[] args) {
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {
                try {
                    return createContext();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        };
        // JavaStreamingContext jsc =
        //         JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);
        JavaStreamingContext jsc = factory.create();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
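The job above assumes that a kafka_offsets table with topic, partition and offset columns already exists in the test_sparkstreaming database; the original code does not show its schema. The following is a minimal one-time setup sketch whose column names and types are inferred from the queries above, so treat the exact DDL (and the class name CreateKafkaOffsetsTable) as assumptions rather than the author's schema. The partition identifier is backtick-quoted because PARTITION is a reserved word in MySQL 5.6 and later.

package com.fosun.spark_streaming_demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical one-time setup for the offset table used by SparkstreamingOnDirectKafka.
// The schema below is inferred from the SELECT/UPDATE/INSERT statements in the job, not taken
// from the original post.
public class CreateKafkaOffsetsTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming", "root", "root123456");
        Statement stmt = conn.createStatement();
        // `partition` is quoted because it is a reserved word in MySQL 5.6+.
        stmt.execute("CREATE TABLE IF NOT EXISTS kafka_offsets ("
                + " topic VARCHAR(255) NOT NULL,"
                + " `partition` INT NOT NULL,"
                + " offset BIGINT NOT NULL,"
                + " PRIMARY KEY (topic, `partition`))");
        stmt.close();
        conn.close();
    }
}

With this table in place, the first run finds no saved offsets and starts from the position given by auto.offset.reset, while every later restart resumes from the offsets that foreachRDD wrote back after the previous batch.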