1. 程式人生 > >用Apache Spark進行大資料處理四

用Apache Spark進行大資料處理四

如何安裝Spark

安裝和使用Spark有幾種不同方式。你可以在自己的電腦上將Spark作為一個獨立的框架安裝或者從諸如ClouderaHortonWorksMapR之類的供應商處獲取一個Spark虛擬機器映象直接使用。或者你也可以使用在雲端環境(如Databricks Cloud)安裝並配置好的Spark

在本文中,我們將把Spark作為一個獨立的框架安裝並在本地啟動它。最近Spark剛剛釋出了1.2.0版本。我們將用這一版本完成示例應用的程式碼展示。

如何執行Spark

當你在本地機器安裝了Spark或使用了基於雲端的Spark後,有幾種不同的方式可以連線到Spark引擎。

下表展示了不同的

Spark執行模式所需的Master URL引數。

如何與Spark互動

Spark啟動並執行後,可以用Spark shell連線到Spark引擎進行互動式資料分析。Spark shell支援ScalaPython兩種語言。Java不支援互動式的Shell,因此這一功能暫未在Java語言中實現。

可以用spark-shell.cmdpyspark.cmd命令分別執行Scala版本和Python版本的Spark Shell

Spark網頁控制檯

不論Spark執行在哪一種模式下,都可以通過訪問Spark網頁控制檯檢視Spark的作業結果和其他的統計資料,控制檯的URL地址如下:

http://localhost:4040

Spark控制檯如下圖3所示,包括StagesStorageEnvironmentExecutors四個標籤頁

(點選檢視大圖)

3. Spark網頁控制檯

共享變數

Spark提供兩種型別的共享變數可以提升叢集環境中的Spark程式執行效率。分別是廣播變數和累加器。

廣播變數:廣播變數可以在每臺機器上快取只讀變數而不需要為各個任務傳送該變數的拷貝。他們可以讓大的輸入資料集的叢集拷貝中的節點更加高效。

下面的程式碼片段展示瞭如何使用廣播變數。

//

// Broadcast Variables

//

val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar.value

累加器:只有在使用相關操作時才會新增累加器,因此它可以很好地支援並行。累加器可用於實現計數(就像在MapReduce中那樣)或求和。可以用add方法將執行在叢集上的任務新增到一個累加器變數中。不過這些任務無法讀取變數的值。只有驅動程式才能夠讀取累加器的值。

下面的程式碼片段展示瞭如何使用累加器共享變數:

//

// Accumulators

//

val accum = sc.accumulator(0, "My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

accum.value

Spark應用示例

本篇文章中所涉及的示例應用是一個簡單的字數統計應用。這與學習用Hadoop進行大資料處理時的示例應用相同。我們將在一個文字檔案上執行一些資料分析查詢。本示例中的文字檔案和資料集都很小,不過無須修改任何程式碼,示例中所用到的Spark查詢同樣可以用到大容量資料集之上。

為了讓討論儘量簡單,我們將使用Spark Scala Shell

首先讓我們看一下如何在你自己的電腦上安裝Spark

前提條件:

· 為了讓Spark能夠在本機正常工作,你需要安裝Java開發工具包(JDK)。這將包含在下面的第一步中。

· 同樣還需要在電腦上安裝Spark軟體。下面的第二步將介紹如何完成這項工作。

注:下面這些指令都是以Windows環境為例。如果你使用不同的作業系統環境,需要相應的修改系統變數和目錄路徑已匹配你的環境。

I. 安裝JDK

1)從Oracle網站上下載JDK。推薦使用JDK 1.7版本

JDK安裝到一個沒有空格的目錄下。對於Windows使用者,需要將JDK安裝到像c:\dev這樣的資料夾下,而不能安裝到“c:\Program Files”資料夾下。“c:\Program Files”資料夾的名字中包含空格,如果軟體安裝到這個資料夾下會導致一些問題。

注:不要“c:\Program Files”資料夾中安裝JDK或(第二步中所描述的)Spark軟體。

2)完成JDK安裝後,切換至JDK 1.7目錄下的”bin“資料夾,然後鍵入如下命令,驗證JDK是否正確安裝:

java -version

如果JDK安裝正確,上述命令將顯示Java版本。

II. 安裝Spark軟體:

Spark網站上下載最新版本的Spark。在本文發表時,最新的Spark版本是1.2。你可以根據Hadoop的版本選擇一個特定的Spark版本安裝。我下載了與Hadoop 2.4或更高版本匹配的Spark,檔名是spark-1.2.0-bin-hadoop2.4.tgz

將安裝檔案解壓到本地資料夾中(如:c:\dev)。

為了驗證Spark安裝的正確性,切換至Spark資料夾然後用如下命令啟動Spark Shell。這是Windows環境下的命令。如果使用LinuxMac OS,請相應地編輯命令以便能夠在相應的平臺上正確執行。

c:

cd c:\dev\spark-1.2.0-bin-hadoop2.4

bin\spark-shell

如果Spark安裝正確,就能夠在控制檯的輸出中看到如下資訊。

….

15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server

15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0

      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)

Type in expressions to have them evaluated.

Type :help for more information.

….

15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager

15/01/17 23:17:53 INFO SparkILoop: Created spark context..

Spark context available as sc.

可以鍵入如下命令檢查Spark Shell是否工作正常。

sc.version

(或)

sc.appName

完成上述步驟之後,可以鍵入如下命令退出Spark Shell視窗:

:quit

如果想啟動Spark Python Shell,需要先在電腦上安裝Python。你可以下載並安裝Anaconda,這是一個免費的Python發行版本,其中包括了一些比較流行的科學、數學、工程和資料分析方面的Python包。

然後可以執行如下命令啟動Spark Python Shell

c:

cd c:\dev\spark-1.2.0-bin-hadoop2.4

bin\pyspark

Spark示例應用

完成Spark安裝並啟動後,就可以用Spark API執行資料分析查詢了。

這些從文字檔案中讀取並處理資料的命令都很簡單。我們將在這一系列文章的後續文章中向大家介紹更高階的Spark框架使用的用例。

首先讓我們用Spark API執行流行的Word Count示例。如果還沒有執行Spark Scala Shell,首先開啟一個Scala Shell視窗。這個示例的相關命令如下所示:

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

val txtFile = "README.md"

val txtData = sc.textFile(txtFile)

txtData.cache()

我們可以呼叫cache函式將上一步生成的RDD物件儲存到快取中,在此之後Spark就不需要在每次資料查詢時都重新計算。需要注意的是,cache()是一個延遲操作。在我們呼叫cache時,Spark並不會馬上將資料儲存到記憶體中。只有當在某個RDD上呼叫一個行動時,才會真正執行這個操作。

現在,我們可以呼叫count函式,看一下在文字檔案中有多少行資料。

txtData.count()

然後,我們可以執行如下命令進行字數統計。在文字檔案中統計資料會顯示在每個單詞的後面。

val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wcData.collect().foreach(println)

如果想檢視更多關於如何使用Spark核心API的程式碼示例,請參考網站上的Spark文件

後續計劃

在後續的系列文章中,我們將從Spark SQL開始,學習更多關於Spark生態系統的其他部分。之後,我們將繼續瞭解Spark StreamingSpark MLlibSpark GraphX。我們也會有機會學習像TachyonBlinkDB等框架。

小結

在本文中,我們瞭解了Apache Spark框架如何通過其標準API幫助完成大資料處理和分析工作。我們還對Spark和傳統的MapReduce實現(如Apache Hadoop)進行了比較。SparkHadoop基於相同的HDFS檔案儲存系統,因此如果你已經在Hadoop上進行了大量投資和基礎設施建設,可以一起使用SparkMapReduce

此外,也可以將Spark處理與Spark SQL、機器學習以及Spark Streaming結合在一起。關於這方面的內容我們將在後續的文章中介紹。

利用Spark的一些整合功能和介面卡,我們可以將其他技術與Spark結合在一起。其中一個案例就是將SparkKafkaApache Cassandra結合在一起,其中Kafka負責輸入的流式資料,Spark完成計算,最後Cassandra NoSQL資料庫用於儲存計算結果資料。

不過需要牢記的是,Spark生態系統仍不成熟,在安全和與BI工具整合等領域仍然需要進一步的改進。

相關推薦

Apache Spark進行資料處理

如何安裝Spark 安裝和使用Spark有幾種不同方式。你可以在自己的電腦上將Spark作為一個獨立的框架安裝或者從諸如Cloudera,HortonWorks或MapR之類的供應商處獲取一個Spark虛擬機器映象直接使用。或者你也可以使用在雲端環境(如Databricks

Apache Spark進行資料處理Spark GraphX圖資料分析(6)

import org.apache.spark._ import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import java.util.Calendar // 先匯入邊 val graph = GraphLoader.edgeL

Apache Spark進行資料處理

Spark特性 Spark通過在資料處理過程中成本更低的洗牌(Shuffle)方式,將MapReduce提升到一個更高的層次。利用記憶體資料儲存和接近實時的處理能力,Spark比其他的大資料處理技術的效能要快很多倍。 Spark還支援大資料查詢的延遲計算,這可以幫助優化大資

Apache Spark資料處理統一引擎

工業和研究中資料的大幅增長為電腦科學帶來了巨大的機會與挑戰。由於資料大小超過了單臺機器的能力,使用者需要新的系統將計算擴充套件到多個節點。因此,針對不同計算工作負載的新叢集程式設計模型已呈爆炸式增長。 這些模型相對專業化。例如支援批處理的MapReduce,支援迭

Spark SQL資料處理並寫入Elasticsearch

1 # coding: utf-8 2 import sys 3 import os 4 5 pre_current_dir = os.path.dirname(os.getcwd()) 6 sys.path.append(pre_current_dir) 7 from pyspark.sq

Spark SQL 資料處理

InfoQ 上有學者對 Spark 的大資料處理,做了一些歸納演講 我嘗試著對這些演講做翻譯,加入了一些自己的理解和實驗 理解是我自己的,有可能是錯誤的,實驗是為了證明自己的理解是正確的 Big Data Processing with Apache Sp

資料演算法-Hadoop/Spark資料處理技巧》讀書筆記()——移動平均

移動平均:對時序序列按週期取其值的平均值,這種運算被稱為移動平均。典型例子是求股票的n天內的平均值。 移動平均的關鍵是如何求這個平均值,可以使用Queue來實現。 public class MovingAverageDriver { public

資料處理為何選擇Spark,而不是Hadoop

一.基礎知識1.SparkSpark是一個用來實現快速而通用的叢集計算的平臺。在速度方面,Spark擴充套件了廣泛使用的MapReduce計算模型,而且高效地支援更多計算模式,包括互動式查詢和流處理。Spark專案包含多個緊密整合的元件。Spark的核心是一個對由很多計算任務組成的、執行在多個工作機器或者是一

資料演算法-Hadoop/Spark資料處理技巧》讀書筆記(一)——二次排序

寫在前面: 在做直播的時候有同學問Spark不是用Scala語言作為開發語言麼,的確是的,從網上查資料的話也會看到大把大把的用Scala編寫的Spark程式,但是仔細看就會發現這些用Scala寫的文章

搭建資料處理叢集(Hadoop,Spark,Hbase)

搭建Hadoop叢集 配置每臺機器的 /etc/hosts保證每臺機器之間可以互訪。 120.94.158.190 master 120.94.158.191 secondMaster 1、建立hadoop使用者 先建立had

流式資料處理 (實時)的三種框架:Storm,Spark和Samza

摘要:許多分散式計算系統都可以實時或接近實時地處理大資料流。本文將對Storm、Spark和Samza等三種Apache框架分別進行簡單介紹,然後嘗試快速、高度概述其異同。 許多分散式計算系統都可以實時或接近實時地處理大資料流。本文將對三種Apache框架分別進行簡單介紹,

流式資料處理的三種框架:Storm,Spark和Samza

許多分散式計算系統都可以實時或接近實時地處理大資料流。本文將對三種Apache框架分別進行簡單介紹,然後嘗試快速、高度概述其異同。Apache Storm在Storm中,先要設計一個用於實時計算的圖狀結構,我們稱之為拓撲(topology)。這個拓撲將會被提交給叢集,由叢集中

Spark 』7. 使用 Spark DataFrame 進行資料分析

寫在前面 本系列是綜合了自己在學習spark過程中的理解記錄 + 對參考文章中的一些理解 + 個人實踐spark過程中的一些心得而來。寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,所以一切以能夠理解為主,沒有必要的細節就不會記錄了,而且文中有時候會出現英文

[BigData]流式資料處理的三種框架:Storm,Spark和Samza

許多分散式計算系統都可以實時或接近實時地處理大資料流。本文將對三種Apache框架分別進行簡單介紹,然後嘗試快速、高度概述其異同。 Apache Storm 在Storm中,先要設計一個用於實時計算的圖狀結構,我們稱之為拓撲(topology)。這個拓撲將會被提交給叢集,由叢集中的主控節點(maste

Hadoop 進行分散式資料處理,從 入門、進階到應用開發

[email protected]:~# hadoop-0.20 fs -ls output Found 2 items drwxr-xr-x - root supergroup 0 2010-05-12 19:04 /user/root/output/_logs -rw-r

資料處理之——簡明Spark介紹

很多涉及或者並行式機器學習工作或者大資料處理的崗位,基本都會有要求會使用Hadoop/Hive/Spark/Storm這幾個開源工具,那麼針對其中比較主流的Spark,我在這裡做一個比較簡單地總結。 什麼是Spark? 在技術不斷告訴更迭的程式

流式資料處理的三種框架:Storm,Spark和Flink

storm、spark streaming、flink都是開源的分散式系統,具有低延遲、可擴充套件和容錯性諸多優點,允許你在執行資料流程式碼時,將任務分配到一系列具有容錯能力的計算機上並行執行,都提供

資料處理引擎Spark與Flink對比分析!

大資料技術正飛速地發展著,催生出一代又一代快速便捷的大資料處理引擎,無論是Hadoop、Storm,還是後來的Spark、Flin

資料處理需要到的程式語言開發語言

你有一個大資料專案,你知道問題領域(problem domain),也知道使用什麼基礎設施,甚至可能已決定使用哪種框架來處理所有這

資料處理過程只需這步,讓你從0到1!

大資料這幾年火得不要不要,如同“站在風口上的豬”,但很多人只是停留在耳聞的階段,並不知道大資料真正的用途或是實操在哪,這其中也包括