
Spark Streaming: reading data from Kafka and tracking offsets

The pom.xml file is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.demo</groupId>
	<artifactId>spark-streaming-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>spark-streaming-demo</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<spark.version>1.6.2</spark.version>
		<mysql-connector.version>5.1.35</mysql-connector.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>${spark.version}</version>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql-connector.version}</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid</artifactId>
			<version>1.0.31</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.stratio.datasource</groupId>
			<artifactId>spark-mongodb_2.11</artifactId>
			<version>0.12.0</version>
		</dependency>
	</dependencies>
</project>
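
The driver code below keeps its Kafka offsets in a MySQL table named kafka_offsets with columns topic, partition and offset. The original post does not include the table definition, so the following is a minimal sketch of an assumed schema, created over plain JDBC: the column types and the unique key on (topic, partition) are assumptions, chosen so that each topic partition keeps exactly one row.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Minimal sketch: create the offset table that the streaming job below expects.
// The schema (column types, unique key) is assumed; only the table and column names come from the post.
public class CreateOffsetTable {
	public static void main(String[] args) throws Exception {
		Connection conn = DriverManager.getConnection(
				"jdbc:mysql://172.16.100.49:3306/test_sparkstreaming", "root", "root123456");
		Statement stmt = conn.createStatement();
		// Backticks because `partition` (and, in newer MySQL versions, `offset`) are reserved words.
		stmt.execute("CREATE TABLE IF NOT EXISTS kafka_offsets ("
				+ " topic VARCHAR(255) NOT NULL,"
				+ " `partition` INT NOT NULL,"
				+ " `offset` BIGINT NOT NULL,"
				+ " UNIQUE KEY uk_topic_partition (topic, `partition`)"
				+ ")");
		stmt.close();
		conn.close();
	}
}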

The code is as follows:

package com.fosun.spark_streaming_demo;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import javax.sql.DataSource;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkstreamingOnDirectKafka {
	public static JavaStreamingContext createContext() throws Exception {
		SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SparkStreamingOnKafkaDirect");
		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
		// jsc.checkpoint("/user/tenglq/checkpoint");

		// Kafka parameters for the direct stream: broker list and where to start when no offsets are saved
		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("metadata.broker.list", "fonova-hadoop1.fx01:9092,fonova-hadoop2.fx01:9092");
		kafkaParams.put("auto.offset.reset", "smallest");
		Set<String> topics = new HashSet<String>();
		topics.add("tlqtest3");

		// MySQL (via Druid) connection settings for the offset-tracking table
		final Map<String, String> params = new HashMap<String, String>();
		params.put("driverClassName", "com.mysql.jdbc.Driver");
		params.put("url", "jdbc:mysql://172.16.100.49:3306/test_sparkstreaming");
		params.put("username", "root");
		params.put("password", "root123456");

		// Load any offsets saved by a previous run of this job from MySQL
		Map<TopicAndPartition, Long> offsets = new HashMap<TopicAndPartition, Long>();
		DataSource ds = DruidDataSourceFactory.createDataSource(params);
		Connection conn = ds.getConnection();
		Statement stmt = conn.createStatement();
		ResultSet rs = stmt
				.executeQuery("SELECT topic, `partition`, `offset` FROM kafka_offsets WHERE topic = 'tlqtest3'");
		while (rs.next()) {
			TopicAndPartition topicAndPartition = new TopicAndPartition(rs.getString(1), rs.getInt(2));
			offsets.put(topicAndPartition, rs.getLong(3));
		}
		rs.close();
		stmt.close();
		conn.close();

		final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

		JavaDStream<String> lines = null;

		if (offsets.isEmpty()) {
			// No offsets saved yet: start from auto.offset.reset and record each batch's offset ranges
			JavaPairInputDStream<String, String> pairDstream = KafkaUtils.createDirectStream(jsc, String.class,
					String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
			lines = pairDstream
					.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
						private static final long serialVersionUID = 1L;

						public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
							OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
							offsetRanges.set(offsets);
							return rdd;
						}
					}).flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
						private static final long serialVersionUID = 1L;

						public Iterable<String> call(Tuple2<String, String> t) throws Exception {
							return Arrays.asList(t._2);
						}
					});
		} else {
			// Saved offsets found: resume the direct stream exactly where the previous run stopped
			JavaInputDStream<String> dstream = KafkaUtils.createDirectStream(jsc, String.class, String.class,
					StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,
					new Function<MessageAndMetadata<String, String>, String>() {

						private static final long serialVersionUID = 1L;

						public String call(MessageAndMetadata<String, String> v1) throws Exception {
							return v1.message();
						}
					});
			lines = dstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
				private static final long serialVersionUID = 1L;

				public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
					OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
					offsetRanges.set(offsets);
					return rdd;
				}
			});

		}

		lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
			private static final long serialVersionUID = 1L;

			public void call(JavaRDD<String> rdd) throws Exception {
				// Process the RDD: collect this batch's messages, sort them and print them
				List<String> values = new ArrayList<String>(rdd.collect());
				Collections.sort(values);
				for (String value : values) {
					System.out.println(value);
				}

				// Save the offsets of this batch back to MySQL: update the row if it exists, insert it otherwise
				DataSource ds = DruidDataSourceFactory.createDataSource(params);
				Connection conn = ds.getConnection();
				Statement stmt = conn.createStatement();
				for (OffsetRange offsetRange : offsetRanges.get()) {
					ResultSet rs = stmt.executeQuery("select count(1) from kafka_offsets where topic='"
							+ offsetRange.topic() + "' and `partition`='" + offsetRange.partition() + "'");
					if (rs.next()) {
						int count = rs.getInt(1);
						if (count > 0) {
							stmt.executeUpdate("update kafka_offsets set `offset`='" + offsetRange.untilOffset()
									+ "' where topic='" + offsetRange.topic() + "' and `partition`='"
									+ offsetRange.partition() + "'");
						} else {
							stmt.execute("insert into kafka_offsets(topic,`partition`,`offset`) values('"
									+ offsetRange.topic() + "','" + offsetRange.partition() + "','"
									+ offsetRange.untilOffset() + "')");
						}
					}
					rs.close();
				}

				stmt.close();
				conn.close();
			}

		});

		return jsc;
	}

	public static void main(String[] args) {
		JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
			public JavaStreamingContext create() {
				try {
					return createContext();
				} catch (Exception e) {
					throw new RuntimeException(e);
				}
			}
		};

		// JavaStreamingContext jsc =
		// JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory);

		JavaStreamingContext jsc = factory.create();

		jsc.start();

		jsc.awaitTermination();
		jsc.close();

	}
}
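
The offset-saving loop above runs a count query and then either an update or an insert, with the values concatenated into the SQL text. Because the logic is just an upsert keyed on (topic, partition), a PreparedStatement combined with MySQL's INSERT ... ON DUPLICATE KEY UPDATE can do the same work in one round trip per partition and without string concatenation. The helper below is an assumed alternative rather than part of the original post; it needs an import of java.sql.PreparedStatement and relies on the unique key on (topic, `partition`) assumed in the table sketch above. It could be called from foreachRDD as saveOffsets(conn, offsetRanges.get()).

	// Hypothetical helper: upsert all offset ranges of one batch with a single prepared statement.
	// Assumes kafka_offsets has a unique key on (topic, `partition`).
	private static void saveOffsets(Connection conn, OffsetRange[] ranges) throws Exception {
		PreparedStatement ps = conn.prepareStatement(
				"INSERT INTO kafka_offsets (topic, `partition`, `offset`) VALUES (?, ?, ?)"
						+ " ON DUPLICATE KEY UPDATE `offset` = VALUES(`offset`)");
		for (OffsetRange offsetRange : ranges) {
			ps.setString(1, offsetRange.topic());
			ps.setInt(2, offsetRange.partition());
			ps.setLong(3, offsetRange.untilOffset());
			ps.executeUpdate();
		}
		ps.close();
	}

Either way, the offsets are written only after the batch has been processed, which gives at-least-once semantics: if the job dies between processing and the write, the next run re-reads that batch from the previously stored offsets. To also recover the streaming context itself across restarts, enable the jsc.checkpoint(...) call and create the context in main with JavaStreamingContext.getOrCreate("/user/tenglq/checkpoint", factory), as the commented-out lines in the code suggest.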

