1. 程式人生 > >本地Spark程式提交到hadoop叢集執行流程

本地Spark程式提交到hadoop叢集執行流程

1.本地環境準備

本文是將eclipse開發環境下的maven+Spark+scala程式移植到叢集環境上執行過程,寫的很粗糙,見諒。

本地用eclipse編寫Spark小程式,完成從txt檔案讀取資料操作。

本地maven+Spark+scala環境就不多說了,如果配置出問題,就下載最新的eclipse然後從商店裝外掛吧。

注意在maven的配置檔案pom.xml中插入Spark引用:

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.2.0</version>
		</dependency>

2.編寫測試程式碼

本地資料檔案放在data資料夾下,資料檔案格式是“性別”,“姓名”。scala程式碼如下,程式碼中註釋掉的是本地檔案讀取,替換為HDFS上的路徑,程式碼中制定local不需要修改,會在後面的執行指令碼中被覆蓋。“*”表示伺服器IP,埠為預設埠。

package com.******.scalaprogram

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.LinkedList

object TestScala {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "Pations Data Analysis")
//    val peopledata = sc.textFile("data/aaaaaa.txt")
    val peopledata = sc.textFile("hdfs://*.*.*.*:8020/user/mydir/aaaaaa.txt")
    val peoplelines = peopledata.collect()
    def parse(line:String) = {
      val pieces = line.split(",")
      val sex = pieces(0).toString()
      val name = pieces(1).toString()
      (sex,name)
    }    
    var sexLinkedList = LinkedList[String]("sex")
    var nameLinkedList = LinkedList[String]("name")
    for (i<-0 to (peoplelines.length-1)){
         var peoplerowcollectline = parse(peoplelines(i))
         val parse_1 = peoplerowcollectline._1
         val parse_2 = peoplerowcollectline._2
         sexLinkedList.append(LinkedList(parse_1))
         nameLinkedList.append(LinkedList(parse_2))
    }
    val sexList:List[String] = sexLinkedList.toList
    val nameList:List[String] = nameLinkedList.toList
    val peopleList : List[(String,String)] = sexList.zip(nameList).tail
    val rdd2 = sc.parallelize(peopleList)
    val combinByKeyRDD2 = rdd2.combineByKey(
        (x :String)=>(List(x),1),          
        (peo:(List[String],Int),x:String)=>(x::peo._1,peo._2+1),
        (sex1:(List[String],Int),sex2:(List[String],Int)) => (sex1._1:::sex2._1,sex1._2+sex2._2))
    combinByKeyRDD2.foreach(println)

    
  }
}

3.資料檔案上傳

將資料檔案上傳到伺服器,這裡用的是Xftp。上傳完畢後緊接著上傳到HDFS,注意要與上面程式碼中的路徑一致。

[[email protected]***** spark_dir]#hadoop fs -put aaaaaa.txt /user/mydir/aaaaaa.txt

4.程式打包和上傳

這裡使用maven打包檔案,打包方式有多種,這裡選擇:在工程包位置右擊 -> Run as -> Maven install,之後會在下圖中位置出現jar包。


將jar包拷貝到伺服器。


5.叢集執行程式

編寫執行指令碼,這個網上一大堆,我也是參考的。注意其中spark的路徑和程式的class路徑


最後執行指令碼:./combinbykey.sh >>spark.log得到結果


由於涉及公司等個人資訊,為了避免麻煩,做了處理導致圖片有點難看,將就下哈哈。整個程式很簡單,細心就可以了。