Spark Streaming Source Code Walkthrough: State Management with updateStateByKey and mapWithState

Source: http://blog.csdn.net/snail_gesture/article/details/5151058

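Background:
Spark Streaming divides its work into jobs by batch duration. In practice, however, we often need to compute over the past day or even the past week of data, which makes state management unavoidable. Every batch duration produces a job, and each job is made up of RDDs, so the question is how to maintain state across batches. The updateStateByKey and mapWithState methods carry out the core of this work.
Source code analysis:
1. Neither updateStateByKey nor mapWithState is defined on DStream itself. Both become available through an implicit conversion from DStream[(K, V)] to PairDStreamFunctions.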

<code>object DStream {

  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
  // it automatically. However, we still keep the old function in StreamingContext for backward
  // compatibility and forward to the following function directly.

  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }
</code>
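The mechanism can be tried out without Spark. The following is a minimal sketch in plain Scala, not Spark code: `MiniStream`, `MiniPairFunctions` and `groupValuesByKey` are hypothetical names invented for illustration. It shows how an implicit conversion placed in a companion object makes pair-only methods appear on a stream of key-value tuples without any extra import.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for DStream: it only wraps one batch of elements.
class MiniStream[T](val batch: Seq[T])

// Hypothetical stand-in for PairDStreamFunctions: methods that only make
// sense when the elements are key-value pairs.
class MiniPairFunctions[K, V](self: MiniStream[(K, V)]) {
  def groupValuesByKey: Map[K, Seq[V]] =
    self.batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}

object MiniStream {
  // Defined in the companion object, so it is part of the implicit scope of
  // MiniStream and the compiler finds it without an explicit import.
  implicit def toMiniPairFunctions[K, V](s: MiniStream[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(s)
}

object ImplicitDemo {
  val words = new MiniStream(Seq("a" -> 1, "b" -> 2, "a" -> 3))
  // MiniStream has no groupValuesByKey; the call compiles via the conversion.
  val grouped: Map[String, Seq[Int]] = words.groupValuesByKey
}
```

A `MiniStream[Int]` would not get `groupValuesByKey`, because the conversion only applies to streams of tuples. This mirrors why updateStateByKey is only callable on a DStream of pairs.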

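updateStateByKey:
1. In PairDStreamFunctions, updateStateByKey is implemented as follows:
For each key, updateFunc receives the new values from the current batch together with the key's previous state and returns the updated state. The method itself returns a DStream[(K, S)].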

<code>/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  // defaultPartitioner
  updateStateByKey(updateFunc, defaultPartitioner())
}
</code>
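To make the contract of updateFunc concrete, here is a small sketch in plain Scala that simulates two batches without Spark. The `step` helper and the expiry rule (drop a key that receives no new values in a batch) are assumptions made for illustration, not part of the Spark API; only the function shape `(Seq[V], Option[S]) => Option[S]` comes from the source above.

```scala
object UpdateFuncSketch {
  // Same shape as the updateFunc parameter: the new values for a key in this
  // batch, plus the key's previous state (None the first time a key is seen).
  // Returning None removes the key from the state.
  val runningCount: (Seq[Int], Option[Int]) => Option[Int] =
    (newValues, state) =>
      if (newValues.isEmpty) None // illustrative rule: idle keys expire
      else Some(state.getOrElse(0) + newValues.sum)

  // Hypothetical helper that mimics one batch: the function is applied to
  // every key that has either old state or new values.
  def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ grouped.keySet).toSeq.flatMap { k =>
      runningCount(grouped.getOrElse(k, Seq.empty[Int]), state.get(k)).map(s => k -> s)
    }.toMap
  }

  val afterBatch1 = step(Map.empty, Seq("a" -> 1, "b" -> 1, "a" -> 1))
  val afterBatch2 = step(afterBatch1, Seq("a" -> 1))
}
```

Note that the function is also invoked for keys that have state but no new values in the batch. That is why the cost of updateStateByKey grows with the total number of keys rather than with the size of the batch.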
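2. defaultPartitioner creates a HashPartitioner whose number of partitions defaults to the SparkContext's defaultParallelism: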
<code>private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
  new HashPartitioner(numPartitions)
}
</code>
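As a rough illustration of what hash partitioning means here, the sketch below assigns a key to a partition by taking a non-negative modulus of its hash code. It is a simplified model written in plain Scala, not Spark's HashPartitioner class; `partitionFor` and `nonNegativeMod` are names chosen for this example.

```scala
object HashPartitionSketch {
  // The % operator can return a negative result for negative hash codes,
  // so shift the result back into the range [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Assumption for this sketch: null keys go to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

Because the partition depends only on the key's hash code and the partition count, the same key lands in the same partition in every batch. This is what lets the state RDD and the new batch be combined per key.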
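3. The partitioner controls how each RDD of the resulting DStream is partitioned. This overload also lifts the per-key updateFunc into a function over an iterator of (key, new values, previous state) tuples: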
<code>/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of the key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)] = ssc.withScope {
  val cleanedUpdateF = sparkContext.clean(updateFunc)
  val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
    iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
  }
  updateStateByKey(newUpdateFunc, partitioner, true)
}
</code>
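The lifting step in newUpdateFunc can be isolated and run in plain Scala. In the sketch below, `lift` is a hypothetical helper name; its body follows the flatMap expression from the source above, and the sample update function is an assumption for illustration.

```scala
object LiftSketch {
  // Turn a per-key update function into a function over an iterator of
  // (key, new values, previous state) tuples. Keys for which the update
  // function returns None are dropped by flatMap.
  def lift[K, V, S](updateFunc: (Seq[V], Option[S]) => Option[S])
      : Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] =
    iterator => iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
```

For example, with an update function that sums values and returns None when a key has no new values, the input tuples ("a", Seq(1, 2), Some(10)), ("b", Seq(), Some(5)) and ("c", Seq(4), None) yield ("a", 13) and ("c", 4), and the key "b" disappears from the state.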
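4. The overload above passes rememberPartitioner as true. The overload below takes it as an explicit parameter and constructs the StateDStream: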
<code>/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. Note, that this function may generate a different
 *                   tuple with a different key than the input key. Therefore keys may be removed
 *                   or added in this way. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
  ): DStream[(K, S)] = ssc.withScope {
   new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}
</code>
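The doc comment warns that an iterator-level update function may emit keys that differ from its input keys. The sketch below, in plain Scala with hypothetical names (`normalizeKeys`, `partitionFor`), illustrates why that matters for rememberPartitioner: once a key is rewritten, its hash partition can change, so the output would no longer be laid out the way the remembered partitioner claims.

```scala
object RememberPartitionerSketch {
  // Hypothetical update function that rewrites keys by lower-casing them
  // while summing the new values into the previous state.
  val normalizeKeys: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
    _.map { case (k, vs, st) => (k.toLowerCase, st.getOrElse(0) + vs.sum) }

  // Simplified hash partitioning: non-negative modulus of the hash code.
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)
}
```

With three partitions, the input key "A" hashes to partition 2 while the rewritten key "a" hashes to partition 1. A function like this should therefore be paired with rememberPartitioner = false, whereas a function that never changes keys can safely keep it true.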
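5. StateDStream persists its RDDs with StorageLevel.MEMORY_ONLY_SER, that is, serialized in memory rather than on disk. Serialization keeps the memory footprint down, which matters because the accumulated state can become very large.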
<code>class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean,
    initialRDD : Option[RDD[(K, S)]]
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER)
</code>
  1. The source code of computeUsingPreviousRDD is as follows:
<code class="hljs scala has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> [<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">this</span>] <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">def</span> computeUsingPreviousRDD (
  parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
  <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// Define the function for the mapPartition operation on cogrouped RDD;</span>
  <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// first map the cogrouped tuple to tuples of required type,</span>
  <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// and then apply the update function</span>
  <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> updateFuncLocal = updateFunc
  <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
    <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> i = iterator.map(t => {
      <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> itr = t._2._2.iterator
      <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> headOption = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">if</span> (itr.hasNext) Some(itr.next()) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">else</span> None
      (t._1, t._2._1.toSeq, headOption)
    })
    updateFuncLocal(i)
  }
<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">  // Note: on every batch, cogroup processes all partitions of prevStateRDD, i.e. the entire previous state</span>
  <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
  <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">val</span> stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
  Some(stateRDD)
}
</code>

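Because every batch cogroups the new data with the entire previous state, the cost of each batch grows with the total number of keys held in state rather than with the size of the batch. updateStateByKey is therefore not recommended when the state is large. 
The implementation of updateStateByKey is illustrated below: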
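The cost of the cogroup step can be sketched with plain Scala collections. This is only a simplified model under stated assumptions, not Spark's code: `UpdateStateByKeySketch`, `step`, and the `calls` counter are hypothetical names introduced for illustration, and a `Map` stands in for the previous-state RDD. It shows the update function being invoked for every key in the previous state, even though the batch contains a single key.

```scala
object UpdateStateByKeySketch {
  // Hypothetical stand-in for one batch step of updateStateByKey, using plain
  // Scala collections instead of RDDs. Returns the new state together with the
  // number of keys for which the update function was invoked.
  def step(
      batch: Seq[(String, Int)],
      prevState: Map[String, Int],
      updateFunc: (Seq[Int], Option[Int]) => Option[Int]
  ): (Map[String, Int], Int) = {
    // Group this batch's values by key (the "new data" side of the cogroup).
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // cogroup semantics: the union of the batch keys and the previous-state keys.
    val allKeys = grouped.keySet ++ prevState.keySet
    var calls = 0
    val newState = allKeys.toSeq.flatMap { k =>
      calls += 1
      // Returning None from updateFunc drops the key from the state.
      updateFunc(grouped.getOrElse(k, Seq.empty), prevState.get(k)).map(k -> _)
    }.toMap
    (newState, calls)
  }

  def main(args: Array[String]): Unit = {
    val sum: (Seq[Int], Option[Int]) => Option[Int] =
      (values, old) => Some(values.sum + old.getOrElse(0))
    val prev = Map("a" -> 1, "b" -> 2, "c" -> 3)
    // Only key "a" has new data, yet the update function runs for a, b and c.
    val (next, calls) = step(Seq("a" -> 10), prev, sum)
    println(s"state=$next calls=$calls")
  }
}
```

In this model the number of update calls equals the number of distinct keys across the batch and the previous state, which is why the per-batch work tracks the total state size.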

[Figure: diagram of the updateStateByKey implementation; image not available]

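mapWithState: 
1. mapWithState returns a MapWithStateDStream. Historical state is maintained and updated per key: a user-supplied function is applied to the key-value records to maintain the state.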

<code class="hljs lua has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-radius: 0px; word-wrap: normal; background: transparent;">/**
 * :: Experimental ::
 * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
 * `this` stream, while maintaining some state data for each unique key. The mapping function
 * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
 * transformation can be specified using [[StateSpec]] class. The state data is accessible in
 * as a parameter of type [[State]] in the mapping function.
 *
 * Example of using `mapWithState`:
 * {{{
 *    // A mapping function that maintains an integer state and return a String
 *    // (Annotation: the state can be viewed as a table that records all of the
 *    // historical state being maintained.)
 *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
 *      // Use state.exists(), state.get(), state.update() and state.remove()
 *      // to manage state, and return the necessary string
 *    }
 *
 *    val spec = StateSpec.function(mappingFunction).numPartitions(10)
 *
 *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
 * }}}
 *
 * @param spec          Specification of this transformation
 * @tparam StateType    Class type of the state data
 * @tparam MappedType   Class type of the mapped data
 */
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    // StateSpecImpl encapsulates the StateSpec operations.
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}
</code>
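The body of the mapping function in the scaladoc above is left empty, so the following is a minimal sketch of what it might look like for a running sum. It is a plain-Scala model under stated assumptions and does not use Spark: `SimpleState` is a hypothetical stand-in for Spark's `State` that models only `exists()`, `get()`, `update()` and `remove()`, and `processBatch` and the mutable `store` are illustrative names, not part of any Spark API.

```scala
import scala.collection.mutable

object MapWithStateSketch {
  // Hypothetical, simplified stand-in for Spark's State[S]; only the four
  // operations named in the scaladoc are modelled.
  final class SimpleState[S](private var value: Option[S]) {
    def exists(): Boolean = value.isDefined
    def get(): S = value.get
    def update(s: S): Unit = { value = Some(s) }
    def remove(): Unit = { value = None }
    def toOption: Option[S] = value
  }

  // Mapping function in the shape shown in the scaladoc: key, optional value, state.
  // Here it keeps a running sum per key and emits "key=total".
  def mappingFunction(key: String, value: Option[Int], state: SimpleState[Int]): Option[String] = {
    val total = value.getOrElse(0) + (if (state.exists()) state.get() else 0)
    state.update(total)
    Some(s"$key=$total")
  }

  // Applies the mapping function to each record of the batch and writes the
  // resulting state back to the store; keys absent from the batch are not visited.
  def processBatch(batch: Seq[(String, Int)], store: mutable.Map[String, Int]): Seq[String] =
    batch.flatMap { case (k, v) =>
      val state = new SimpleState[Int](store.get(k))
      val out = mappingFunction(k, Some(v), state)
      state.toOption match {
        case Some(s) => store.update(k, s)
        case None    => store -= k
      }
      out
    }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map("a" -> 1, "b" -> 2, "c" -> 3)
    val out = processBatch(Seq("a" -> 10, "a" -> 5), store)
    println(s"mapped=$out store=$store")
  }
}
```

In this model the mapping function runs once per record in the batch, and the entries for "b" and "c" are left untouched in the store.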
2. The source code of MapWithStateDStream is as follows:
            
           
