1. 程式人生 > >DStream運算元updateStateByKey實現全域性統計計數

DStream運算元updateStateByKey實現全域性統計計數

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setMaster("local[2]")
				.setAppName("UpdateStateByKeyWordCount");  
	JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

	jssc.checkpoint("hdfs://hadoop01:9000/wordcount_checkpoint");  
	// 命令: nc -lk 9999 
	JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
		
	JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> call(String line) throws Exception {
				return Arrays.asList(line.split(" "));  
			}
			
		});
		
		JavaPairDStream<String, Integer> pairs = words.mapToPair(
				
				new PairFunction<String, String, Integer>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> call(String word)
							throws Exception {
						return new Tuple2<String, Integer>(word, 1);
					}
					
				});

		JavaPairDStream<String, Integer> wordCounts = pairs.updateStateByKey(
				
		// 這裡的Optional,相當於Scala中的樣例類,就是Option,可以這麼理解
		// 它代表了一個值的存在狀態,可能存在,也可能不存在
		new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Optional<Integer> call(List<Integer> values,
					Optional<Integer> state) throws Exception {
				Integer newValue = 0;
				if(state.isPresent()) {
					newValue = state.get();
				}
				for(Integer value : values) {
					newValue += value;
				}
				
				return Optional.of(newValue);  
			}
			
		});
		
		wordCounts.print();
		
		jssc.start();
		jssc.awaitTermination();
		jssc.close();
	}
  Scala版本wordcount全域性計數:
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setMaster("local[2]")  
        .setAppName("UpdateStateByKeyWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint")  
    
    val lines = ssc.socketTextStream("spark1", 9999)
    val words = lines.flatMap { _.split(" ") }   
    val pairs = words.map { word => (word, 1) } 
    val wordCounts = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
      var newValue = state.getOrElse(0)    
      for(value <- values) {
        newValue += value
      }
      Option(newValue)  
    })
    
    wordCounts.print()  
    
    ssc.start()
    ssc.awaitTermination()
  }


相關推薦

DStream運算元updateStateByKey實現全域性統計計數

public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("UpdateStateByKeyWordCount"); J

JS實現文字倒計數

console bsp else .text ack htm lan his practice <!DOCTYPE html> <html xmlns="http://www.w3.org/1999/xhtml"> <head>

arcgis api for js之echarts開源js庫實現地圖統計圖分析

不能 rgba data ron 創建 apc att load reat 前面寫過一篇關於arcgis api for js實現地圖統計圖的,具體見:http://www.cnblogs.com/giserhome/p/6727593.html 那是基於dojo組件來實

實現一個統計函數運行時間

app n) for datetime code 當前 運行 一個 color 1.datetime顯示當前時間 import datetime def Func(n): t = datetime.datetime.now() print t

旅遊公司用友財務軟件接口實現大批量數據導入財務模塊實現報表統計

北京 導入模塊 數據 設置 包含 調試 軟件 沒有 批量 一. 總體概況:....................................................................................................

分享一個開源的JavaScript統計圖表庫,40行代碼實現專業統計圖表

軟件 cal 比較 pie ogr too 掃描 earch 搜索 提升程序員工作效率的工具/技巧推薦系列 推薦一個功能強大的文件搜索工具SearchMyFiles 介紹一個好用的免費流程圖和UML繪制軟件-Diagram Designer 介紹Windows任務管理器的

Java實現資料統計的常用演算法

求和、平均值、眾數、中位數、中列數、四分位數、極差、四分位數、截斷均值、方差、絕對平均差(AAD)、中位數絕對偏差、標準差 的數學方法 package cn.javacodes.utils; import java.util.Arrays; import java.util.HashMap;

js陣列實現分類統計

將水果陣列中同類的水果合併為一條並求出總數 var fruits = [{ name: 'apple', value: 1 }, { name: 'apple', value: 2 }, // 總計3個蘋果 { name: 'banana', value: 2 }, {

VBS 指令碼通過條件程式實現全域性函式呼叫

效果動畫演示   步驟一:變數字典建立 步驟二:新建“視窗0”畫面 1、變數==>動畫(文字內容) 關聯 “變數” 2、按鈕“呼叫函式”  事件  “左鍵單擊”事件 Var.中間變數= Not Var.中間變

@ControllerAdvice和@ExceptionHandler實現全域性捕獲異常

##全域性捕獲異常:相當於整個web請求專案全域性捕獲異常,一般對整個controller層丟擲的異常做統一處理。 ##異常處理有兩種方式:1、捕獲返回json格式;2、捕獲返回頁面的 @ControllerAdvice(basePackages= {"com.demo"}) public cl

Jmeter如何實現 全域性變數

所謂全域性變數我們就可以理解為是在一個jmeter 測試指令碼中可以到處起作用的一個值。 基本步驟 在獲取資料的執行緒中先將我們想要的資料通過相應的後置處理器拿出來然後放在一個變數當中。 通過 setproperty 函式來設定具體的屬性名及屬性值,但是這句話

spark streaming DStream運算元大全

DStream作為spark 流處理的資料抽象,有三個主要的特徵: 1.依賴的DStream的列表 2.DStream生成RDD的時間間隔 3.用來生成RDD的方法 本篇pom.xml檔案spark streaming版本為1.6.0 目錄 window() reduce

Linq 多個DataTable表關聯查詢,實現考勤統計

最近在做考勤系統時,由於不同的分公司的資料來源在不同的伺服器上,關聯查詢比較麻煩,因此想到了用Linq實現關聯查詢。 思路:查詢各個考勤資訊的dataTable,然後用Linq實現DataTable的多張表左關聯查詢。貼出來,與大家分享。 程式碼實現如下: 1.View顯示:

MapReduce實現單詞統計

 mapreduce實現思路: Map階段: a) 從HDFS的源資料檔案中逐行讀取資料 b) 將每一行資料切分出單詞 c) 為每一個單詞構造一個鍵值對(單詞,1) d) 將鍵值對傳送給reduce   Reduce階段: a)&nb

React實現全域性元件:Toast輕提示

Toast是常用的輕提示彈框,常用於頁面loading和提示語彈窗。本例基於React實現一個隨時可呼叫且不隨頁面渲染的全域性元件。 如何使用 首先引入 import Toast from './components/toast' JSX中事件呼叫: <bu

Java程式設計中JFreeChart圖表繪製類庫巧妙利用JSP實現頁面統計

1 開發環境: 1、eclipse(可替換)2、jfreechart-1.0.19 2 說明: (1) source目錄:為 jfreechart的原始碼目錄;不會的主要看這裡。因為他的文件是收費的。(2) lib目錄:為包目錄,我們需要關注的包為 jfreechart-1.0.10.ja

Filter + 動態代理實現全域性編碼

html: <form action="/WEB24/EnCoding" method="get"> <input type="text" name="username"> <input type="submit" value="提交">

Storm實現單詞統計案例

需求 實時統計發射到Storm框架中單詞的總數 分析 設計一個topology,來實現對文件裡面的單詞出現的頻率進行統計,整個topology分為三個部分 (1)WordCountSpot:資料來源,在已知的英文句子中,隨機發送一條句子出去 package storm

從壹開始前後端分離 [.netCore 不定期更新 ] 三十五║ 完美實現全域性異常日誌記錄

緣起 哈嘍我是不定期更新的日常,昨天群裡小夥伴問到了記錄日誌,當然,以前我也挖過這個坑,後來一直沒有來得及填上,也想著 swagger 一直又有錯誤資訊展示的功能,就遲遲沒有新增這個功能,不過昨天夜裡想了想,還是需要增加上,旨在提高框架的高效性。不定期日常就直接上程式碼了,我有一個小想法,就是希望大家有好的

vue3.0實現全域性對話方塊

vue+vuex+vuetify 1.在資料夾components下新建資料夾Dialog.vue <template> <v-layout row justify-cneter> <v-dialog v-model="isSho