1. 程式人生 > >Python海量資料處理之_Hadoop&Spark

Python海量資料處理之_Hadoop&Spark

1. 說明

 前篇介紹了安裝和使用Hadoop,本篇將介紹Hadoop+Spark的安裝配置及如何用Python呼叫Spark。
 當資料以TB,PB計量時,用單機處理資料變得非常困難,於是使用Hadoop建立計算叢集處理海量資料,Hadoop分為兩部分,一部分是資料儲存HDFS,另一部分是資料計算MapReduce。MapReduce框架將資料處理分成map,reduce兩段,使用起來比較麻煩,並且有一些限制,如:資料都是流式的,且必須所有Map結束後才能開始Reduce。我們可以引入Spark加以改進。
 Spark的優點在於它的中間結果儲存在記憶體中,而非HDFS檔案系統中,所以速度很快。用Scala 語言可以像操作本地集合物件一樣輕鬆地操作分散式資料集。雖然它支援中間結果儲存在記憶體,但叢集中的多臺機器仍然需要讀寫資料集,所以它經常與HDFS共同使用。因此,它並非完全替代Hadoop。
 Spark的框架是使用Scala語言編寫的,Spark的開發可以使用語言有:Scala、R語言、Java、Python。

2. Scala

 Scala是一種類似java的程式語言,使用Scala語言相對來說程式碼量更少,呼叫spark更方便,也可以將它和其它程式混用。
 在不安裝scala的情況下,啟動hadoop和spark,python的基本例程也可以正常執行。但出於進一步開發的需要,最好安裝scala。

(1) 下載scala

(2) 安裝

$ cd /home/hadoop #使用者可選擇安裝的資料夾
$ tar xvzf tgz/scala-2.11.12.tgz
$ ln -s scala-2.11.12/ scala

在.bashrc中加入
export PATH=/home/hadoop/scala/bin:$PATH

3. 下載安裝Spark

(1) 下載spark

(2) 安裝spark

$ cd /home/hadoop #使用者可選擇安裝的資料夾
$ tar xvzf spark-2.2.1-bin-hadoop2.7.tgz
$ ln -s spark-2.2.1-bin-hadoop2.7/ spark

在.bashrc中加入
export SPARK_HOME=/home/hadoop/spark
export PATH=$SPARK_HOME/bin:$PATH

(3) 配置檔案

 不做配置,pyspark可以在本機上執行,但不能使用叢集中其它機器。配置檔案在$SPARK_HOME/conf/目錄下。

i. 配置spark-env.sh

$ cd $SPARK_HOME/conf/
$ cp spark-env.sh.template spark-env.sh
按具體配置填寫內容
export SCALA_HOME=/home/hadoop/scala
export JAVA_HOME=/exports/android/jdk/jdk1.8.0_91/
export SPARK_MASTER_IP=master
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/

ii. 設定主從伺服器slave

$ cp slaves.template slaves 
在其中列出從伺服器地址,單機不用設

iii. 設定spark-defaults.conf

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
按具體配置填寫內容
spark.master                     spark://master:7077
spark.eventLog.enabled           false
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              1g
spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

(4) 啟動

 執行spark之前,需要執行hadoop,具體見之前的Hadoop文件

$ $SPARK_HOME/sbin/start-all.sh

 該指令碼啟動了所有master和workers,在本機用jps檢視,增加了Worker和Master,

4. 命令列呼叫

 下面我們來看看從程式層面如何使用Spark

(1) 準備工作

 在使用相對路徑時,系統預設是從hdfs://localhost:9000/中讀資料,因此需要先把待處理的本地檔案複製到HDFS上,常用命令見之前的Hadoop有意思。

$ hadoop fs -mkdir -p /usr/hadoop
$ hadoop fs -copyFromLocal README.md /user/hadoop/

(2) Spark命令列

$ pyspark
>>> textFile = spark.read.text("README.md")
>>> textFile.count() # 返回行數
>>> textFile.first() # 返回第一行
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) # 返回所有含Spark行的資料集

5. 程式

(1) 實現功能

 統計檔案中的詞頻

(2) 程式碼

 這裡使用了spark自帶的例程 /home/hadoop/spark/examples/src/main/python/wordcount.py,和之前介紹過的hadoop程式一樣,同樣是實現的針對key,value的map,reduce,一個檔案就完成了,看起來更簡徢更靈活,像是hadoop自帶MapReduce的加強版。具體內容如下:

from __future__ import print_function

import sys 
from operator import add 

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect() # 收集結果
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()

(3) 執行

 spark-submit命令在$HOME_SPARK/bin目錄下,之前設定了PATH,可以直接使用

$ spark-submit $SPARK_HOME/examples/src/main/python/wordcount.py /user/hadoop/README.md

 引數是hdfs中的檔案路徑。
 此時訪問$SPARK_IP:8080埠,可以看到程式PythonWordCount正在hadoop中執行。

6. 多臺機器上安裝Spark以建立叢集

 和hadoop的叢集設定類似,同樣是把整個spark目錄複製叢集中其它的伺服器上,用slaves檔案設定主從關係,然後啟動$SPARK_HOME/sbin/start-all.sh。正常開啟後可以通過網頁檢視狀態:SparkMaster_IP:8080

7. 參考