1. 程式人生 > >第91講:sparkStreaming基於kafka的Direct詳解

第91講:sparkStreaming基於kafka的Direct詳解

有興趣想學習國內整套Spark+Spark Streaming+Machine learning最頂級課程的,可加我qq  471186150。共享視訊,價效比超高!

1:Direct方式特點:

1)Direct的方式是會直接操作kafka底層的元資料資訊,這樣如果計算失敗了,可以把資料重新讀一下,重新處理。即資料一定會被處理。拉資料,是RDD在執行的時候直接去拉資料。

2)由於直接操作的是kafka,kafka就相當於你底層的檔案系統。這個時候能保證嚴格的事務一致性,即一定會被處理,而且只會被處理一次。而Receiver的方式則不能保證,因為Receiver和ZK中的資料可能不同步,spark Streaming可能會重複消費資料,這個調優可以解決,但顯然沒有Direct方便。而Direct api直接是操作kafka的,spark streaming自己負責追蹤消費這個資料的偏移量或者offset,並且自己儲存到checkpoint,所以它的資料一定是同步的,一定不會被重複。即使重啟也不會重複,因為checkpoint了,但是程序升級的時候,不能讀取原先的checkpoint,面對升級checkpoint無效這個問題,怎麼解決呢?升級的時候讀取我指定的備份就可以了,即手動的指定checkpoint也是可以的,這就再次完美的確保了事務性,有且僅有一次的事務機制。那麼怎麼手動checkpoint呢?構建SparkStreaming的時候,有getorCreate這個api,它就會獲取checkpoint的內容,具體指定下這個checkpoint在哪就好了。或者如下圖:


而如果從checkpoint恢復後,如果資料累積太多處理不過來,怎麼辦?1)限速2)增強機器的處理能力3)放到資料緩衝池中。

3)由於底層是直接讀資料,沒有所謂的Receiver,直接是週期性(Batch Intervel)的查詢kafka,處理資料的時候,我們會使用基於kafka原生的Consumer  api來獲取kafka中特定範圍(offset範圍)中的資料。這個時候,Direct Api訪問kafka帶來的一個顯而易見的效能上的好處就是,如果你要讀取多個partition,Spark也會建立RDD的partition,這個時候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個partition是沒任何關係的。這個優勢是你的RDD,其實本質上講在底層讀取kafka的時候,kafka的partition就相當於原先hdfs上的一個block。這就符合了資料本地性。RDD和kafka資料都在這邊。所以讀資料的地方,處理資料的地方和驅動資料處理的程式都在同樣的機器上,這樣就可以極大的提高效能。不足之處是由於RDD和kafka的patition是一對一的,想提高並行度就會比較麻煩。提高並行度還是repartition,即重新分割槽,因為產生shuffle,很耗時。這個問題,以後也許新版本可以自由配置比例,不是一對一。因為提高並行度,可以更好的利用叢集的計算資源,這是很有意義的。

4)不需要開啟wal機制,從資料零丟失的角度來看,極大的提升了效率,還至少能節省一倍的磁碟空間。從kafka獲取資料,比從hdfs獲取資料,因為zero copy的方式,速度肯定更快。

2:實戰部分

kafka + spark streaming 叢集

前提:

spark 安裝成功,spark 1.6.0

zookeeper 安裝成功 

kafka 安裝成功

步驟:

1:先啟動三臺機器上的ZK,然後三臺機器同樣啟動kafka,

2:在kafka上建立topic  test

3:在worker1啟動kafka 生產者:

[email protected]:/usr/local/kafka_2.10-0.9.0.1# bin/kafka-console-producer.sh --broker-list localhost:9092

 --topic test

 

worker2中啟動消費者:

[email protected]:/usr/local/kafka_2.10-0.9.0.1# bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test

 

生產者生產的訊息,消費者可以消費到。說明kafka叢集沒問題。進入下一步。

master中啟動spark-shell

./spark-shell --master local[2] --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0,org.apache.kafka:kafka_2.10:0.8.2.1

筆者用的spark 是 1.6.0 ,讀者根據自己版本調整。

 

shell中的邏輯程式碼(wordcount:

import org.apache.spark.SparkConf

import kafka.serializer.StringDecoder

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Durations, StreamingContext}

val ssc = new StreamingContext(scDurations.seconds(5))KafkaUtils.createDirectStream[StringStringStringDecoderStringDecoder](sscMap("bootstrap.servers" -> "master:2181,worker1:2181,worker2:2181""metadata.broker.list" -> "master:9092,worker1:9092,worker2:9092""group.id" -> "StreamingWordCountSelfKafkaDirectStreamScala")Set("test")).map(t => t._2).flatMap(_.toString.split(" ")).map((_1)).reduceByKey(_ + _).print()ssc.start()

生產者再生產訊息:

 

spark streaming的反應:

 

返回worker2檢視消費者

 

可見,groupId不一樣,相互之間沒有互斥。

上述是使用 createDirectStream 方式連結kafka,實際使用中,其實就是和Receiver在api以及api中引數上有不同,其它基本一樣

參考:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html


相關推薦

91sparkStreaming基於kafka的Direct

有興趣想學習國內整套Spark+Spark Streaming+Machine learning最頂級課程的,可加我qq  471186150。共享視訊,價效比超高! 1:Direct方式特點: 1)Direct的方式是會直接操作kafka底層的元資料資訊,這樣如果計算

91SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密 java.lang.ClassNotFoundException 踩坑解決問題詳細內幕版本

第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密    /* * *王家林老師授課http://weibo.com/ilovepains */  每天晚上20:00YY頻道現場授課頻道68917580 1、作業內容:SparkS

20 | 區塊鏈項目比特股BTS

代理服 work 直接 使用 代碼 for doc 地址 投資者   前面兩篇我們都聊了智能合約的話題,智能合約可以說是當下區塊鏈最熱的概念了,在多數人眼裏,沒有智能合約的區塊鏈價值幾乎就大打折扣。   那麽說,實際上是不是這樣呢,我們今天就來聊聊一個並沒有開放式智能合約

JavaSE七十一Target及ElementType

1. 繼續上一講內容,複習上一講內容我們講到了 Retention以及RetentionPolicy。這兩個都是成對出現的,因為Retention裡面包含了一個屬性value型別為 RetentionPolicy 列舉型別,它有三個列舉CLASS、RUNTIME、SOURC

JavaSE七十Retention及RetentionPolicy

1. 繼續上一講內容,上一講的內容講到自定義註解。 package com.ahuier.annotation; public @interface AnnotationTest { //注

Node入門教程(8)六章path 模塊

format QQ 調用 保留 微軟 posix interface join 結果 path 模塊詳解 path 模塊提供了一些工具函數,用於處理文件與目錄的路徑。由於windows和其他系統之間路徑不統一,path模塊還專門做了相關處理,屏蔽了彼此之間的差異。 可移

【搞定Java併發程式設計】7篇Java記憶體模型

上一篇:ThreadLocal詳解:https://blog.csdn.net/pcwl1206/article/details/84859661 其實在Java虛擬機器的學習中,我們或多或少都已經接觸過了有關Java記憶體模型的相關概念(點選檢視),只不過在Java虛擬機器中講的不夠詳細,因此

nginx教程九篇nginx配置檔案彙總

#####Nginx配置檔案nginx.conf中文詳解##### #定義Nginx執行的使用者和使用者組 user www www; #nginx程序數,建議設定為等於CPU總核心數。 worker_processes 8; #全域性錯誤日誌定義型別,

【搞定Java併發程式設計】23篇Fork/Join 框架

上一篇:Java中的阻塞佇列 BlockingQueue 詳解 本文目錄: 1、什麼是 Fork/Join 框架 2、工作竊取演算法 3、Fork/Join 框架的設計  4、使用 Fork/Join 框架 5、Fork/Join 框架的異常處理 6、Frok/

第一行程式碼——五章全域性大喇叭——廣播機制

目錄: 5.1 廣播機制簡介 5.2 接收系統廣播 5.2.1 動態註冊監聽網路變化 5.2.2 靜態註冊實現開機啟動 5.3 傳送自定義廣播 5.3.1 傳送標準廣播 5.3.2 傳送有序廣播 5.4 使用本地廣播 5.5 廣播最佳實踐——實現強制下線功能

04課GDB常用命令(上)

本課的核心內容如下:   run命令   continue命令   break命令   backtrace與frame命令   info break、enable、disable和delete命令   list命令   print和ptype命令 為了結合實踐,這裡以除錯Redis原始碼為例來

ADF 六篇Copy Data Activity

在Azure 資料工程中,可以使用Copy Data 活動把資料從on-premises 或雲中複製到其他儲存中。Copy Data 活動必須在一個IR(Integration Runtime)上執行,對於把儲存在on-premises中的資料複製到其他儲存時,必須建立一個self-hosted Integr

123Hadoop叢集管理之Namenode目錄元資料結構學習筆記

第123講:Hadoop叢集管理之Namenode目錄元資料結構詳解學習筆記 hadoop-2.x的叢集管理與hadoop-1.x有很大不同 hdfs-site.xml: dfs.replication dfs.namenode.name.dir  存放namenode元資

Git應用版本回退的三種方式與stash

前言 前情提要:Git應用詳解第三講:本地分支的重要操作 git作為一款版本控制工具,其最核心的功能就是版本回退,沒有之一。熟悉git版本回退的操作能夠讓你真真正正地放開手腳去開發,不用小心翼翼,怕一不小心刪除了不該刪除的檔案。本節除了介紹版本回退的內容之外,還會介紹stash的使用。 一、版本回退 在g

Git應用Git協作與Git pull常見問題

前言 前情提要:Git應用詳解第五講:遠端倉庫Github與Git圖形化介面 git除了可以很好地管理個人專案外,最大的一個用處就是實現團隊協作開發。況且,linus大神開發git的初衷就是為了維護Linux核心這一開源專案。所以,熟悉使用git進行多人協作開發的一般步驟和方法具有十分重要的意義。這一講將

Git應用Git refspec與遠端分支的重要操作

前言 前情提要:Git應用詳解第六講:Git協作與Git pull常見問題 這一節來介紹本地倉庫與遠端倉庫的分支對映關係:git refspec。徹底弄清楚本地倉庫到底是如何與遠端倉庫進行聯絡的。 一、Git refspec refspec是Reference Specification的縮寫,字面意思就

Git應用Git標籤、別名與Git gc

前言 前情提要:Git應用詳解第七講:Git refspec與遠端分支的重要操作 這一節主要介紹Git標籤、別名與Git的垃圾回收機制。 一、Git標籤(tag) 1.標籤的實質 標籤與分支十分相似,都是指向某一次提交;並且,它們的值都為各自指向提交的SHA1值;但是,不同於會隨著提交的變化而變化的分支,

Git應用Git cherry-pick與Git rebase

前言 前情提要:Git應用詳解第八講:Git標籤、別名與Git gc 這一節主要介紹git cherry-pick與git rebase的原理及使用。 一、Git cherry-pick Git cherry-pick的作用為移植提交。比如在dev分支錯誤地進行了兩次提交2nd和3rd,如果想要將這兩次提

Git應用Git子庫submodule與subtree.md

前言 前情提要:Git應用詳解第九講:Git cherry-pick與Git rebase 一箇中大型專案往往會依賴幾個模組,git提供了子庫的概念。可以將這些子模組存放在不同的倉庫中,通過submodule或subtree實現倉庫的巢狀。本講為Git應用詳解的倒數第二講,勝利離我們不遠了! 一、su

Git應用Git子庫submodule與subtree

前言 前情提要:Git應用詳解第九講:Git cherry-pick與Git rebase 一箇中大型專案往往會依賴幾個模組,git提供了子庫的概念。可以將這些子模組存放在不同的倉庫中,通過submodule或subtree實現倉庫的巢狀。本講為Git應用詳解的倒數第二講,勝利離我們不遠了! 一、su