1. 程式人生 > >Spark:RDD操作和持久化

Spark:RDD操作和持久化

建立RDD

進行Spark核心程式設計時,首先要做的第一件事,就是建立一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程式的輸入源資料。然後在建立了初始的RDD之後,才可以通過Spark Core提供的transformation運算元,對該RDD進行轉換,來獲取其他的RDD

Spark Core提供了三種建立RDD的方式

  • 使用程式中的集合建立RDD
  • 使用本地檔案建立RDD
  • 使用HDFS檔案建立RDD

並行化集合建立RDD

如果要通過並行化集合來建立RDD,需要針對程式中的集合,呼叫SparkContext的parallelize()方法。Spark會將集合中的資料拷貝到叢集上去,形成一個分散式的資料集合,也就是一個RDD。相當於是,集合中的部分資料會到一個節點上,而另一部分資料會到其他節點上。然後就可以用並行的方式來操作這個分散式資料集合,即RDD。

public class ParallelizeCollection {

	public static void main(String[] args) {
		// 建立SparkConf
		SparkConf conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local");

		// 建立JavaSparkContext
		JavaSparkContext sc = new JavaSparkContext(conf);

		// 要通過並行化集合的方式建立RDD,那麼就呼叫SparkContext以及其子類,的parallelize()方法
		JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

		// 執行reduce運算元操作
		// 相當於,先進行1 + 2 = 3;然後再用3 + 3 = 6;然後再用6 + 4 = 10。。。以此類推
		int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer num1, Integer num2) throws Exception {
				return num1 + num2;
			}

		});

		// 輸出累加的和
		System.out.println("1到10的累加和:" + sum);

		// 關閉JavaSparkContext
		sc.close();
	}
}
object ParallelizeCollection {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ParallelizeCollection")
    
    val sc = new SparkContext(conf)
    
    val numberRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 5)
    
    val sum = numberRDD.reduce(_ + _)
    
    println(sum)
    
  }
}

呼叫parallelize()時,有一個重要的引數可以指定,就是要將集合切分成多少個partition。Spark會為每一個partition執行一個task來進行處理。

Spark官方的建議是,為叢集中的每個CPU建立2~4個partition。Spark預設會根據叢集的情況來設定partition的數量。但是也可以在呼叫parallelize()方法時,傳入第二個引數,來設定RDD的partition數量。

使用本地檔案和HDFS建立RDD

Spark是支援使用任何Hadoop支援的儲存系統上的檔案建立RDD的,比如說HDFS、Cassandra、HBase以及本地檔案。通過呼叫SparkContext的textFile()方法,可以針對本地檔案或HDFS檔案建立RDD。

注意事項:

  1. 如果是針對本地檔案的話,如果是在windows上本地測試,windows上有一份檔案即可;如果是在spark叢集上針對linux本地檔案,那麼需要將檔案拷貝到所有worker節點上。
  2. Spark的textFile()方法支援針對目錄、壓縮檔案以及萬用字元進行RDD建立
  3. Spark預設會為hdfs檔案的每一個block建立一個partition,但是也可以通過textFile()的第二個引數手動設定分割槽數量,只能比block數量多,不能比block數量少。
public class HDFSFile {

	public static void main(String[] args) {
		// 建立SparkConf
		// 修改:去除setMaster()設定,修改setAppName()
		SparkConf conf = new SparkConf().setAppName("HDFSFile");
		// 建立JavaSparkContext
		JavaSparkContext sc = new JavaSparkContext(conf);

		// 使用SparkContext以及其子類的textFile()方法,針對HDFS檔案建立RDD
		// 只要把textFile()內的路徑修改為hdfs檔案路徑即可
		JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");

		// 統計文字檔案內的字數
		JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(String v1) throws Exception {
				return v1.length();
			}

		});

		int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}

		});

		System.out.println("檔案總字數是:" + count);

		// 關閉JavaSparkContext
		sc.close();
	}
}
object LocalFile {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("LocalFile").setMaster("local")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("spark.txt", 5)

    val counts = lines.map(line => line.length()).reduce(_ + _)

    println(counts)

  }
}

Spark的textFile()除了可以針對上述幾種普通的檔案建立RDD之外,還有一些特列的方法來建立RDD:

  1. SparkContext.wholeTextFiles()方法
    可以針對一個目錄中的大量小檔案,返回<filename, fileContent>組成的pair,作為一個PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每個元素就是檔案中的一行文字。
  2. SparkContext.sequenceFileK, V方法
    可以針對SequenceFile建立RDD,K和V泛型型別就是SequenceFile的key和value的型別。K和V要求必須是Hadoop的序列化型別,比如IntWritable、Text等。
  3. SparkContext.hadoopRDD()方法
    對於Hadoop的自定義輸入型別,可以建立RDD。該方法接收JobConf、InputFormatClass、Key和Value的Class。
  4. SparkContext.objectFile()方法
    可以針對之前呼叫RDD.saveAsObjectFile()建立的物件序列化的檔案,反序列化檔案中的資料,並建立一個RDD。

操作RDD

Spark支援兩種RDD操作:transformation和action。transformation操作會針對已有的RDD建立一個新的RDD;而action則主要是對RDD進行最後的操作,比如遍歷、reduce、儲存到檔案等,並可以返回結果給Driver程式。

transformation的特點就是lazy特性。lazy特性指的是,如果一個spark應用中只定義了transformation操作,那麼即使你執行該應用,這些操作也不會執行。也就是說,transformation是不會觸發spark程式的執行的,它們只是記錄了對RDD所做的操作,但是不會自發的執行。只有當transformation之後,接著執行了一個action操作,那麼所有的transformation才會執行。Spark通過這種lazy特性,來進行底層的spark應用執行的優化,避免產生過多中間結果。

action操作執行,會觸發一個spark job的執行,從而觸發這個action之前所有的transformation的執行。這是action的特性。

在這裡插入圖片描述

常用transformation介紹

操作 介紹
map 將RDD中的每個元素傳入自定義函式,獲取一個新的元素,然後用新的元素組成新的RDD
filter 對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。
flatMap 與map類似,但是對每個元素都可以返回一個或多個新元素。
gropuByKey 根據key進行分組,每個key對應一個Iterable
reduceByKey 對每個key對應的value進行reduce操作。
sortByKey 對每個key對應的value進行排序操作。
join 對兩個包含<key,value>對的RDD進行join操作,每個key join上的pair,都會傳入自定義函式進行處理。
cogroup 同join,但是是每個key對應的Iterable都會傳入自定義函式進行處理。

常用action介紹

操作 介紹
reduce 將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。
collect 將RDD中所有元素獲取到本地客戶端。
count 獲取RDD元素總數。
take(n) 獲取RDD中前n個元素。
saveAsTextFile 將RDD元素儲存到檔案中,對每個元素呼叫toString方法
countByKey 對每個key對應的值進行count計數。
foreach 遍歷RDD中的每個元素。

transformation操作

map
map運算元,是對任何型別的RDD,都可以呼叫的,java中,map運算元接收的引數是Function物件,建立的Function物件,一定會讓你設定第二個泛型引數,這個泛型型別,就是返回的新元素的型別,同時call()方法的返回型別,也必須與第二個泛型型別同步,在call()方法內部,就可以對原始RDD中的每一個元素進行各種處理和計算,並返回一個新的元素,新的元素就會組成一個新的RDD

JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {

					private static final long serialVersionUID = 1L;

					// 傳入call()方法的,就是1,2,3,4,5
					// 返回的就是2,4,6,8,10
					@Override
					public Integer call(Integer v1) throws Exception {
						return v1 * 2;
					}

				});
val multipleNumberRDD = numberRDD.map { num => num * 2 } 

filter
filter運算元,傳入的也是Function,其他的使用注意點,實際上和map是一樣的,但是,唯一的不同,就是call()方法的返回型別是Boolean,每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯來判斷這個元素是否是你想要的,如果你想在新的RDD中保留這個元素,那麼就返回true;否則,不想保留這個元素,返回false

JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {

					private static final long serialVersionUID = 1L;

					// 在這裡,1到10,都會傳入進來
					// 但是根據我們的邏輯,只有2,4,6,8,10這幾個偶數,會返回true
					// 所以,只有偶數會保留下來,放在新的RDD中
					@Override
					public Boolean call(Integer v1) throws Exception {
						return v1 % 2 == 0;
					}

				});
val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 }

flatMap
flatMap運算元,在java中,接收的引數是FlatMapFunction,需要自己定義FlatMapFunction的第二個泛型型別,即,代表了返回的新元素的型別,call()方法,返回的型別不是U,而是Iterable<>,這裡的U也與第二個泛型型別相同,flatMap其實就是,接收原始RDD中的每個元素,並進行各種邏輯的計算和處理,可以返回多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合,新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

			private static final long serialVersionUID = 1L;

			// 在這裡會,比如,傳入第一行,hello you
			// 返回的是一個Iterable<String>(hello, you)
			@Override
			public Iterable<String> call(String t) throws Exception {
				return Arrays.asList(t.split(" "));
			}

		});
val words = lines.flatMap { line => line.split(" ") }

gropuByKey
groupByKey運算元,返回的JavaPairRDD,JavaPairRDD的第一個泛型型別不變,第二個泛型型別變成Iterable這種集合型別,也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable

// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
				new Tuple2<String, Integer>("class1", 80),
				new Tuple2<String, Integer>("class2", 75),
				new Tuple2<String, Integer>("class1", 90),
				new Tuple2<String, Integer>("class2", 65));

JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
 val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),Tuple2("class1", 90), Tuple2("class2", 60))
 val scores = sc.parallelize(scoreList, 1)
 val groupedScores = scores.groupByKey()

reduceByKey
reduceByKey,接收的引數是Function2型別,它有三個泛型引數,實際上代表了三個值,第一個泛型型別和第二個泛型型別,代表了原始RDD中的元素的value的型別,因此對每個key進行reduce,都會依次將第一個、第二個value傳入,將值再與第三個value傳入,因此此處,會自動定義兩個泛型型別,代表call()方法的兩個傳入引數的型別。第三個泛型型別,代表了每次reduce操作返回的值的型別,預設也是與原始RDD的value型別相同的。

JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {

			private static final long serialVersionUID = 1L;

			// 對每個key,都會將其value,依次傳入call方法
			// 從而聚合出每個key對應的一個value
			// 然後,將每個key對應的一個value,組合成一個Tuple2,作為新RDD的元素
			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}

		});
val totalScores = scores.reduceByKey(_ + _)

sortByKey
sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序。

JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
val sortedScores = scores.sortByKey(false)

join
join,會根據key進行join並返回JavaPairRDD,JavaPairRDD的第一個泛型型別是之前兩個JavaPairRDD的key的型別,因為是通過key進行join的,第二個泛型型別,是Tuple2<v1, v2>的型別,Tuple2的兩個泛型分別為原始RDD的value的型別,join就返回的RDD的每一個元素,就是通過key join上的一個pair

例:兩個RDD

  • RDD1 (1, 1) (1, 2) (1, 3)
  • RDD2 (1, 4) (2, 1) (2, 2)

join 結果: (1 (1, 4)) (1, (2, 4)) (1, (3, 4))

// 模擬集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
				new Tuple2<Integer, String>(1, "leo"),
				new Tuple2<Integer, String>(2, "jack"),
				new Tuple2<Integer, String>(3, "tom"));
		
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
				new Tuple2<Integer, Integer>(1, 100),
				new Tuple2<Integer, Integer>(2, 90),
				new Tuple2<Integer, Integer>(3, 60));
		
// 並行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
 val studentScores = students.join(scores)

cogroup
相當於是,一個key join上的所有value,都給放到一個Iterable裡面去了

List<Tuple2<Integer, String>> studentList = Arrays.asList(
				new Tuple2<Integer, String>(1, "leo"),
				new Tuple2<Integer, String>(2, "jack"),
				new Tuple2<Integer, String>(3, "tom"));
		
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
				new Tuple2<Integer, Integer>(1, 100),
				new Tuple2<Integer, Integer>(2, 90),
				new Tuple2<Integer, Integer>(3, 60),
				new Tuple2<Integer, Integer>(1, 70),
				new Tuple2<Integer, Integer>(2, 80),
				new Tuple2<Integer, Integer>(3, 50));
		
// 並行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores)
val studentScores = students.cogroup(scores)

action操作

reduce
reduce操作對集合中的數字進行累加,就是聚合,將多個元素聚合成一個元素
原理:

首先將第一個和第二個元素,傳入call()方法,進行計算,會獲取一個結果,比如1 + 2 = 3
接著將該結果與下一個元素傳入call()方法,進行計算,比如3 + 3 = 6
以此類推

int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}

		});
val sum = numbers.reduce(_ + _)

collect
collect,將分佈在遠端叢集上的結果RDD的資料拉取到本地,一般不建議使用,因為如果rdd中的資料量比較大的話,效能會比較差,從遠端走大量的網路傳輸,將資料獲取到本地,在rdd中資料量特別大的情況下,發生oom異常,記憶體溢位。

List<Integer> doubleNumberList = doubleNumbers.collect();

for (Integer num : doubleNumberList) {
	System.out.println(num);
}
val doubleNumberList = numbers.map(number => number * 2).collect()

for (num <- doubleNumberList) {
     println(num)
}

count
對rdd使用count操作,統計它有多少個元素

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

long count = numbers.count();
System.out.println(count);
val count = numbers.count()

take
take操作,與collect類似,也是從遠端叢集上,獲取rdd的資料,collect是獲取rdd的所有資料,take只是獲取前n個數據

List<Integer> top3Numbers = numbers.take(3);

for (Integer num : top3Numbers) {
	System.out.println(num);
}
val top3Numbers = numbers.take(3)

saveAsTextFile
直接將rdd中的資料,儲存在HFDS檔案中,注意,我們這裡只能指定資料夾,也就是目錄

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 使用map操作將集合中所有數字乘以2
JavaRDD<Integer> doubleNumbers = numbers.map(new Function<Integer, Integer>() {

		private static final long serialVersionUID = 1L;

				@Override
				public Integer call(Integer v1) throws Exception {
					return v1 * 2;
				}
		});

// 儲存為目錄中的/double_number/part-00000檔案
doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number");
val doubleNumbers = numbers.map(number => number * 2)

doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt")

countByKey
統計每個key對應的元素個數,countByKey返回的型別,直接就是Map<String, Object>

List<Tuple2<String, String>> scoreList = Arrays.asList(
				new Tuple2<String, String>("class1", "leo"), 
				new Tuple2<String, String>("class2", "jack"), 
				new Tuple2<String, String>("class1", "marry"), 
				new Tuple2<String, String>("class2", "tom"), 
				new Tuple2<String, String>("class2", "david"));

JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);

Map<String, Object> studentCounts = students.countByKey();
val studentCounts = students.countByKey()

RDD持久化

不使用RDD持久話會出現什麼呢?
不使用RDD持久化的問題的原理

Spark最重要的一個功能,就是在不同操作間,持久化(或快取)一個數據集在記憶體中。當你持久化一個RDD,每一個結點都將把它的計算分塊結果儲存在記憶體中,並在對此資料集(或者衍生出的資料集)進行的其它動作中重用。這將使得後續的動作(action)變得更加迅速(通常快10倍)。快取是用Spark構建迭代演算法的關鍵。RDD的快取能夠在第一次計算完成後,將計算結果儲存到記憶體、本地檔案系統或者Tachyon(分散式記憶體檔案系統)中。通過快取,Spark避免了RDD上的重複計算,能夠極大地提升計算速度。
RDD持久化的工作原理
如何持久化
要持久化一個RDD,只要呼叫其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接快取在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那麼Spark會自動通過其源RDD,使用transformation操作重新計算該partition。實際上cache()是使用persist(MEMORY_ONLY)的快捷方法。如果需要從記憶體中清楚快取,那麼可以使用unpersist()方法。

Spark自己也會在shuffle操作時,進行資料的持久化,比如寫入磁碟,主要是為了在節點失敗時,避免需要重新計算整個過程。

SparkConf conf = new SparkConf().setAppName("Persist").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

// cache()或者persist()的使用,是有規則的
// 必須在transformation或者textFile等建立了一個RDD之後,直接連續呼叫cache()或persist()才可以
// 如果你先建立一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的
// 而且,會報錯,大量的檔案會丟失
JavaRDD<String> lines = sc.textFile("spark.txt").cache();

RDD持久化策略
RDD持久化是可以手動選擇不同的策略的。比如可以將RDD持久化在記憶體中、持久化到磁碟上、使用序列化的方式持久化,多持久化的資料進行多路複用。只要在呼叫persist()時傳入對應的StorageLevel即可。

cache()方法使用了預設的儲存級別—StorageLevel.MEMORY_ONLY

在這裡插入圖片描述

儲存級別的選擇
Spark的多個儲存級別意味著在記憶體利用率和cpu利用效率間的不同權衡。推薦通過下面的過程選擇一個合適的儲存級別:

  1. 優先使用MEMORY_ONLY,如果可以快取所有資料的話,那麼就使用這種策略。因為純記憶體速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。
  2. 如果MEMORY_ONLY策略,無法儲存的下所有資料的話,那麼使用MEMORY_ONLY_SER,將資料進行序列化進行儲存,純記憶體操作還是非常快,只是要消耗CPU進行反序列化
  3. 如果需要進行快速的失敗恢復,那麼就選擇帶字尾為_2的策略,進行資料的備份,這樣在失敗時,就不需要重新計算了。
  4. 能不使用DISK相關的策略,就不用使用,有的時候,從磁碟讀取資料,還不如重新計算一次。

在這裡插入圖片描述

注意只能設定一種:不然會拋異常: Cannot change storage level of an RDD after it was already assigned a level

如何使用快取

//記憶體
JavaRDD<String> lines = sc.textFile("spark.txt").persist(StorageLevel.MEMORY_ONLY());

在這裡插入圖片描述

//磁碟儲存
JavaRDD<String> lines = sc.textFile("spark.txt").persist(StorageLevel.DISK_ONLY())

在這裡插入圖片描述

清除快取

lines.unpersist();

在這裡插入圖片描述