【六】Spark Streaming接入HDFS的資料Local模式(使用Scala語言)
Spark Streaming接入HDFS的資料模擬一個wordcount的功能,結果列印到控制檯,使用Local模式,使用Scala語言。
專案目錄
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sid.spark</groupId> <artifactId>spark-train</artifactId> <version>1.0</version> <inceptionYear>2008</inceptionYear> <properties> <scala.version>2.11.8</scala.version> <kafka.version>0.9.0.0</kafka.version> <spark.version>2.2.0</spark.version> <hadoop.version>2.9.0</hadoop.version> <hbase.version>1.4.4</hbase.version> </properties> <repositories> <repository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-Tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!--<dependency>--> <!--<groupId>org.apache.hbase</groupId>--> <!--<artifactId>hbase-clinet</artifactId>--> <!--<version>${hbase.version}</version>--> <!--</dependency>--> <!--<dependency>--> <!--<groupId>org.apache.hbase</groupId>--> <!--<artifactId>hbase-server</artifactId>--> <!--<version>${hbase.version}</version>--> <!--</dependency>--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> <version>1.3.0</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> </plugins> </build> <reporting> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <configuration> <scalaVersion>${scala.version}</scalaVersion> </configuration> </plugin> </plugins> </reporting> </project>
程式碼
package com.sid.spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Created by jy02268879 on 2018/7/16. * * Spark Streaming 處理 HDFS檔案資料 */ object HDFSWordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local").setAppName("HDFSWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value * as Text and input format as TextInputFormat). Files must be written to the * monitored directory by "moving" them from another location within the same * file system. File names starting with . are ignored. * * param directory HDFS directory to monitor for new file */ val lines = ssc.textFileStream("hdfs://node1:9000/testdata/sparkstreaming/hdfswordcount/") val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) result.print() ssc.start() ssc.awaitTermination() } }
啟動Hadoop
(只在namenode上面執行,即是node1)
cd /app/hadoop/hadoop-2.9.0/sbin
start-all.sh
在HDFS上建立路徑
cd /app/hadoop/hadoop-2.9.0/bin
hdfs dfs -mkdir -p /testdata/sparkstreaming/hdfswordcount
建立測試資料
cd /app/spark/test_data/monitor_file
vi test.log
IDEA點選run啟動程式
將要計數的檔案放到HDFS上
cd /app/spark/test_data/monitor_file
hdfs dfs -put test.log /testdata/sparkstreaming/hdfswordcount
檢視IDEA控制檯輸出
相關推薦
【六】Spark Streaming接入HDFS的資料Local模式(使用Scala語言)
Spark Streaming接入HDFS的資料模擬一個wordcount的功能,結果列印到控制檯,使用Local模式,使用Scala語言。 專案目錄 pom.xml <project xmlns="http://maven.apache.org/POM/4.
【轉】Spark Streaming和Kafka整合開發指南
thread ada 關系 方法 拷貝 理解 1.2 reduce arr 基於Receivers的方法 這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於所有的Receivers,接收到的數據將會保存在Spark
【十五】Spark Streaming整合Kafka使用Direct方式(使用Scala語言)
官網介紹 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea
【八】Spark Streaming 用foreachRDD把結果寫入Mysql中Local模式(使用Scala語言)
DStream 的foreachRDD是允許把資料傳送到外部檔案系統中。然而使用不當會導致各種問題。 錯誤示範1:在driver建立連線,在woker使用。會報錯connection object not serializable。 錯誤示範2:rdd每個記錄都建立連
【十四】Spark Streaming整合Kafka使用Receiver方式(使用Scala語言)
官方網站 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea
【乾貨】各區塊鏈底層資料儲存分析(一)
網際網路的中心化發展模式是傳統網路安全的的軟肋,區塊鏈作為一種去中心化、集體維護、不可篡改的新興技術,是對網際網路底層架構的革新,是對當今生產力和生產關係的變革。區塊鏈也被譽為是繼蒸汽機、電力、資訊和網際網路科技之後,目前最有潛力觸發第五輪顛覆性革命浪潮的核心技術。 目前
【強烈推薦】:關於系統學習資料探勘(Data Mining)的一些建議!!
微信公眾號 關鍵字全網搜尋最新排名 【機器學習演算法】:排名第一 【機器學習】:排名第一 【Python】:排名第三 【演算法】:排名第四 關於資料探勘 提到收據挖掘(Data Mining, DM),很多想學習的同學大多數都會問我: 什麼是資料探勘? 怎麼培養資料分析的能力? 如何成為一名資料科學家? (
spark streaming讀取kafka資料令丟失(二)
方式二: 方法二就是每次streaming 消費了kafka的資料後,將消費的kafka offsets更新到zookeeper。當你的程式掛掉或者升級的時候,就可以接著上次的讀取,實現資料的令丟失和 at most once。而且使用checkpoint的方
【Java】基本類型和引用類型(值傳遞)
適合 and span print pri right bre enc this 【關鍵詞】 【問題】 · 加深對基本類型和引用類型的理解; 【效果圖】 【分析】 參見最後的【參考資料】 【解決方式】 【代碼】 public
【轉】PANDAS 數據合並與重塑(concat篇)
分享 levels 不同的 整理 con 簡單 post ignore num 轉自:http://blog.csdn.net/stevenkwong/article/details/52528616 1 concat concat函數是在pandas底下的方法,可以將數據
【轉載】Eclipse vs IDEA快捷鍵對比大全(win系統)
logs 常用 ima idea ref 技術 div log eclipse 花了幾天時間熟悉IDEA的各種操作,將各種快捷鍵都試了一下,感覺很是不錯!以下為我整理了一下開發過程中經常用的一些Eclipse快捷鍵與IDEA的對比,方便像我一樣使用Eclipse多年但想嘗試
【題解】 bzoj1864: [Zjoi2006]三色二叉樹 (動態規劃)
nod max cout esp build == node IT ron bzoj1864,懶得復制,戳我戳我 Solution: 其實想出來了\(dp\)方程推出來了最大值,一直沒想到推最小值 \(dp[i][1/0]\)表示\(i\)號節點的子樹中的綠色染色最大值,
【Codeforces】Codeforces Round #492 (Div. 2) [Thanks, uDebug!] (Contest 996)
spl === bsp -- href 分享 printf cli 位置 題目 傳送門:QWQ A:A - Hit the Lottery 分析: 大水題 模擬 代碼: #include <bits/stdc++.h&g
【筆記】跨域重定向中使用Ajax(XHR請求)導致跨域失敗
兩個 led stat -h java cut 報錯 blank direct 背景: 1、前端Web中有兩個域名,a.com和b.com,其中a.com是訪問主站(頁面),b.com是數據提交接口的服務器(XHR請求) 2、a.com中用XHR調用b.com/cerate
【atcoder】All Your Paths are Different Lengths[arc102D](亂搞)
ase owb arc line 傳送門 turn read sed 二進制 題目傳送門:https://arc102.contest.atcoder.jp/tasks/arc102_b 這道題有點毒瘤啊,罰時上天。。 顯然若$ l=2^n $那麽就可以直接
【題解】洛谷P1941 [NOIP2014TG] 飛揚的小鳥(揹包DP)
次元傳送門:洛谷P1941 思路 從題意可知 在每個單位時間內 可以無限地向上飛 但是隻能向下掉一次 所以我們可以考慮運用揹包解決這道題 上升時 用完全揹包 下降時 用01揹包 設f[x][y]為在座標(x,y)時的最小點選螢幕次數 當飛到天花板時和撞到柱子時特判 一開始設ans為極大值 如
【練習題】第十五章--類和物件(Think Python)
別名有可能讓程式讀起來有困難,因為在一個位置做出的修改有可能導致另外一個位置發生不可預知的情況。這樣也很難去追蹤指向一個物件的所有變數。所以就可以不用別名,而用複製物件的方法。copy 模組包含了一個名叫 copy 的函式,可以複製任意物件: >>> p1 = Point()
【練習題】第二章--變數,表示式,語句(Think Python)
一些比較特殊的用法: 不過+和*可以用在字串上面。 +加號的意思就是字串拼接了,會把兩個字串拼到一起,如下所示: >>> first = 'throat' >>> second = 'warbler' >>> first + s
【轉】Python+opencv利用sobel進行邊緣檢測(細節講解)
#! usr/bin/env python # coding:utf-8 # 2018年7月2日06:48:35 # 2018年7月2日23:11:59 import cv2 import numpy as np import matplotlib.pyplot as plt img = cv2
【原始碼】四階龍格庫塔法(Runge Kutta)求解常微分方程
MATLAB完整原始碼: % It calculates ODE using Runge-Kutta 4th order method % Author Ido Schwartz clc; % Clears the screen clear all; h