1. 程式人生 > >Spark重點難點知識總結(二)

Spark重點難點知識總結(二)

1.join:join函式主要用來拼接字串,將字串、元組、列表中的元素以指定的字元(分隔符)連線生成一個新的字串。

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
scala> rdd1.join(rdd2).collect
res: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))

2.countByKey:統計每個key對應的value個數。

val scores=Array(Tuple2(1,100),Tuple2(2,90),Tuple2(3,100),Tuple2(2,90),Tuple2(3,100))
val content=sc.parallelize(scores)
val data=content.countByKey()
res:data: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2, 2 -> 2)

3.寬依賴和窄依賴:窄依賴是指每個父RDD的一個partition最多被子RDD的一個partition所使用,例如map,filter,union等都會產生窄依賴。窄依賴,1對1,n對1。寬依賴是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、ReduceByKey、sortByKey等操作都會產生寬依賴。總結:如果父RDD的一個Partition被一個子RDD的Partition所使用就是窄依賴,否則的話就是寬依賴。

4.DataFrame與RDD:DataFrame是一種分散式二維資料結構,R和Python語言中都有DataFrame,Spark中的DataFrame最大的不同點是其天生是分散式的,可以簡單的認為Spark中的DataFrame是一個分散式的Table,形式如下所示。


而RDD型別為

簡單來說,RDD是一個個Person例項,RDD並不知道里面有什麼型別的資料。

(1)RDD以Record為單位,Spark在優化的時候無法洞悉Record內部的細節,所以也就無法進行更深度的優化,這極大的限制了Spark SQL效能的提示。
(2)DataFrame包含了每個Record的Metadata資訊,也就是說DataFrame的優化是基於列內部的優化,而不是像RDD一樣,只能夠基於行進行優化。


5.RDD轉DataFrame

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;
/*
 * 使用反射的方式將RDD轉化為DataFrame*/
public class RDD2DataFrame {
    public static void main(String[] args) {
	SparkConf conf=new SparkConf().setAppName("RDD2DataFrame").setMaster("local");
	JavaSparkContext sc=new JavaSparkContext(conf);
	SQLContext sqlContext=new SQLContext(sc);
	JavaRDD<String> lines=sc.textFile("C://Users//Jason Shu//Desktop//persons.txt");
        JavaRDD<Person> persons=lines.map(new Function<String, Person>()
             {
                   public Person call(String line) throws Exception {
			String[] splited=line.split(",");
			Person p =new Person();
			p.setId(Integer.valueOf(splited[0].trim()));
			p.setName(splited[1]);
			p.setAge(Integer.valueOf(splited[0].trim()));
			return p;
		}
	});
        DataFrame df= sqlContext.createDataFrame(persons, Person.class);
        df.registerTempTable("persons");//註冊一張臨時表
        DataFrame bigData=sqlContext.sql("select * from persons where age >=6");
        JavaRDD<Row> bigDataRDD=bigData.javaRDD();
        JavaRDD<Person> result=bigDataRDD.map(new Function<Row, Person>() {
	   public Person call(Row row) throws Exception {
		Person p =new Person();
		p.setId(row.getInt(0));
		p.setName(row.getString(1));
		p.setAge(row.getInt(2));
		return p;
	    }
           });
       List<Person> personList=result.collect();
       for(Person p:personList){
	   System.out.println(p);
         }   
       }
      }


   轉換過程示意圖

6.Spark SQL和DataFrame:Spark SQL 是 Spark 生態系統裡用於處理結構化大資料的模組,該模組裡最重要的概念就是DataFrame, Spark 的 DataFrame 是基於早期版本中的 SchemaRDD,所以很自然的使用分散式大資料處理的場景。DataFrame 以 RDD 為基礎,但是帶有 Schema 資訊,它類似於傳統資料庫中的二維表格。Spark SQL 模組目前支援將多種外部資料來源的資料轉化為 DataFrame,並像操作 RDD 或者將其註冊為臨時表的方式處理和分析這些資料。當前支援的資料來源有:Json,文字檔案,RDD,關係資料庫,Hive,Parquet。一旦將 DataFrame 註冊成臨時表,我們就可以使用類 SQL 的方式操作這些資料。Spark SQL的表資料在記憶體中儲存不是採用原生態的JVM物件儲存方式,而是採用記憶體列儲存,如下圖所示。

7.Schema的方式建立DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

object schemaDataFrame {
  def main(args:Array[String]): Unit ={
    val conf=new SparkConf()
    conf.setMaster("local").setAppName("schemaDataFrame ")
    val sc=new SparkContext(conf)
    val RowRDD =sc.textFile("C://Users//Jason Shu//Desktop//InputFile.txt").map(x=>x.split(" ")).map(p=>Row(p(0),p(1)))
    val sqlContext=new SQLContext(sc)
    val peopleSchema=StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sex", BooleanType, true)
            ))
    val peopleDataFrame = sqlContext.createDataFrame(RowRDD, peopleSchema)//建立DataFrame,第一個引數為Row[RDD],第二個引數為StructType
    peopleDataFrame.registerTempTable("people")//表的名字隨便取一個
    val results = sqlContext.sql("SELECT name FROM people")
    results.map(t => "name: " + t(0)).collect().foreach(println)
  }
}

8.first():Return the first element in this RDD,first返回RDD中的第一個元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
 
scala> rdd1.first
res14: (String, String) = (A,1)
 
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
 
scala> rdd1.first
res8: Int = 10

9.contains():Returns true if and only if this string contains the specified sequence of char values,當且僅當此字串包含指定的char值序列返回true

10.parallellize():在一個已經存在的Scala集合上建立的RDD, 集合的物件將會被拷貝,創建出一個可以被並行操作的分散式資料集。

val data = List(1, 2, 3, 4, 5)  

val distData = sc.parallelize(data)//distData此時為RDD[Int]


11.socketTextStream():相當於Socket客戶端,裡面的引數就是socket伺服器的ip和埠,執行該語句的時候就向socket伺服器傳送了建立請求了。伺服器端接受到了請求就可以給socketTextStream傳送訊息了

12.filter():使用filter方法,你可以篩選出集合中你需要的元素,形成一個新的集合。

val x = List.range(1, 10)
     
val evens = x.filterNot(_ % 2 == 0)

Res:evens: List[Int] = List(1, 3, 5, 7, 9)

13.String.valueOf():要把引數中給的值,轉化為String型別,這裡的引數是Any,任意的引數都可以。

14.Integer.parseInt:將整數的字串,轉化為整數。

val b="123"
  
val a=Integer.parseInt(b)
  
println(a)//列印結果123

15.collect():將RDD轉成Scala陣列,並返回。

16.Spark中的partition:提供一種劃分資料的依據。例如wordcount程式中的:

val lines=sc.textFile(path, 8)
這個地方的8就是指8個分割槽,當然如果資料量不夠或不夠複雜,可以不分為8個。