圖平行計算實踐(二)(spark streaming+graphx+kafka)
上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
object DirectKafkaGraphxUpdate {
def updateFunction(newValues: Seq[Set[Long]], runningCount: Option[Set[Long]]): Option[Set[Long]] = {
def show(x: Option[Set[Long]]) = x match {
case Some(s) => s
case None => Set()
}
if(newValues.isEmpty){
return runningCount
}else {
return Some(newValues(0) ++ show(runningCount))
}
}
def functionToCreateContext(brokers:String,topics:String): StreamingContext = {
val sparkConf = new SparkConf().setAppName("DirectKafkaGraphx")
val ssc = new StreamingContext(sparkConf,Seconds(30)) // new context
ssc.checkpoint("checkpoints" ) // set checkpoint directory
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val words = lines.map(_.split(","))
val cleanedDStream = words.transform(rdd=>{
rdd.map(x=>Edge(x(1).toInt,x(2).toInt,1))
})
cleanedDStream.print()
val graphDStream=cleanedDStream.transform(rdd=>
Graph.fromEdges(rdd,"a").collectNeighborIds(EdgeDirection.Out).map(e=>(e._1,e._2.toSet))
);
val graphDStreams = graphDStream.updateStateByKey[Set[Long]](updateFunction(_,_))
graphDStreams.print()
graphDStreams.saveAsTextFiles("sstest/kafka_graph_streamings","txt")
return ssc
}
def main(args: Array[String]) {
//System.setProperty("hadoop.home.dir", "E:\\software\\hadoop-2.5.2");
//StreamingExamples.setStreamingLogLevels()
val brokers = "101.271.251.161:9092"
val topics = "page_visits"
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
}else{
val Array(brokers, topics) = args
}
val ssc = StreamingContext.getOrCreate("checkpoints", ()=>functionToCreateContext(brokers, topics))
//Start the computation
ssc.start()
ssc.awaitTermination()
}
}
相比之前而言,主要差別有:
1.提供updateStateByKey方法,實現對於圖中個頂點的出度頂點的集合,實時獲取與該點相關的其它頂點;
2.強制加入checkpoint機制,spark streaming為了減少長連結的計算,在計算過程中會將計算的中間結果快取到hdfs上,當系統出現故障時,可以從失敗點恢復到失敗前的狀態,確保計算過程的零誤差。
相關推薦
圖平行計算實踐(二)(spark streaming+graphx+kafka)
上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。 import kafka.serializer.Strin
LeetCode 145 Binary Tree Postorder Traversal(二叉樹的興許遍歷)+(二叉樹、叠代)
int truct fin for data- right class span popu 翻譯 給定一個二叉樹。返回其興許遍歷的節點的值。 比如: 給定二叉樹為 {1。 #, 2, 3} 1 2 / 3 返回
表達式求值(二叉樹方法/C++語言描述)(三)
urn sse 二叉 返回 新的 求值 calc ken node 二叉樹方法求值對運算數處理的方法與棧方法求值不太相同,除了將字符串中的運算數轉換為浮點類型外,還需要生成新的節點: 1 void Calculator::dealWithNumber(char *&
wireless(二維數組前綴和)
整數 out 政府 continue 聯賽 str 覆蓋 示意圖 ring 1 . 無線網絡發射器選址(wireless.cpp/c/pas)【問題描述】隨著智能手機的日益普及,人們對無線網的需求日益增大。某城市決定對城市內的公共場所覆蓋無線網。假設該城市的布局為由嚴格平行
紅黑樹-RBT(二、基本操作之左旋)
都是 spa 左旋 class body 節點 圖片 如果 info 一、左旋 1、當在含有n個關鍵字的紅黑樹上運行時,TREE-INSERT和TREE-DELETE操作對樹作了修改,結果可能違反(一、紅黑樹--》2、定義)中給出的紅黑樹的性質,為了保持這些性質,就要改
[POJ1014]Dividing(二進制優化多重背包)
tdi sin namespace esp arp getch 原來 ring poj #include <cstdio> #include <algorithm> #include <cstring> using namesp
DZY Loves Fibonacci Numbers CodeForces - 446C (二次剩餘+線段樹維護等比數列)
二次剩餘: 斐波那契通項公式: 先打表求出根號5在模1e9+9意義下的數。 然後就化簡成立區間加上等比數列的形式,維護每段區間加了多少次等比數列就行。 下面我們來看如何維護一個等比數列。假如我對區間[L,R]的加上1,2,4,8...2^n
Leetcode 103 二叉樹的鋸齒形層次遍歷 (二叉樹的層次遍歷)
給定一個二叉樹,返回其節點值的鋸齒形層次遍歷。(即先從左往右,再從右往左進行下一層遍歷,以此類推,層與層之間交替進行)。 例如: 給定二叉樹 [3,9,20,null,null,15,7], 3 / \ 9 20 / \ 15 7 返回鋸
C語言(二 運算子、條件語句、指標)
C運算子 包括算數運算子,邏輯運算子,關係運算符,位運算子,賦值運算子,其他運算子。 算術運算子 就是加減乘除求餘,自增自減等算術。 邏輯運算子 與:&&,或:||,非:! 關係運算符 等於,大於,小於等組合 位運算子 位與:&,位或:|,位左移<<,位右
從零開始之驅動發開、linux驅動(二十八、framebuffer驅動框架)
框架 1.註冊一個framebuffer類。 2.註冊一個主裝置號,因為fb個數通常比較少,所以可以用老的介面統一註冊。 3.為2中的註冊實現通用的fops,注意這裡是通用的,特殊的架構在通用的裡面還是要呼叫專門fb註冊時實現的操作介面。(參考下面) 4.提供統一的註冊,解除安裝
由淺入深:求給定兩個樹節點的最低公共祖先(二叉樹、普通樹結構)JAVA實現
最近看了一道面試題目,覺得很有意思,而且常常被問到,今天綜合歸納了一下這道題目,並給出了各種變形題目,附上JAVA版的程式解答。 題目是這樣的:尋找二叉樹的最低公共祖先?(其中隱含著一個盲點:樹是什麼樹?排序二叉樹、普通二叉樹、或者不是二叉樹?)所以要分別考慮哈各種情況哈 形式一:當樹
深入理解jvm(二、常用的垃圾收集器)
1.Serial 單執行緒收集器,它在進行垃圾收集時必須暫停其他工作執行緒,直到收集結束。是虛擬機器執行在客戶端下的預設新生代收集器。 相對於其他收集器的單執行緒來說,簡單高效。 2.ParNew 相當於Serial收集器的多執行緒版本,一般是執行在服務端的虛擬機器首選的新生代收集器
【BZOJ3821/UOJ46】玄學(二進制分組,線段樹)
ble online -- calc == ret int 一個 http 【BZOJ3821/UOJ46】玄學(二進制分組,線段樹) 題面 BZOJ UOJ 題解 嗚,很好的題目啊QwQ。 離線做法大概可以線段樹分治,或者直接點記錄左右兩次操作時的結果,兩個除一下就可以直
劍指offer題解(二叉樹與雙向連結串列)
題目描述 輸入一棵二叉搜尋樹,將該二叉搜尋樹轉換成一個排序的雙向連結串列。要求不能建立任何新的結點,只能調整樹中結點指標的指向。 解題思路 中序遍歷搜尋二叉樹,用pre儲存中序遍歷的前一個節點,cur為當前節點,然後使pre->right=cu
spring security 5.x 使用及分析(二:自定義配置—初階)
二、自定義配置(初階): 自定義的配置,就要修改一些預設配置的資訊,從那開始入手呢? 1、第一步:建立Spring Security 的Java配置,改配置建立一個名為springSecurityFilterChain的servlet過濾器,它負責應用程式的安
[LintCode] Binary Tree Level Order Traversal(二叉樹的層次遍歷)
描述 給出一棵二叉樹,返回其節點值的層次遍歷(逐層從左往右訪問) 樣例 給一棵二叉樹 {3,9,20,#,#,15,7} : 3 / \ 9 20 / \ 15 7 返回他的分層遍歷結果:
LeetCode:102. Binary Tree Level Order Traversal(二叉樹的層次遍歷)
Given a binary tree, return the level order traversal of its nodes' values. (ie, from left to right, level by level). For example: Given b
Leetcode 103 二叉樹的鋸齒形層次遍歷 (二叉樹的層次遍歷)
給定一個二叉樹,返回其節點值的鋸齒形層次遍歷。(即先從左往右,再從右往左進行下一層遍歷,以此類推,層與層之間交替進行)。 例如: 給定二叉樹 [3,9,20,null,null,15,7], 3 / \ 9 20 / \ 15 7
ActiveMQ訊息佇列的使用及應用(二丶JMS基本概念和模型)
一丶JMS基本概念(1) 概念 JMS Java Message Service,Java訊息服務,是Java EE中的一個技術 JMS規範 JMS 定義了Java中訪問訊息中介軟體的介面,並沒有給予實現,實現JMS介面的訊息中介軟體叫JMS Provider,例如ActiveMQ JMS Prov
LeetCode:236. Lowest Common Ancestor of a Binary Tree(二叉樹中最小祖先)
Given a binary tree, find the lowest common ancestor (LCA) of two given nodes in the tree. According to the definition of LCA on Wikipedia: “Th