1. 程式人生 > >spark-streaming 程式設計(三)連線kafka消費資料

spark-streaming 程式設計(三)連線kafka消費資料

spark-streaming支援kafka消費,有以下方式:
這裡寫圖片描述

我實驗的版本是kafka0.10,試驗的是spark-streaming-kafka-0.8的接入方式。另外,spark-streaming-kafka-0.10的分支並沒有研究。

spark-streaming-kafka-0.8的方式支援kafka0.8.2.1以及更高的版本。有兩種方式:
(1)Receiver Based Approach:基於kafka high-level consumer api,有一個Receiver負責接收資料到執行器
(2)Direct Approcah:基於kafka simple consumer api,沒有receiver。

mavne專案需要新增依賴

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>

Reviced based approach程式碼:使用方法見註釋

package com.lgh.sparkstreaming

import org.apache
.spark.SparkConf import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils /** * Created by Administrator on 2017/8/23. */ object KafkaWordCount { def main(args: Array[String]): Unit = { if (args.length < 4) { System.err.println
("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } //引數分別為 zk地址,消費者group名,topic名 多個的話,分隔 ,執行緒數 val Array(zkQuorum, group, topics, numThreads) = args //setmaster,local是除錯模式使用 val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") //Map型別儲存的是 key: topic名字 values: 讀取該topic的消費者的分割槽數 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap //引數分別為StreamingContext,kafka的zk地址,消費者group,Map型別 val kafkamessage = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap) //_._2取出kafka的實際訊息流 val lines=kafkamessage.map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() ssc.awaitTermination() } }

Direct approach:

package com.lgh.sparkstreaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Created by Administrator on 2017/8/23.
  */
object DirectKafkaWordCount {

    def main(args: Array[String]) {
      if (args.length < 2) {
        System.err.println(s"""
                              |Usage: DirectKafkaWordCount <brokers> <topics>
                              |  <brokers> is a list of one or more Kafka brokers
                              |  <topics> is a list of one or more kafka topics to consume from
                              |
        """.stripMargin)
        System.exit(1)
      }
     //borkers : kafka的broker 列表,多個的話以逗號分隔
      //topics: kafka topic,多個的話以逗號分隔
      val Array(brokers, topics) = args

      // Create context with 2 second batch interval
      val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
      val ssc = new StreamingContext(sparkConf, Seconds(2))

      // Create direct kafka stream with brokers and topics
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

      // Get the lines, split them into words, count the words and print
      val lines = messages.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
      wordCounts.print()

      // Start the computation
      ssc.start()
      ssc.awaitTermination()

  }

}

關於這兩種方式的區別

1.Simplified Parallelism
Direct 方式將會建立跟kafka分割槽一樣多的RDD partiions,並行的讀取kafka topic的partition資料。kafka和RDD partition將會有一對一的對應關係。
2.Efficiency
Receiver-based Approach需要啟用WAL才能保證消費不丟失資料
,效率比較低
3.Exactly-once semantics
Receiver-based Approach使用kafka high-level consumer api,儲存消費者offset在zookeeper中,跟Write Ahead Log配合使用,能夠實現至少消費一次語義。
Direct Approach 使用kafka simple consumer api,跟蹤offset資訊儲存在spark checkpoint中。能夠實現資料有且只消費一次語義。

相關推薦

spark-streaming 程式設計()連線kafka消費資料

spark-streaming支援kafka消費,有以下方式: 我實驗的版本是kafka0.10,試驗的是spark-streaming-kafka-0.8的接入方式。另外,spark-streaming-kafka-0.10的分支並沒有研究。 spar

Spark Streaming程式設計指南(

DStreams轉換(Transformation) 和RDD類似,轉換中允許輸入DStream中的資料被修改。DStream支援很多Spark RDD上的轉換。常用的轉換如下。 轉換 含義 map(func) 將源DS

Spark學習(拾叄)- Spark Streaming整合Flume&Kafka

文章目錄 處理流程畫圖剖析 日誌產生器開發並結合log4j完成日誌的輸出 使用Flume採集Log4j產生的日誌 使用KafkaSInk將Flume收集到的資料輸出到Kafka Spark Streaming消費Kafka的

Spark Streaming部分

updateStateByKey運算元 需求,統計到目前為止,累計出現的單詞個數(需要保持之前的狀態) def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {

Spark官方文件》Spark Streaming程式設計指南

spark-1.6.1 [原文地址] Spark Streaming程式設計指南 概覽 Spark Streaming是對核心Spark API的一個擴充套件,它能夠實現對實時資料流的流式處理,並具有很好的可擴充套件性、高吞吐量和容錯性。Spark Streaming支援從多種資料來源提取資

spark streaming程式因叢集kafka版本不一致造成ZkUtils類無法更新offset解決方案

問題: 因為CDH叢集環境問題,我spark streaming程式的依賴就依照其版本來進行,但這就遇到一個問題,叢集spark2支援的kafka版本是0.9.0,而我們程式操作zookeeper的ZkUtils類就不相容了。 解決方案: 重新KafkaCluster類,

Spark2.1.0文件:Spark Streaming 程式設計指南(上)

本文翻譯自spark官方文件,僅翻譯了Scala API部分,目前版本為2.1.0,如有疏漏錯誤之處請多多指教。 原文地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html 因文件篇幅較

spark-streaming 程式設計(二) word count單詞計數統計

就那官方的例子來說明,程式碼基本上有註釋 package com.lgh.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLe

spark streaming小實戰之kafka讀取與儲存

本次小實戰主要介紹一下spark streaming如何讀取kafka資料涉及理論部分在這就不多說了,自己也剛入門先說下需求待處理日誌格式為ouMrq2r_aU1mtKRTmQclGo1UzY,3251210381,2018/11/29 13:46,上海,上海,210.2.2

關於Spark Streaming 如何進行commit kafka的offset

收到就提交提交方式 記為 X:接收到資料就會直接commit,假如某個batch失敗,重啟job,則消費會從上次commit的offset消費,所以會導致失敗的那個batch的部分資料丟失。 batch結束才commit的方式 記為 Y:  會有訊息重複消費的問題。如果

spark streaming 通過zookeeper讀取kafka上的資料

maven 依賴如下 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-c

#########好####### pyspark-Spark Streaming程式設計指南

參考: 1、http://spark.apache.org/docs/latest/streaming-programming-guide.html 2、https://github.com/apache/spark/tree/v2.2.0 Spark Streami

Spark Streaming 程式設計入門指南

Spark Streaming 是核心Spark API的擴充套件,可實現實時資料流的可伸縮,高吞吐量,容錯流處理。可以從許多資料來源(例如Kafka,Flume,Kinesis或TCP sockets)中提取資料,並且可以使用複雜的演算法處理資料,這些演算法用高階函式表示,如map、reduce、join和

Kafka:ZK+Kafka+Spark Streaming集群環境搭建()安裝spark2.2.1

node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

java8下spark-streaming結合kafka程式設計spark 2.3 kafka 0.10)

前面有說道spark-streaming的簡單demo,也有說到kafka成功跑通的例子,這裡就結合二者,也是常用的使用之一。 1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafk

Spark Streaming實時流處理筆記(5)—— Kafka API 程式設計

1 新建 Maven工程 pom檔案 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLo

基於Python的Spark Streaming+Kafka程式設計實踐及調優總結

說明Spark Streaming的原理說明的文章很多,這裡不做介紹。本文主要介紹使用Kafka作為資料來源的程式設計模型,編碼實踐,以及一些優化說明演示環境Spark:1.6Kafka:kafka_2.11-0.9.0.1實現語言:Python程式設計模型目前Spark S

基於Python的Spark Streaming+Kafka程式設計實踐

說明 Spark Streaming的原理說明的文章很多,這裡不做介紹。本文主要介紹使用Kafka作為資料來源的程式設計模型,編碼實踐,以及一些優化說明 演示環境 Spark:1.6 Kafka:kafka_2.11-0.9.0.1 實現語言:P

java8下spark-streaming結合kafka程式設計spark 2.0 & kafka 0.10)

1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafka 0.10。 2.引入maven包 網上找了一些結合的例子,但是跟我當前版本不一樣,所以根本就

spark----基於Python的Spark Streaming+Kafka程式設計實踐

來源:http://blog.csdn.net/eric_sunah/article/details/54096057?utm_source=tuicool&utm_medium=referral 說明 Spark Streaming的原理說明的文章很多,這裡不