1. 程式人生 > >第14課:spark RDD解密學習筆記

第14課:spark RDD解密學習筆記

第14課:spark RDD解密學習筆記

本期內容:

1.RDD:基於工作集的應用抽象

2.RDD內幕解密

3.RDD思考

精通了RDD,學習Spark的時間大大縮短。解決問題能力大大提高,

徹底把精力聚集在RDD的理解上,SparkStreaming、SparkSQL、SparkML底層封裝的都是RDD。

RDD是spark的基石,

1) RDD提供了通用的 抽象

2)  現在Spark有5個子框架SparkStreaming、SparkSQL、SparkML、GraphX、SparkR,可以根據自己從事的領域如醫療等建模後建立另外的庫。

所有頂級spark高手:

1解決bug,效能調優。包括框架的BUG及對框架的修改。

2.拿spark就是做修改的,以配合自己從事的具體業務

1. Hadoop的mr是基於資料集的處理

基於工作集&基於資料集的共同特徵:位置感知,容錯,負載均衡,

基於資料集的處理工作方式是從物理儲存裝置上載入資料,操作資料,寫入物理儲存裝置。

spark也是mr的一種方式,只是更細緻更高效。

其實基於資料集的方式也是一張有向無環圖。但與基於工作集不同。

基於資料集的方式每次都從物理儲存讀取資料操作資料然後寫回物理裝置。

hadoop的mr的劣勢、不適用的場景:

1.含有大量迭代演算法。

2.互動式查詢。重點是:基於資料流的方式不能複用曾經的結果或中間計算結果。

假設有數千人併發資料倉庫,假設100人的查詢完全相同,那麼每個人都需要重新查詢。Spark就可以避免,因為可以複用。

spark會對結果進行重用

假如有一千人查詢同一個資料倉庫

spark的話,如果第一個人計算過的步驟,其他人都可以複用。

RDD是基於工作集的,除了有共同特點外,還增加了resillient Distributed Dataset

RDD彈性:

1:自動的進行記憶體和磁碟資料儲存的切換

2.基於lineage的高效容錯

3.task失敗會自動進行特定次數的重試

4.stage如果失敗會自動進行特定次數的重試而且重試時只會試算失敗的分片。

5.checkpoint和persist,是效率和容錯的延伸。

6.資料排程彈性:DAG TASK和資源管理無關

7.資料分片的高度彈性

計算過程中有很多資料碎片,那麼Partition就會非常小,每個Partition都會由一個執行緒處理,就會降低處理效率。這時就要考慮把小檔案合併成一個大檔案。

另外一個方面,如果記憶體不多,而每個Partition比較大(資料Block大),就要考慮變成更小的分片,Sparke有更多的處理批次但不會出現OOM。

所以說根據資料分片的大小來提高並行度或降低並行度也是Spark高度彈性的表現。同時需要指出的是,不管是提高並行度還是降低並行度,仍具有資料本地性。當然,提高並行度還是降低度行度都是人工通過程式碼來調整的。

假設有一百萬個數據分片,每個資料分片都非常小(1K或10KB),如果要把資料分片調整為一萬個,如果使用repartition,就需要Shuffle。

從原始碼RDD.scala中的repartition方法可以看到內部呼叫的是coalesce,傳入的引數是shuffle並設定為true。原始碼如下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {

    coalesce(numPartitions, shuffle = true)

  }

所以,如果要把很多小的資料分片合併成大的資料分片的話千萬不要直接呼叫repartition,而要呼叫coalesce,coalesce預設的shufflefalsecoalesce的原始碼如下:

  /**

   * Return a new RDD that is reduced into `numPartitions` partitions.

   *

   * This results in a narrow dependency, e.g. if you go from 1000 partitions

   * to 100 partitions, there will not be a shuffle, instead each of the 100

   * new partitions will claim 10 of the current partitions.

   *

   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,

   * this may result in your computation taking place on fewer nodes than

   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,

   * you can pass shuffle = true. This will add a shuffle step, but means the

   * current upstream partitions will be executed in parallel (per whatever

   * the current partitioning is).

   *

   * Note: With shuffle = true, you can actually coalesce to a larger number

   * of partitions. This is useful if you have a small number of partitions,

   * say 100, potentially with a few partitions being abnormally large. Calling

   * coalesce(1000, shuffle = true) will result in 1000 partitions with the

   * data distributed using a hash partitioner.

   */

  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)

      : RDD[T] = withScope {

    if (shuffle) {

      /** Distributes elements evenly across output partitions, starting from a random partition. */

      val distributePartition = (index: Int, items: Iterator[T]) => {

        var position = (new Random(index)).nextInt(numPartitions)

        items.map { t =>

          // Note that the hash code of the key will just be the key itself. The HashPartitioner

          // will mod it with the number of total partitions.

          position = position + 1

          (position, t)

        }

      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed

      new CoalescedRDD(

        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),

        new HashPartitioner(numPartitions)),

        numPartitions).values

    } else {

      new CoalescedRDD(this, numPartitions)

    }

  }

如果要把一萬個資料分片變成一百萬個的話,可以用shuffle,也可以不用shuffle

RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,如果有其他人執行同樣的查詢的話就可以存入工作集,這就極大地提高了效率。特別是在資料倉庫中,如果有一千人做同樣的查詢,第一個人在查詢之後,每二個人查詢時就可以直接從快取中取結果。退一步說,如果一千人執行的查詢只有前10個步驟是一樣的,如果第一個人計算完成後,後面的人的前10步就不需要再計算了。這就極大地提升了查詢速度。

如果是Hadoop的話一千個人執行同樣的查詢,就需要重複計算一千次。

快取隨時可以清理掉,如果記憶體或磁碟不足就需要根據優先度將不常使用的快取內容清理掉。

RDDcache是直接放在記憶體中的。RDDcache通過checkpoint來清除。但checkpoint是重量級的。

SparkStreaming經常進行Checkpoint,原因是經常要用到以前的內容。假設要統計一段時間的內容,那就需要以前的資料。

如果Spark的一個Stage中有一千個步驟的話,預設只會產生一次結果。如果是HadoopMR就會產生999次中間結果,如果資料量很大的話,記憶體和磁碟都可能存不下。

Spark本身就是RDD的內容,RDD是隻讀分割槽的集合。RDD是資料集合,可以簡單理解為ListArray

用一句話概括:RDD是分散式函數語言程式設計的抽象。基於RDD的程式設計一般都是通過高階函式的方式,原因是函式裡傳函式要對當前的Map函式作用的資料集進行記錄的明細化操作。

Spark的每一步操作都是對RDD進行操作,而RDD是隻讀分割槽的集合。

由於每一次操作都是隻讀的,而操作會改變資料,那麼產生中間結果怎麼辦?

=>不能立即計算。這就是lazy:不用時不算,用時才計算,所以不會產生中間結果

RDD的核心之一就是它的Lazy,因為這不計算,開始時只是對資料處理進行標記而已。例如WordCount中的map、flatmap其實並不計算資料,只是對資料操作的標記而已。

flatMap的原始碼如下:

/**

   *  Return a new RDD by first applying a function to all elements of this

   *  RDD, and then flattening the results.

   */

  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {

    val cleanF = sc.clean(f)

    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))

  }

可以看出在flatMap中建立了一個MapPartitionsRDD,但第一個引數是this,這個this是指它依賴的父RDD。每次建立新的RDD都會把父RDD作為第一個引數傳入。而這只是對資料處理的標記而已。這讓我們聯想到一種操作:

f(x)=x+2

x=y+1

y=z+3

執行時只是函式展開。

所以RDD每次構建物件都依賴於父RDD,最後就是函式展開。所以如果一個Stage中有一千個步驟的話,不會產生999次中間結果。

以前有人說spark不適合大規模計算,當時確實有其道理,主要有兩點原因:

1)Spark一直基於記憶體迭代會消耗大量記憶體。例如如一千個步驟雖然不產生中間結果,但如果要複用別人的結果時就需要手動persist或cache,這確實非常消耗記憶體。

2)主要是因為shuffle機制。但現在Shuffle支援很多種機制,如hashShuffle、sortBasedShuffle、鎢絲計劃等,而且現在Shuffle只是一個介面,一個外掛,可以自定義,所以可以應對任意規模的資料處理。

Spark1.2以前確實有規模的限制,是由其Shuffle機制導致的,但現在沒有了。現在生產環境spark最低要1.3版本,因為Spark1.3引入了dataframe,這是里程碑式的。推薦使用Spark1.6

由於RDD是隻讀的,為了應對計算模型,RDD又是lazy級別的。每次操作都會產生RDD,每次構建新的RDD都是把父RDD作為第一個引數傳入,這就構成了一個鏈條。在最後Action時才觸發,這就構成了一個從後往前回溯的過程,其實就是函式展開的過程。

由於這種從後往前的回溯機制,Spark的容錯的開銷會非常低。

常規容錯方式:

1)資料檢查點checkpoint

2)記錄資料的更新

資料檢查點非常致命。資料檢查點的工作方式是通過資料中心的網路連線不同的機器,每次操作時都要複製整個資料集,就相當於每次都有一個拷貝,拷貝是要通過網路的,而網路頻寬就是分散式系統的瓶頸。同時因為要拷貝,就需要重組資源,這也是對效能的非常大的消耗。

記錄資料的更新:每次資料變化時作記錄,不需要拷貝,但1)比較複雜,而且每次更新資料需要許可權,容易失控。2)耗效能。

spark的RDD就是記錄資料更新的方式,但為何高效?

1)RDD是不可變的而且是lazy的。由於RDD是不可變的,所以每次操作時就要產生新的RDD,新的RDD將父RDD作為第一個引數傳入,所以不存在全域性修改的問題,控制難度就有極大的下降。計算時每次都是從後往前回溯,不會產生中間結果。在此基礎上還有計算鏈條,出錯可以從中間開始恢復。

恢復點要麼是checkpoint要麼是前一個stage的結果(因為Stage結束時會自動寫磁碟)

2)如果每次對資料進行很小的修都要記錄,那代價很大。RDD是粗粒度的操作:原因是為了效率為了簡化。粗粒度就是每次操作時作用的都是所有的資料集合。細粒度代價太大。

對RDD的具體的資料的改變操作都是粗粒度的。--RDD的寫操作是粗粒度的。但RDD的讀操作既可以是粗粒度的又可以是細粒度的。

RDD的粗粒度的寫操作限制了RDD的應用場景。例如網路爬蟲就不適合sparkRDD。但現實中大資料處理場景大部分都是粗粒度的。特別是支援資料並行批處理的應用,例如機器學習,圖計算,資料探勘,都是在很多記錄上進行相應操作,都是粗粒度的表現。

RDD不適合做細粒度和非同步更新的應用。

如果想讓Spark直接操作MYSQL的資料或者操作HBASE資料就需要複寫RDD。

RDD的資料分片上執行的計算邏輯都是一樣的。對於每個計算邏輯都有計算函式compute

def compute(split: Partition, context: TaskContext): Iterator[T]

所有的RDD操作返回的都是一個迭代器。如Map/flatMap等。這樣的好處:假設用SparkSQL提取到了資料,產生了新的RDD,機器學習去訪問這個RDD,但根本不需要知道這是來自於SparkSQL。這就可以讓所有的框架無縫整合。結果就是機器學習可以直接呼叫SparkSQL,流處理也可以用機器學習進行訓練。因為無論是什麼操作返回的都是Iterator。所以就可以用hasNext來看看有沒有下一個元素,然後通過Next讀取下一個元素。Next具體怎麼讀取下一個元素和具體RDD實現有關。

Iterator的部分原始碼(scala.collection.Iterator):

trait Iterator[+Aextends TraversableOnce[A] {
  self =>

  def seq: Iterator[A] = this/** Tests whether this iterator can provide another element.   *   *  @return  `true` if a subsequent call to `next` will yield an element,   *           `false` otherwise.   *  @note    Reuse: $preservesIterator   */def hasNext: Boolean

  /** Produces the next element of this iterator.   *   *  @return  the next element of this iterator, if `hasNext` is `true`,   *           undefined behavior otherwise.   *  @note    Reuse: $preservesIterator   */def next(): A/** Tests whether this iterator is empty.   *   *  @return   `true` if hasNext is false, `false` otherwise.   *  @note     Reuse: $preservesIterator   */def isEmpty: Boolean = !hasNext

無論是什麼操作,返回的結果都是Iterator介面,面向介面程式設計時能不能操作SparkSQL/RDD的子類的方法?

=>Spark可以,java不可。原因是有this.type

從Java的角度講,面向介面程式設計不能呼叫子類的方法,但如果是this.type有執行時的支援,this.type會指向具體的子類,這樣就可以呼叫子類的方法。SparkStreaming可以呼叫ML的子類進行訓練,RDD本身是個abstract class,與機器學習等的演算法無關。但由於有了this.type可以通過runtime把例項賦值給RDD,這樣就可以操作了。

   如果開發了一個自己領域的子框架,例如金融領域,這個子框架就可以直接在程式碼中呼叫機器學習,呼叫圖計算進行風險預測、個性化分析、行為模式分析等,也可以呼叫SparkSQL用於資料探勘。同時機器學習也可以呼叫金融框架。

又例如開發一個電商框架,那麼使用者支付時又可以直接呼叫金融框架。

就是說每增加一個功能就會讓所有的功能都增強。每提出一個新的框架都可以使用其他所有的功能。這是核聚變級別的。

Spark的所有子框架都是基於RDD的,只不過是子類而已。

下面再看一下RDD.scala類中的preferredLocation的原始碼:

final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}

分散式大資料計算時優先考慮資料不動程式碼動,由於有了preferredLocation,可以說spark不僅可以處理大資料,Spark可以處理一切資料。可以處理資料的資料,可以處理普通檔案系統上的資料,可以在Linux上執行,也可以在Windows上執行,可以執行一切檔案格式。

由於preferredLocation,所以每次計算都符合完美的資料本地性。Spark要做一體化多元化的資料通用處理框架,相容一切檔案系統一切作業系統一切檔案格式。

Spark計算更快,運算元更豐富,使用更簡單,一統資料處理天下。

IBM在2016年6月16日宣佈承諾大力推進Apache Spark專案,並稱該專案為:在以資料為主導的,未來十年最為重要的新的開源專案。這一承諾的核心是將Spark嵌入IBM業內領先的分析和商務平臺,並將Spark作為一項服務,在IBM Bluemix平臺上提供給客戶。IBM還將投入超過3500名研究和開發人員在全球十餘個實驗室開展與Spark相關的專案,並將為Spark開源生態系統無償提供突破性的機器學習技術——IBM SystemML,同時,IBM還將培養超過100萬名Spark資料科學家和資料工程師。

因為Spark是執行在JVM上的,一切能執行在JVM上的資料Spark都能處理。

只有一點:spark替代不了實時事務處理。如銀行轉帳等,因為Spark反應還不夠快,而且實時事務性處理控制難度比較大。

Spark完全可以做實時處理。SparkStreaming可以達到1ms內的響應速度(官方200ms)。

spark要統一資料計算領域除了實時事務性處理。

下面再看一下RDD.scala中的dependencies的原始碼:

/** * Get the list of dependencies of this RDD, taking into account whether the * RDD is checkpointed or not. */final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_}
}

後面的RDD對前面的RDD都有依賴,所以容錯性非常好。

下面再看一下RDD.scla中的partitions的原始碼:

/** * Get the array of partitions of this RDD, taking into account whether the * RDD is checkpointed or not. */final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
    }
    partitions_}
}

進行下一步操作時可以改變並行度。並行度是彈性的一部分。

RDD的缺陷:

不支援細粒度的更新操作和增量迭代計算(如網路爬蟲)

增量迭代時每次可能只迭代一部分資料,但RDD是粗粒度的,無法考慮是不是隻是一部分資料。

Jstorm支援增量迭代計算,是用Java的方式重構的Storm(由阿里開發)

真相給你自由,人的一切痛苦都源於不瞭解真相。所以必須瞭解Spark的真相,在工作時才能自由。

以上內容是王家林老師DT大資料夢工廠《 IMF傳奇行動》第14課的學習筆記。
王家林老師是Spark、Flink、DockerAndroid技術中國區佈道師。Spark亞太研究院院長和首席專家,DT大資料夢工廠創始人,Android軟硬整合原始碼級專家,英語發音魔術師,健身狂熱愛好者。

微信公眾賬號:DT_Spark

電話:18610086859

QQ:1740415547

微訊號:18610086859  

新浪微博:ilovepains

百度百科關於Java事務處理的參考資料:

java事務處理

編輯

Java的事務處理,如果對資料庫進行多次操作,每一次的執行或步驟都是一個事務.如果資料庫操作在某一步沒有執行或出現異常而導致事務失敗,這樣有的事務被執行有的就沒有被執行,從而就有了事務的回滾,取消先前的操作.....

中文名

JAVA事件處理

外文名

Java Transaction Processing

簡介

編輯

Java的事務處理,如果對資料庫進行多次操作,每一次的執行或步驟都是一個事務.如果資料庫操作在某一步沒有執行或出現異常而導致事務失敗,這樣有的事務被執行有的就沒有被執行,從而就有了事務的回滾,取消先前的操作.....

詳細說明

編輯

Java中使用事務處理,首先要求資料庫支援事務。如使用MySQL的事務功能,就要求MySQL的表型別為Innodb才支援事務。否則,在Java程式中做了commitrollback,但在資料庫中根本不能生效。

JavaBean中使用JDBC方式進行事務處理

public int delete(int sID) {

dbc = new DataBaseConnection();

Connection con = dbc.getConnection();

try {

con.setAutoCommit(false);// 更改JDBC事務的預設提交方式

dbc.executeUpdate("delete from xiao where ID=" + sID);

dbc.executeUpdate("delete from xiao_content where ID=" + sID);

dbc.executeUpdate("delete from xiao_affix where bylawid=" + sID);

con.commit();//提交JDBC事務

con.setAutoCommit(true);// 恢復JDBC事務的預設提交方式

dbc.close();

return 1;

}

catch (Exception exc) {

con.rollBack();//回滾JDBC事務

exc.printStackTrace();

dbc.close();

return -1;

}

}

在資料庫操作中,一項事務是指由一條或多條對資料庫更新的sql語句所組成的一個不可分割的工作單元。只有當事務中的所有操作都正常完成了,整個事務才能被提交到資料庫,如果有一項操作沒有完成,就必須撤消整個事務。

例如在銀行的轉帳事務中,假定張三從自己的帳號上把1000元轉到李四的帳號上,相關的sql語句如下:

update account set monery=monery-1000 where name='zhangsan'

update account set monery=monery+1000 where name='lisi'

這個兩條語句必須作為一個完成的事務來處理。只有當兩條都成功執行了,才能提交這個事務。如果有一句失敗,整個事務必須撤消。

connection類中提供了3個控制事務的方法:

1) setAutoCommit(Boolean autoCommit):設定是否自動提交事務;

2) commit();提交事務;

3) rollback();撤消事務;

jdbc api中,預設的情況為自動提交事務,也就是說,每一條對資料庫的更新的sql語句代表一項事務,操作成功後,系統自動呼叫commit()來提交,否則將呼叫rollback()來撤消事務。

jdbc api中,可以通過呼叫setAutoCommit(false) 來禁止自動提交事務。然後就可以把多條更新資料庫的sql語句做為一個事務,在所有操作完成之後,呼叫commit()來進行整體提交。倘若其中一項 sql操作失敗,就不會執行commit()方法,而是產生相應的sqlexception,此時就可以捕獲異常程式碼塊中呼叫rollback()方法撤消事務。

事務處理是企業應用需要解決的最主要的問題之一。J2EE通過JTA提供了完整的事務管理能力,包括多個事務性資源的管理能力。但是大部分應用都是執行在單一的事務性資源之上(一個數據庫),他們並不需要全域性性的事務服務。本地事務服務已然足夠(比如JDBC事務管理)。

事務的特性

編輯

Atomic原子性、Consistency一致性、Isolation隔離性和Durability永續性。

原子性:指整個事務是不可以分割的工作單元。只有事務中所有的操作執行成功,才算整個事務成功,事務中任何一個SQL語句執行失敗,那麼已經執行成功的SQL語句也必須撤銷,資料庫狀態應該回到執行事務前的狀態。

一致性:指資料庫事務不能破壞關係資料的完整性以及業務邏輯上的一致性。例如對於銀行轉賬事務,不管事務成功還是失敗,應該保證事務結束後兩個轉賬賬戶的存款總額是與轉賬前一致的。

隔離性:指的是在併發環境中,當不同的事務同時操縱相同的資料時,每個事務都有各自的完整資料空間。

永續性:指的是隻要事務成功結束,它對資料庫所做的更新就必須永久儲存下來。即使發生系統崩潰,重新啟動資料庫系統後,資料庫還能恢復到事務成功結束時的狀態。

本文並不討論應該採用何種事務處理方式,主要目的是討論如何更為優雅地設計事務服務。僅以JDBC事務處理為例。涉及到的DAOFactory,Proxy,Decorator等模式概念,請閱讀相關資料

另有一篇不錯的關於事務處理的文章:http://www.cnblogs.com/bicabo/archive/2011/11/14/2248044.html

相關推薦

14spark RDD解密學習筆記

第14課:spark RDD解密學習筆記 本期內容: 1.RDD:基於工作集的應用抽象 2.RDD內幕解密 3.RDD思考 精通了RDD,學習Spark的時間大大縮短。解決問題能力大大提高, 徹底把精力聚集在RDD的理解上,SparkStreaming、SparkSQL、

14spark RDD彈性表現和來源,容錯

hadoop 的MapReduce是基於資料集的,位置感知,容錯 負載均衡  基於資料集的處理:從物理儲存上載入資料,然後操作資料,然後寫入物理儲存裝置;  基於資料集的操作不適應的場景:  1,不適合於大量的迭代  2,互動式查詢

67Spark SQL下采用Java和Scala實現Join的案例綜合實戰(鞏固前面學習Spark SQL知識)

內容:     1.SparkSQL案例分析     2.SparkSQL下采用Java和Scala實現案例 一、SparkSQL下采用Java和Scala實現案例 學生成績: {"name":"Michael","score":98} {"name":"Andy"

72Spark SQL UDF和UDAF解密與實戰

內容:     1.SparkSQL UDF     2.SparkSQL UDAF 一、SparkSQL UDF和SparkSQL UDAF     1.解決SparkSQL內建函式不足問題,自定義內建函式,     2.UDF:User Define Functio

71Spark SQL視窗函式解密與實戰

內容:     1.SparkSQL視窗函式解析     2.SparkSQL視窗函式實戰 一、SparkSQL視窗函式解析     1.spark支援兩種方式使用視窗函式:  &nb

70Spark SQL內建函式解密與實戰

內容:     1.SparkSQL內建函式解析     2.SparkSQL內建函式實戰 一、SparkSQL內建函式解析     使用Spark SQL中的內建函式對資料進行分析,Spark

42 Spark Broadcast內幕解密Broadcast執行機制徹底解密、Broadcast原始碼解析、Broadcast最佳實踐

第42課:  Spark Broadcast內幕解密:Broadcast執行機制徹底解密、Broadcast原始碼解析、Broadcast最佳實踐Broadcast在機器學習、圖計算、構建日常的各種演算法中到處可見。 Broadcast就是將資料從一個節點發送到其它的節點上;

大資料IMF傳奇行動絕密課程63Spark SQL下Parquet內幕深度解密

Spark SQL下Parquet內幕深度解密 1、Spark SQL下的Parquet意義再思考 2、Spark SQL下的Parquet內幕揭祕 一、Spark SQL下的Parquet意義再思考 1、如果說HDFS是大資料時代分散式檔案系統儲存的事

12spark高可用(HA)框架

worker管理資源:記憶體,cpu 只有standby模式的master變成active模式時才能想叢集提交任務,master切換過程不會影響程式的執行 原因:程式在具體執行之前已經向叢集申請過資源,這些資源已經提交給driver了,也就是說已經分配好資源了,這是粗粒度分配,一次性分配

68Spark SQL通過JDBC操作MySQL

內容:     1.SparkSQL操作關係資料庫意義     2.SparkSQL操作關係資料庫 一、通過SparkSQL操作關係資料庫意義     1.SparkSQL可以通過jdbc從傳統關係型資料庫中讀寫資料,讀取資料後直接生成DataFrame,然後在加上藉助

73Spark SQL Thrift Server實戰

內容:     1.SparkSQL Thrift解析與測試     2.SparkSQL Thrift Server JDBC程式設計 一、SparkSQL Thrift解析與測試     ThriftServer是一個JDBC/ODBC介面,使用者可以通過JDBC/

4Spark Streaming的Exactly Once的事務處理

本期內容: Exactly once 輸出不重複 Exactly once 1,事務一定會被處理,且只被處理一次; 2,輸出能夠輸出且只會被輸出。 Receiver:資料通過BlockManager寫入記憶體+磁碟或者通過WAL來保證資料的安全性。WAL機制:寫資料

80Spark SQL網站搜尋綜合案例實戰

內容:     1.案例分析     2.案例實戰 一、案例分析     專案:以京東找出搜尋平臺排名的產品,The hottest     元資料:date,u

79Spark SQL基於網站Log的綜合案例綜合程式碼和實際執行測試

內容:     1.熱門論壇板塊排名     2.綜合程式碼實戰和測試 一、熱門論壇板塊排版 建立表:     spark.sql("createtable userlogs(date st

76Spark SQL實戰使用者日誌的輸入匯入Hive及SQL計算PV實戰

內容:     1.Hive資料匯入操作     2.SparkSQL對資料操作實戰 一、Hive資料匯入操作 create table userLogs(date String,timestamp bigint,userI

75Spark SQL基於網站Log的綜合案例實戰

內容:     1.案例概述     2.資料來源和分析 一、案例概述     PV:頁面訪問數     UV:獨立訪問數 二、資料來源和分析 packag

69Spark SQL通過Hive資料來源實戰

內容:     1.Spark SQL操作Hive解析     2.SparkSQL操作Hive實戰 一、Spark SQL操作Hive解析     1.在目前企業級大資料Spark開發的時候,

deeplearning.ai結構化機器學習專案

1 正交化 正交化的含義是在設計系統時,應該使得系統一個元件/引數的變化對另一個元件/引數的影響儘可能小。這樣就可以相對簡單的實驗系統的各個組成部分,可以減小系統的驗證和測試時間。 開發一個有監督機器學習系統時,應該依次序完成四件事情: 訓練集要對代價函式擬合的很好,

15Spark Streaming原始碼解讀之No Receivers徹底思考

背景:      目前No Receivers在企業中使用的越來越多。No Receivers具有更強的控制度,語義一致性。No Receivers是我們操作資料來源自然方式,操作資料來源使用一個封裝器,且是RDD型別的。所以Spark Streaming就產生了自定義R

大資料Spark “蘑菇雲”行動補充內容70 Spark SQL程式碼實戰和效能調優 4個spark sql調優技巧有用!!!!

大資料Spark “蘑菇雲”行動補充內容第70課: Spark SQL程式碼實戰和效能調優 dataframe: Row是沒有型別的,因為Row中的所有成員都被看著Object型別!!!untype