1. 程式人生 > >Spark2.0學習(二)--------RDD詳解

Spark2.0學習(二)--------RDD詳解

新增針對scala檔案的編譯外掛

------------------------------

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.it18zhang</groupId>
<artifactId>SparkDemo1</artifactId>
<version>1.0-SNAPSHOT</version>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
</project>

 

RDD:----------------
是spark的基本資料結構,是不可變資料集。RDD中的資料集進行邏輯分割槽,每個分割槽可以單獨在叢集節點
進行計算。可以包含任何java,scala,python和自定義型別。

RDD是隻讀的記錄分割槽集合。RDD具有容錯機制。

建立RDD方式,一、並行化一個現有集合。

hadoop 花費90%時間使用者rw。、

記憶體處理計算。在job間進行資料共享。記憶體的IO速率高於網路和disk的10 ~ 100之間。

內部包含5個主要屬性
-----------------------
1.分割槽列表
2.針對每個split的計算函式。
3.對其他rdd的依賴列表
4.可選,如果是KeyValueRDD的話,可以帶分割槽類。
5.可選,首選塊位置列表(hdfs block location);

//預設併發度
local.backend.defaultParallelism() = scheduler.conf.getInt("spark.default.parallelism", totalCores)
taskScheduler.defaultParallelism = backend.defaultParallelism()
sc.defaultParallelism =...; taskScheduler.defaultParallelism
defaultMinPartitions = math.min(defaultParallelism, 2)
sc.textFile(path,defaultMinPartitions) //1,2

RDD變換

------------------
返回指向新rdd的指標,在rdd之間建立依賴關係。每個rdd都有計算函式和指向父RDD的指標。

map() //對每個元素進行變換,應用變換函式
//(T)=>V


filter() //過濾器,(T)=>Boolean
flatMap() //壓扁,T => TraversableOnce[U]

mapPartitions() //對每個分割槽進行應用變換,輸入的Iterator,返回新的迭代器,可以對分割槽進行函式處理。
//Iterator<T> => Iterator<U>

mapPartitionsWithIndex(func) //同上,(Int, Iterator<T>) => Iterator<U>

sample(withReplacement, fraction, seed) //取樣返回取樣的RDD子集。
//withReplacement 元素是否可以多次取樣.
//fraction : 期望取樣數量.[0,1]

union() //類似於mysql union操作。
//select * from persons where id < 10
//union select * from id persons where id > 29 ;

intersection //交集,提取兩個rdd中都含有的元素。
distinct([numTasks])) //去重,去除重複的元素。

groupByKey() //(K,V) => (K,Iterable<V>)

reduceByKey(*) //按key聚合。

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
//按照key進行聚合
key:String U:Int = 0

sortByKey //排序

join(otherDataset, [numTasks]) //連線,(K,V).join(K,W) =>(K,(V,W))

cogroup //協分組
//(K,V).cogroup(K,W) =>(K,(Iterable<V>,Iterable<!-- <W> -->))
cartesian(otherDataset) //笛卡爾積,RR[T] RDD[U] => RDD[(T,U)]

pipe //將rdd的元素傳遞給指令碼或者命令,執行結果返回形成新的RDD
coalesce(numPartitions) //減少分割槽
repartition //可增可減
repartitionAndSortWithinPartitions(partitioner)
//再分割槽並在分割槽內進行排序


RDD Action
------------------
collect() //收集rdd元素形成陣列.
count() //統計rdd元素的個數
reduce() //聚合,返回一個值。
first //取出第一個元素take(1)
take //
takeSample (withReplacement,num, [seed])
takeOrdered(n, [ordering])

saveAsTextFile(path) //儲存到檔案
saveAsSequenceFile(path) //儲存成序列檔案

saveAsObjectFile(path) (Java and Scala)

countByKey() //按照key,統計每個key下value的個數.

spark整合hadoop ha
-------------------------
1.複製core-site.xml + hdfs-site.xml到spark/conf目錄下
2.分發檔案到spark所有work節點
3.啟動spark叢集
4.啟動spark-shell,連線spark叢集上
$>spark-shell --master spark://s201:7077
$scala>sc.textFile("hdfs://mycluster/user/centos/test.txt").collect();