1. 程式人生 > >Spark程式設計指南入門之Java篇一-基本知識

Spark程式設計指南入門之Java篇一-基本知識

1. Spark的Java開發包

Spark提供Java的開發包,當前最新版本是2.0.2版本:spark-core_2.11-2.0.2.jar,可以從下面連結下載:

http://central.maven.org/maven2/org/apache/spark/spark-core_2.11/2.0.2/spark-core_2.11-2.0.2.jar

或者通過Maven配置:

<dependency>
    <groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.11</artifactId>
	<version>2.0.2</version>
</dependency>

* Spark 2.0.2版本需要Java 7或以上,本文使用Java 1.8.0_72版本

2. 初始化Spark
要使用Spark,第一步必須建立JavaSparkContext物件:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HelloSpark {
	
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local").setAppName("HelloSpark");
		try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
			// do something here
		}
	}

}

上述程式碼通過SparkConf建立JavaSparkContext,SparkConf預設去讀取Spark.*的配置檔案,也可以通過呼叫set的方法配置屬性,例如上述的setMaster和setAppName。通過set方法配置的屬性會覆蓋讀取的配置檔案屬性,SparkConf裡面的所有set方法都支援鏈式呼叫chaining,例如上述的setMaster("local").setAppName("HelloSpark")。

* 上述程式碼使用了Java 7的try-with-resources語句,用於自動關閉JavaSparkContext
* setAppName:設定應用名字,此名字會在Spark web UI顯示
* setMaster:設定主節點URL,本例使用“local”是指本機單執行緒,另外還有以下選項:

local[K]:本機K執行緒
local[*]:本機多執行緒,執行緒數與伺服器核數相同
spark://HOST:PORT:Spark叢集地址和埠,預設埠為7077
mesos://HOST:PORT:Mesos叢集地址和埠,預設埠為5050
yarn:YARN叢集

詳細請參考以下連結:

http://spark.apache.org/docs/latest/submitting-applications.html#master-urls

3. 彈性分散式資料集Resilient Distributed Datasets(RDDs)


Spark使用了一個叫彈性分散式資料集resilient distributed dataset(RDD),它是支援並行操作的容錯性元素集合。RDD可以通過2種方式建立:

- 並行化一個存在的集合
- 引用外部儲存系統(例如共享檔案系統、HDFS、HBase或者任何支援Hadoop InputFormat的資料來源)的資料集

3.1 並行集合Parallelized Collections

並行集合Parallelized Collections可以通過呼叫JavaSparkContext的parallelize方法建立,例如:

import java.util.Arrays;
import java.util.List;

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
// jsc是上述程式碼已經建立的JavaSparkContext例項
JavaRDD<Integer> distData = jsc.parallelize(data);

上述JavaRDD一旦被建立,它就可以被並行操作,例如呼叫reduce方法計算集合裡面所有元素的總和:

distData.reduce((a, b) -> a + b)

* 上述程式碼使用Java 8的lambda語法(這裡不做介紹),在Java 8之前,上述程式碼可以使用下面程式碼實現:

import org.apache.spark.api.java.function.Function2;

distData.reduce(new Function2<Integer, Integer, Integer>() {
			
	private static final long serialVersionUID = 1L;

	@Override
	public Integer call(Integer a, Integer b) throws Exception {
		return a + b;
	}
	
});

並行集合的一個重要引數是用於分佈資料集的分割槽數量,Spark會對每一個數據分割槽執行一個任務,通常地,每一個CPU可以處理2到4個分割槽。一般地,Spark會基於叢集的資源情況自動設定分割槽數量,不過也可以使用parallelize的第二個引數自定義,例如jsc.parallelize(data, 5),這樣的話,資料就會被分為5個分割槽,Spark可以並行處理這5個分割槽,提高處理效率。

3.2 外部資料集External Datasets

上面提過,Spark可以從任何支援Hadoop InputFormat的資料來源建立分散式資料集,包括本地檔案系統,HDFS,Cassandra, HBase, Amazon S3等等,Spark支援文字檔案,序列化檔案和其它Hadoop InputFormat。例如,以下程式碼通過呼叫SparkContext的textFile方法讀取本地檔案系統建立一個文字格式的RDD:

JavaRDD<String> distFile = sc.textFile("data.txt");

* HDFS的路徑是以hdfs://開頭,Amazon S3的路徑是以s3://開頭

3.3 Spark讀取檔案的一些注意事項


- 如果路徑是本地的檔案系統路徑,那麼這個路徑必須能夠被在所有工作節點的同樣路徑訪問,你可以把檔案複製到所有工作節點的對應路徑或者使用共享檔案系統

- 所有基於檔案輸入的Spark方法,包括textFile,路徑可以是目錄,壓縮檔案和萬用字元。例如,textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")

- textFile方法的第二個引數是可選的,用於控制檔案的分割槽數量。Spark預設對檔案的每一個block都建立一個分割槽(HDFS的預設block大小是64MB),通過設定該引數可以修改檔案的分割槽數量,但是分割槽數量不能少於block的數量

TO BE CONTINUED...O(∩_∩)O