1. 程式人生 > >圖平行計算實踐(二)(spark streaming+graphx+kafka)

圖平行計算實踐(二)(spark streaming+graphx+kafka)

上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

object
DirectKafkaGraphxUpdate {
def updateFunction(newValues: Seq[Set[Long]], runningCount: Option[Set[Long]]): Option[Set[Long]] = { def show(x: Option[Set[Long]]) = x match { case Some(s) => s case None => Set() } if(newValues.isEmpty){ return runningCount }else
{ return Some(newValues(0) ++ show(runningCount)) } } def functionToCreateContext(brokers:String,topics:String): StreamingContext = { val sparkConf = new SparkConf().setAppName("DirectKafkaGraphx") val ssc = new StreamingContext(sparkConf,Seconds(30)) // new context ssc.checkpoint("checkpoints"
) // set checkpoint directory val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) val lines = messages.map(_._2) val words = lines.map(_.split(",")) val cleanedDStream = words.transform(rdd=>{ rdd.map(x=>Edge(x(1).toInt,x(2).toInt,1)) }) cleanedDStream.print() val graphDStream=cleanedDStream.transform(rdd=> Graph.fromEdges(rdd,"a").collectNeighborIds(EdgeDirection.Out).map(e=>(e._1,e._2.toSet)) ); val graphDStreams = graphDStream.updateStateByKey[Set[Long]](updateFunction(_,_)) graphDStreams.print() graphDStreams.saveAsTextFiles("sstest/kafka_graph_streamings","txt") return ssc } def main(args: Array[String]) { //System.setProperty("hadoop.home.dir", "E:\\software\\hadoop-2.5.2"); //StreamingExamples.setStreamingLogLevels() val brokers = "101.271.251.161:9092" val topics = "page_visits" if (args.length < 2) { System.err.println(s""" |Usage: DirectKafkaWordCount <brokers> <topics> | <brokers> is a list of one or more Kafka brokers | <topics> is a list of one or more kafka topics to consume from | """.stripMargin) }else{ val Array(brokers, topics) = args } val ssc = StreamingContext.getOrCreate("checkpoints", ()=>functionToCreateContext(brokers, topics)) //Start the computation ssc.start() ssc.awaitTermination() } }

相比之前而言,主要差別有:
1.提供updateStateByKey方法,實現對於圖中個頂點的出度頂點的集合,實時獲取與該點相關的其它頂點;
2.強制加入checkpoint機制,spark streaming為了減少長連結的計算,在計算過程中會將計算的中間結果快取到hdfs上,當系統出現故障時,可以從失敗點恢復到失敗前的狀態,確保計算過程的零誤差。

相關推薦

平行計算實踐spark streaming+graphx+kafka

上回利用transform方法實現對於資料流的圖平行計算過程,今天繼續改進完善已有的計算機制,加入updateStateByKey和checkpoint機制,保障圖平行計算在故障中仍能保證零誤差。 import kafka.serializer.Strin

LeetCode 145 Binary Tree Postorder Traversal叉樹的興許遍歷+叉樹、叠代

int truct fin for data- right class span popu 翻譯 給定一個二叉樹。返回其興許遍歷的節點的值。 比如: 給定二叉樹為 {1。 #, 2, 3} 1 2 / 3 返回

表達式求值叉樹方法/C++語言描述

urn sse 二叉 返回 新的 求值 calc ken node   二叉樹方法求值對運算數處理的方法與棧方法求值不太相同,除了將字符串中的運算數轉換為浮點類型外,還需要生成新的節點: 1 void Calculator::dealWithNumber(char *&

wireless維數組前綴和

整數 out 政府 continue 聯賽 str 覆蓋 示意圖 ring 1 . 無線網絡發射器選址(wireless.cpp/c/pas)【問題描述】隨著智能手機的日益普及,人們對無線網的需求日益增大。某城市決定對城市內的公共場所覆蓋無線網。假設該城市的布局為由嚴格平行

紅黑樹-RBT、基本操作之左旋

都是 spa 左旋 class body 節點 圖片 如果 info 一、左旋   1、當在含有n個關鍵字的紅黑樹上運行時,TREE-INSERT和TREE-DELETE操作對樹作了修改,結果可能違反(一、紅黑樹--》2、定義)中給出的紅黑樹的性質,為了保持這些性質,就要改

[POJ1014]Dividing進制優化多重背包

tdi sin namespace esp arp getch 原來 ring poj #include <cstdio> #include <algorithm> #include <cstring> using namesp

DZY Loves Fibonacci Numbers CodeForces - 446C 次剩餘+線段樹維護等比數列

二次剩餘:   斐波那契通項公式: 先打表求出根號5在模1e9+9意義下的數。 然後就化簡成立區間加上等比數列的形式,維護每段區間加了多少次等比數列就行。 下面我們來看如何維護一個等比數列。假如我對區間[L,R]的加上1,2,4,8...2^n

Leetcode 103 叉樹的鋸齒形層次遍歷 叉樹的層次遍歷

給定一個二叉樹,返回其節點值的鋸齒形層次遍歷。(即先從左往右,再從右往左進行下一層遍歷,以此類推,層與層之間交替進行)。 例如: 給定二叉樹 [3,9,20,null,null,15,7], 3 / \ 9 20 / \ 15 7 返回鋸

C語言 運算子、條件語句、指標

C運算子 包括算數運算子,邏輯運算子,關係運算符,位運算子,賦值運算子,其他運算子。 算術運算子 就是加減乘除求餘,自增自減等算術。 邏輯運算子 與:&&,或:||,非:! 關係運算符 等於,大於,小於等組合 位運算子 位與:&,位或:|,位左移<<,位右

從零開始之驅動發開、linux驅動十八、framebuffer驅動框架

框架 1.註冊一個framebuffer類。 2.註冊一個主裝置號,因為fb個數通常比較少,所以可以用老的介面統一註冊。 3.為2中的註冊實現通用的fops,注意這裡是通用的,特殊的架構在通用的裡面還是要呼叫專門fb註冊時實現的操作介面。(參考下面) 4.提供統一的註冊,解除安裝

由淺入深:求給定兩個樹節點的最低公共祖先叉樹、普通樹結構JAVA實現

最近看了一道面試題目,覺得很有意思,而且常常被問到,今天綜合歸納了一下這道題目,並給出了各種變形題目,附上JAVA版的程式解答。 題目是這樣的:尋找二叉樹的最低公共祖先?(其中隱含著一個盲點:樹是什麼樹?排序二叉樹、普通二叉樹、或者不是二叉樹?)所以要分別考慮哈各種情況哈 形式一:當樹

深入理解jvm、常用的垃圾收集器

1.Serial 單執行緒收集器,它在進行垃圾收集時必須暫停其他工作執行緒,直到收集結束。是虛擬機器執行在客戶端下的預設新生代收集器。 相對於其他收集器的單執行緒來說,簡單高效。 2.ParNew 相當於Serial收集器的多執行緒版本,一般是執行在服務端的虛擬機器首選的新生代收集器

【BZOJ3821/UOJ46】玄學進制分組,線段樹

ble online -- calc == ret int 一個 http 【BZOJ3821/UOJ46】玄學(二進制分組,線段樹) 題面 BZOJ UOJ 題解 嗚,很好的題目啊QwQ。 離線做法大概可以線段樹分治,或者直接點記錄左右兩次操作時的結果,兩個除一下就可以直

劍指offer題解叉樹與雙向連結串列

題目描述 輸入一棵二叉搜尋樹,將該二叉搜尋樹轉換成一個排序的雙向連結串列。要求不能建立任何新的結點,只能調整樹中結點指標的指向。   解題思路   中序遍歷搜尋二叉樹,用pre儲存中序遍歷的前一個節點,cur為當前節點,然後使pre->right=cu

spring security 5.x 使用及分析:自定義配置—初階

二、自定義配置(初階): 自定義的配置,就要修改一些預設配置的資訊,從那開始入手呢? 1、第一步:建立Spring Security 的Java配置,改配置建立一個名為springSecurityFilterChain的servlet過濾器,它負責應用程式的安

[LintCode] Binary Tree Level Order Traversal叉樹的層次遍歷

描述 給出一棵二叉樹,返回其節點值的層次遍歷(逐層從左往右訪問) 樣例 給一棵二叉樹 {3,9,20,#,#,15,7} : 3 / \ 9 20 / \ 15 7 返回他的分層遍歷結果:

LeetCode:102. Binary Tree Level Order Traversal叉樹的層次遍歷

Given a binary tree, return the level order traversal of its nodes' values. (ie, from left to right, level by level). For example: Given b

Leetcode 103 叉樹的鋸齒形層次遍歷 叉樹的層次遍歷

給定一個二叉樹,返回其節點值的鋸齒形層次遍歷。(即先從左往右,再從右往左進行下一層遍歷,以此類推,層與層之間交替進行)。 例如: 給定二叉樹 [3,9,20,null,null,15,7], 3 / \ 9 20 / \ 15 7

ActiveMQ訊息佇列的使用及應用丶JMS基本概念和模型

一丶JMS基本概念(1) 概念 JMS Java Message Service,Java訊息服務,是Java EE中的一個技術 JMS規範 JMS 定義了Java中訪問訊息中介軟體的介面,並沒有給予實現,實現JMS介面的訊息中介軟體叫JMS Provider,例如ActiveMQ JMS Prov

LeetCode:236. Lowest Common Ancestor of a Binary Tree叉樹中最小祖先

Given a binary tree, find the lowest common ancestor (LCA) of two given nodes in the tree. According to the definition of LCA on Wikipedia: “Th