1. 程式人生 > >【六】Spark Streaming接入HDFS的資料Local模式(使用Scala語言)

【六】Spark Streaming接入HDFS的資料Local模式(使用Scala語言)

Spark Streaming接入HDFS的資料模擬一個wordcount的功能,結果列印到控制檯,使用Local模式,使用Scala語言。

專案目錄

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.sid.spark</groupId>
  <artifactId>spark-train</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.9.0</hadoop.version>
    <hbase.version>1.4.4</hbase.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!--<dependency>-->
      <!--<groupId>org.apache.hbase</groupId>-->
      <!--<artifactId>hbase-clinet</artifactId>-->
      <!--<version>${hbase.version}</version>-->
    <!--</dependency>-->

    <!--<dependency>-->
      <!--<groupId>org.apache.hbase</groupId>-->
      <!--<artifactId>hbase-server</artifactId>-->
      <!--<version>${hbase.version}</version>-->
    <!--</dependency>-->

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

程式碼

package com.sid.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by jy02268879 on 2018/7/16.
  *
  * Spark Streaming 處理 HDFS檔案資料
  */
object HDFSWordCount {
  def  main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("HDFSWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /**
      * Create an input stream that monitors a Hadoop-compatible filesystem
      * for new files and reads them as text files (using key as LongWritable, value
      * as Text and input format as TextInputFormat). Files must be written to the
      * monitored directory by "moving" them from another location within the same
      * file system. File names starting with . are ignored.
      *
      * param directory HDFS directory to monitor for new file
      */
    val lines = ssc.textFileStream("hdfs://node1:9000/testdata/sparkstreaming/hdfswordcount/")

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }


}

啟動Hadoop

(只在namenode上面執行,即是node1)

cd /app/hadoop/hadoop-2.9.0/sbin

start-all.sh

在HDFS上建立路徑

cd /app/hadoop/hadoop-2.9.0/bin

hdfs dfs -mkdir -p /testdata/sparkstreaming/hdfswordcount

建立測試資料

cd /app/spark/test_data/monitor_file

vi test.log

IDEA點選run啟動程式

將要計數的檔案放到HDFS上

cd /app/spark/test_data/monitor_file

hdfs dfs -put test.log /testdata/sparkstreaming/hdfswordcount

檢視IDEA控制檯輸出

相關推薦

Spark Streaming接入HDFS資料Local模式使用Scala語言

Spark Streaming接入HDFS的資料模擬一個wordcount的功能,結果列印到控制檯,使用Local模式,使用Scala語言。 專案目錄 pom.xml <project xmlns="http://maven.apache.org/POM/4.

Spark Streaming和Kafka整合開發指南

thread ada 關系 方法 拷貝 理解 1.2 reduce arr 基於Receivers的方法 這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於所有的Receivers,接收到的數據將會保存在Spark

十五Spark Streaming整合Kafka使用Direct方式使用Scala語言

官網介紹 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea

Spark Streaming 用foreachRDD把結果寫入Mysql中Local模式使用Scala語言

DStream 的foreachRDD是允許把資料傳送到外部檔案系統中。然而使用不當會導致各種問題。 錯誤示範1:在driver建立連線,在woker使用。會報錯connection object not serializable。 錯誤示範2:rdd每個記錄都建立連

十四Spark Streaming整合Kafka使用Receiver方式使用Scala語言

官方網站 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea

乾貨各區塊鏈底層資料儲存分析

網際網路的中心化發展模式是傳統網路安全的的軟肋,區塊鏈作為一種去中心化、集體維護、不可篡改的新興技術,是對網際網路底層架構的革新,是對當今生產力和生產關係的變革。區塊鏈也被譽為是繼蒸汽機、電力、資訊和網際網路科技之後,目前最有潛力觸發第五輪顛覆性革命浪潮的核心技術。 目前

強烈推薦:關於系統學習資料探勘Data Mining的一些建議!!

微信公眾號 關鍵字全網搜尋最新排名 【機器學習演算法】:排名第一 【機器學習】:排名第一 【Python】:排名第三 【演算法】:排名第四 關於資料探勘 提到收據挖掘(Data Mining, DM),很多想學習的同學大多數都會問我: 什麼是資料探勘? 怎麼培養資料分析的能力? 如何成為一名資料科學家? (

spark streaming讀取kafka資料令丟失

方式二: 方法二就是每次streaming 消費了kafka的資料後,將消費的kafka offsets更新到zookeeper。當你的程式掛掉或者升級的時候,就可以接著上次的讀取,實現資料的令丟失和 at most once。而且使用checkpoint的方

Java基本類型和引用類型值傳遞

適合 and span print pri right bre enc this 【關鍵詞】 【問題】 · 加深對基本類型和引用類型的理解; 【效果圖】 【分析】 參見最後的【參考資料】 【解決方式】 【代碼】 public

PANDAS 數據合並與重塑concat篇

分享 levels 不同的 整理 con 簡單 post ignore num 轉自:http://blog.csdn.net/stevenkwong/article/details/52528616 1 concat concat函數是在pandas底下的方法,可以將數據

轉載Eclipse vs IDEA快捷鍵對比大全win系統

logs 常用 ima idea ref 技術 div log eclipse 花了幾天時間熟悉IDEA的各種操作,將各種快捷鍵都試了一下,感覺很是不錯!以下為我整理了一下開發過程中經常用的一些Eclipse快捷鍵與IDEA的對比,方便像我一樣使用Eclipse多年但想嘗試

題解 bzoj1864: [Zjoi2006]三色二叉樹 動態規劃

nod max cout esp build == node IT ron bzoj1864,懶得復制,戳我戳我 Solution: 其實想出來了\(dp\)方程推出來了最大值,一直沒想到推最小值 \(dp[i][1/0]\)表示\(i\)號節點的子樹中的綠色染色最大值,

CodeforcesCodeforces Round #492 (Div. 2) [Thanks, uDebug!] Contest 996

spl === bsp -- href 分享 printf cli 位置 題目 傳送門:QWQ A:A - Hit the Lottery 分析: 大水題 模擬 代碼: #include <bits/stdc++.h&g

筆記跨域重定向中使用AjaxXHR請求導致跨域失敗

兩個 led stat -h java cut 報錯 blank direct 背景: 1、前端Web中有兩個域名,a.com和b.com,其中a.com是訪問主站(頁面),b.com是數據提交接口的服務器(XHR請求) 2、a.com中用XHR調用b.com/cerate

atcoderAll Your Paths are Different Lengths[arc102D]亂搞

ase owb arc line 傳送門 turn read sed 二進制   題目傳送門:https://arc102.contest.atcoder.jp/tasks/arc102_b   這道題有點毒瘤啊,罰時上天。。   顯然若$ l=2^n $那麽就可以直接

題解洛谷P1941 [NOIP2014TG] 飛揚的小鳥揹包DP

次元傳送門:洛谷P1941 思路 從題意可知 在每個單位時間內 可以無限地向上飛 但是隻能向下掉一次 所以我們可以考慮運用揹包解決這道題 上升時 用完全揹包 下降時 用01揹包 設f[x][y]為在座標(x,y)時的最小點選螢幕次數 當飛到天花板時和撞到柱子時特判 一開始設ans為極大值 如

練習題第十五章--類和物件Think Python

別名有可能讓程式讀起來有困難,因為在一個位置做出的修改有可能導致另外一個位置發生不可預知的情況。這樣也很難去追蹤指向一個物件的所有變數。所以就可以不用別名,而用複製物件的方法。copy 模組包含了一個名叫 copy 的函式,可以複製任意物件: >>> p1 = Point()

練習題第二章--變數,表示式,語句Think Python

一些比較特殊的用法: 不過+和*可以用在字串上面。 +加號的意思就是字串拼接了,會把兩個字串拼到一起,如下所示: >>> first = 'throat' >>> second = 'warbler' >>> first + s

Python+opencv利用sobel進行邊緣檢測細節講解

#! usr/bin/env python # coding:utf-8 # 2018年7月2日06:48:35 # 2018年7月2日23:11:59 import cv2 import numpy as np import matplotlib.pyplot as plt img = cv2

原始碼四階龍格庫塔法Runge Kutta求解常微分方程

MATLAB完整原始碼: % It calculates ODE using Runge-Kutta 4th order method % Author Ido Schwartz clc; % Clears the screen clear all; h