
Spark Streaming: Getting Started

1. Transformation

Transformation operators are lazily evaluated: no computation happens until an action method is called.

2. Action

As soon as an action is called, the computation is executed.
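A minimal sketch of the lazy/eager distinction, assuming an existing SparkContext sc (as in spark-shell); variable names are illustrative only:

val data = sc.parallelize(1 to 100)   // transformation: nothing executes yet
val doubled = data.map(_ * 2)         // still lazy: only the lineage is recorded
doubled.count()                       // action: triggers the actual job (returns 100)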

3. Common operators

Transformations (each operator is followed by its meaning; a short spark-shell sketch follows this list):

map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.

Applies func to every element of the RDD on which map is called and returns a new RDD; the returned dataset is distributed.

filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.

Applies func to every element of the RDD on which filter is called and returns an RDD made up of the elements for which func is true.

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

Much like map, but flatMap can produce multiple results per input; func returns a Seq.

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

Much like map, but map works on each element while mapPartitions works on each partition.

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

Much like mapPartitions, but func operates on one split at a time and therefore also receives the partition index.

sample(withReplacement, fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

Sampling.

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

Returns a new dataset containing the union of the elements of the source dataset and the given dataset.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks])

Return a new dataset that contains the distinct elements of the source dataset.

Returns a new dataset containing the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

Returns (K, Seq[V]), i.e. the key and list of values that a Hadoop reduce function would receive.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

Applies the given reduce func to the (K, Seq[V]) produced by groupByKey, e.g. to compute a sum or an average.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

Sorts by key, in ascending or descending order; ascending is a Boolean.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

Given two key-value datasets (K, V) and (K, W), returns a dataset of (K, (V, W)); numTasks is the number of concurrent tasks.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

Given two key-value datasets (K, V) and (K, W), returns a dataset of (K, (Seq[V], Seq[W])); numTasks is the number of concurrent tasks.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

The Cartesian product, i.e. m * n pairs.

pipe(command, [envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
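A minimal spark-shell sketch of a few of the transformations above, assuming an existing SparkContext sc (nums and words are illustrative names only):

val nums = sc.parallelize(1 to 6, 2)                                  // RDD[Int] with 2 partitions
val doubled = nums.map(_ * 2)                                         // 2, 4, 6, 8, 10, 12
val evens = nums.filter(_ % 2 == 0)                                   // 2, 4, 6
val words = sc.parallelize(Seq("a b", "c d")).flatMap(_.split(" "))   // "a", "b", "c", "d"
val perPartitionSums = nums.mapPartitions(it => Iterator(it.sum))     // one sum per partition
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)                // (word, count) pairs
counts.collect()   // nothing is computed until this action runs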

Actions (each operator is followed by its meaning; a short spark-shell sketch follows this list):

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

In short, aggregation; the function takes two arguments and returns one value, and it must be commutative and associative so it can be computed correctly in parallel.

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

Usually used after filter or another operation that leaves a sufficiently small result; collect packs it into an array on the driver.

count()

Return the number of elements in the dataset.

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset; the result is returned to the driver program.

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

saveAsSequenceFile(path)  (Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc.).

saveAsObjectFile(path)  (Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
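A minimal spark-shell sketch of a few of the actions above, again assuming an existing SparkContext sc (data is illustrative):

val nums = sc.parallelize(Seq(3, 1, 4, 1, 5, 9))
nums.reduce(_ + _)        // 23
nums.count()              // 6
nums.first()              // 3
nums.take(3)              // Array(3, 1, 4)
nums.takeOrdered(3)       // Array(1, 1, 3)
sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).countByKey()   // Map(a -> 2, b -> 1)
nums.saveAsTextFile("/tmp/nums-out")   // illustrative output path; fails if it already exists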

4. Using common operators

4.1 keyBy() + groupByKey() + map + foreach + collect => for a table-like RDD, first group by one of the fields, then compute sums, extrema and other aggregates within each group

val seq = Seq((1,2017,11,13,"zhangsan",21), (2,2018,2,12,"lisi",34), (3,2016,10,12,"wangwu",32), (4,2015,11,12,"zhaoliu",65), (5,2016,8,13,"liqi",45))   // every tuple has the same shape: (id, year, month, day, name, age)

val rddx = sc.makeRDD(seq)

// use the month as the key

val rddx1= rddx.keyBy(_._3)

rddx1.groupByKey().map {
  case (k, v) =>
    var sum, count, max = 0
    var min = 100
    v.foreach { x =>               // iterate over every tuple in the group
      sum += x._6                  // sum the 6th field (age)
      if (max < x._4) max = x._4   // max of the 4th field (day)
      if (min > x._4) min = x._4
    }
    count = v.size
    val mean = sum / count.toDouble
    (k, count, sum, mean, min, max)   // the value returned for this key
}.collect().foreach(print)

// add a derived column to rddx and sort by the first field

val withCol = rddx.map { x =>
  (x._1, x._2, x._3, x._4, x._5, x._6, x._6 - x._4)   // new column: age - day
}.sortBy(x => x._1)

withCol.collect().foreach(print)

// or turn it into a DataFrame (requires the SQL implicits, e.g. import spark.implicits._ in spark-shell 2.x)
withCol.toDF("id", "year", "month", "day", "name", "age", "age_day")



4.2 Group a key-value RDD and compute the mean, sum, max, etc.

rdd.groupByKey().map { x => (x._1, x._2.sum / x._2.size.toDouble, x._2.min, x._2.max) }
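A minimal usage sketch for the line above, assuming an RDD of (String, Int) pairs named rdd (data is illustrative):

val rdd = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 10)))
rdd.groupByKey()
   .map { x => (x._1, x._2.sum / x._2.size.toDouble, x._2.min, x._2.max) }
   .collect()
// Array((a,2.0,1,3), (b,10.0,10,10)) -- (key, mean, min, max); ordering may differ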

5. rdd join

 5.1 For an RDD join, if either of the RDDs being joined is empty, the result of the join is also empty.

val conf = new SparkConf().setMaster("local").setAppName("RDDJoinDemo2")  

  val sc = new SparkContext(conf)  

  val flow = sc.parallelize(  

    List(  

      ("001001003001",2,10),  

      ("001001001001",1,20),  

     ("001002001001",1,30)  ) 

  )    

  val greater10 = flow.filter(_._3>10)  

  //println("2 :" + greater10.count())  // 2 :2  

  val eq2 = flow.filter(_._2==2)  

  //println("1 :" +  eq2.count())       // 1 :1  

  val eq3 = flow.filter(_._2==3)  

  //println("0 :" + eq3.count())        // 0 :0  

  

  val a = greater10.map(x=>(x._1,1)).reduceByKey(_+_)  

  println("有2條資料:" + a.count())  

  val c = eq3.map(x=>(x._1,1)).reduceByKey(_+_)  

  println("有0條資料:" + c.count())  

  val f = a.join(c)  

  println("有資料的RDD和無資料的RDD,join之後有0條資料:" + f.count()) 

5.2 Join requirements: both RDDs must be key-value shaped, e.g. (1, 2) or (1, (2, 3)), before they can be joined.

(1,2,3,4) join (1,3,4,5) does not work directly;

the data must first be reshaped into (1,(2,3,4)) join (1,(3,4,5)) before joining, as shown in the sketch below.
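A minimal reshaping sketch, with two illustrative RDDs of flat tuples:

val left  = sc.parallelize(Seq((1, 2, 3, 4), (2, 5, 6, 7)))
val right = sc.parallelize(Seq((1, 3, 4, 5)))
// keep the first field as the key and pack the rest into the value
val leftKV  = left.map(t => (t._1, (t._2, t._3, t._4)))
val rightKV = right.map(t => (t._1, (t._2, t._3, t._4)))
leftKV.join(rightKV).collect()
// Array((1,((2,3,4),(3,4,5))))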


   val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
 
   val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
  
    idName.join(idAge).collect().foreach(println)
    
    idName.leftOuterJoin(idAge).collect().foreach(println)

    idName.rightOuterJoin(idAge).collect().foreach(println)
    
     idName.fullOuterJoin(idAge).collect().foreach(println)

     idName.cogroup(idAge).collect().foreach(println)
     

Comparing fullOuterJoin with cogroup: when the same key appears more than once, join produces the Cartesian product of the matching values, whereas cogroup handles it differently (it collects the values into one group per side).

val idScore = sc.parallelize(Array((1, 100), (2, 90), (2, 95)))

println("\ncogroup, with a duplicate id\n")
/**
 * (1,(CompactBuffer(zhangsan),CompactBuffer(100)))
 * (2,(CompactBuffer(lisi),CompactBuffer(90, 95)))
 * (3,(CompactBuffer(wangwu),CompactBuffer()))
 */
idName.cogroup(idScore).collect().foreach(println)

println("\njoin, with a duplicate id\n")
/**
 * (1,(Some(zhangsan),Some(100)))
 * (2,(Some(lisi),Some(90)))
 * (2,(Some(lisi),Some(95)))
 * (3,(Some(wangwu),None))
 */
idName.fullOuterJoin(idScore).collect().foreach(println)

 val idName1 = sc.parallelize(Array((1, 23,"zhangsan"), (2,12, "lisi"), (3,15, "wangwu")))

  val idAge1 = sc.parallelize(Array((1, 30,165), (2, 29,170), (4, 21,185)))

// keyBy's argument defines the join condition; it turns (1,23,"zhangsan") into ((1,23),(1,23,"zhangsan")) and (1,30,165) into ((1,30),(1,30,165))

val joined = idName1.keyBy(x => (x._1, x._2)).join(idAge1.keyBy(x => (x._1, x._2))).map { x =>
  (x._1, x._2._1._2, x._2._1._3, x._2._2._2, x._2._2._3)
}
joined.collect().foreach(println)
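Note that with the sample data above the composite keys (id, age) never match, so this particular join is empty; a minimal sketch that joins on id alone (illustrative only):

idName1.keyBy(_._1).join(idAge1.keyBy(_._1)).map { x =>
  (x._1, x._2._1._3, x._2._2._2, x._2._2._3)   // (id, name, age, height)
}.collect().foreach(println)
// (1,zhangsan,30,165) and (2,lisi,29,170); ordering may vary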

6. Creating RDDs

val rdd = sc.makeRDD(Seq((1,2017,11),(2,2018,12)))
val rdd = sc.parallelize (Seq((1,2017,11),(2,2018,12))  ,2)

Difference: the latter lets you specify the number of partitions explicitly (2 here), while with the former the partition count defaults to defaultParallelism; the makeRDD overload that takes location preferences is fixed to the size of the seq argument. 1) Declaration of parallelize:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T]

2) Declarations of makeRDD:

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T]

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

3) Detailed differences: A) makeRDD can additionally provide location (preferred-locations) information for the data. B) Both return a ParallelCollectionRDD; parallelize lets you choose the number of partitions, while the makeRDD overload that takes Seq[(T, Seq[String])] fixes the partition count to the size of the seq argument (the simple makeRDD(seq, numSlices) just delegates to parallelize).
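A minimal spark-shell sketch of the resulting partition counts (illustrative):

val seq = Seq((1, 2017, 11), (2, 2018, 12))
sc.parallelize(seq).partitions.size        // defaultParallelism
sc.parallelize(seq, 2).partitions.size     // 2
sc.makeRDD(seq, 2).partitions.size         // 2 (same as parallelize)
sc.makeRDD(Seq((1, Seq("host1")), (2, Seq("host2")))).partitions.size   // 2: one partition per element, with preferred locations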

7. reduceByKey and mapValues

scala> val c = sc.parallelize(List("aaa","b","b","c"))
c: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:21


//map each element to a (word, length) pair
scala> c.map(x => (x,x.length))
res7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:24

scala> res7.first
res8: (String, Int) = (aaa,3)

//map the values, turning each one into a tuple
scala> res7.mapValues(y=>(y,10))
res11: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[10] at mapValues at <console>:26

scala> res11.collect
res12: Array[(String, (Int, Int))] = Array((aaa,(3,10)), (b,(1,10)), (b,(1,10)), (c,(1,10)))

scala> res7.collect
res13: Array[(String, Int)] = Array((aaa,3), (b,1), (b,1), (c,1))

Using reduceByKey with mapValues

val key = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))
Putting it together: key.mapValues(y => (y, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
 
scala> val key = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))
key: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:21

scala> key.collect
res15: Array[(String, Int)] = Array((panda,0), (pink,3), (pirate,3), (panda,1), (pink,4))

scala> val key1 = key.mapValues(y=>(y,1))
key1: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[13] at mapValues at <console>:23

scala> key1.collect
res16: Array[(String, (Int, Int))] = Array((panda,(0,1)), (pink,(3,1)), (pirate,(3,1)), (panda,(1,1)), (pink,(4,1)))

scala> val key2 = key1.reduceByKey((x,y) => (x._1+y._1 , x._2+y._2))
key2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[14] at reduceByKey at <console>:25

scala> key2.collect
res17: Array[(String, (Int, Int))] = Array((pirate,(3,1)), (panda,(1,2)), (pink,(7,2)))
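key2 now holds (sum, count) per key, so a per-key average is one mapValues away (a minimal sketch):

val avg = key2.mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect()   // Array((pirate,3.0), (panda,0.5), (pink,3.5)); ordering may vary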

join combines the values of matching keys; it behaves like an inner join

scala> val aa = sc.parallelize(List(("K1","V1"),("K2","V2"),("K3","V3")))
aa: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[15] at parallelize at <console>:21

scala> val bb = sc.parallelize(List(("K1","V1"),("K2","V2"),("K4","V4")))
bb: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:21

scala> aa.join(bb)
res18: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[19] at join at <console>:26

scala> res18.collect
res19: Array[(String, (String, String))] = Array((K1,(V1,V1)), (K2,(V2,V2)))

//bb join aa collect also works
scala> bb.join(aa) collect
warning: there were 1 feature warning(s); re-run with -feature for details
res20: Array[(String, (String, String))] = Array((K1,(V1,V1)), (K2,(V2,V2)))

//joining an RDD with itself pairs each key with two copies of the same value
scala> bb.join(bb) collect
warning: there were 1 feature warning(s); re-run with -feature for details
res21: Array[(String, (String, String))] = Array((K1,(V1,V1)), (K4,(V4,V4)), (K2,(V2,V2)))

//take() returns the first n elements
scala> bb.take(2)
res23: Array[(String, String)] = Array((K1,V1), (K2,V2))

8. Processing data pulled from Kafka in Spark Streaming

# Tuning

# Set spark.streaming.unpersist to true so that Spark automatically cleans up RDDs it no longer needs, which significantly reduces the memory they occupy.

# streamingContext.remember can be set to keep the data around for longer.

# By default Spark Streaming serializes received data before putting it in memory to reduce memory usage; serialization and deserialization cost CPU, so an efficient serializer (Kryo) and custom serialization make better use of the CPU. spark.rdd.compress can also be enabled to trade CPU time for memory and lower GC overhead.

# Control the amount of data per batch interval: if the batch interval is 1 second but 1 GB of data arrives in that second, make sure the node has at least 1 GB of memory available to Spark Streaming.

# Runtime optimizations (see the config sketch after this list):

1. Choose a sensible batch interval and window size; 500 ms is a reasonable batch interval to start from.

2. Increase parallelism: (1) add more receivers, (2) use DStream.repartition, (3) raise the parallelism of aggregations (e.g. pass a higher parallelism argument to reduceByKey(), reduceByKeyAndWindow(), ...).
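A minimal configuration sketch reflecting the points above (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingTuningSketch")
  .set("spark.streaming.unpersist", "true")                               // auto-clean RDDs that are no longer needed
  .set("spark.rdd.compress", "true")                                      // trade CPU time for memory
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // efficient serialization

val ssc = new StreamingContext(conf, Milliseconds(500))   // 500 ms batch interval
ssc.remember(Seconds(60))                                 // keep batch data for 60 s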

A small Kafka + Spark Streaming + HBase example

val streamRDD = km.createDirectStream(...).map(_._2).map { msgLine =>
  val data = msgLine.split("\\|")
  val id = data(0)
  val name = data(1)
  val age = data(2)
  val price = data(3).toDouble   // parse price as a Double so the mean/max/min below can be computed
  (id, name, age, price)
}
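km above is the author's helper; a minimal sketch of the equivalent setup with the stock spark-streaming-kafka 0.8 direct API (KafkaUtils.createDirectStream), with broker and topic names purely illustrative:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // illustrative brokers
val topics = Set("prices")                                                    // illustrative topic
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)   // DStream[(String, String)] of (key, value), hence the .map(_._2) above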

# updateStateByKey requires checkpointing to persist the received data; when running in cluster mode, the checkpoint directory must be a directory on HDFS

# ssc.checkpoint("hdfs://master01:9000/user/yw/input/checkpointdir")

ssc.checkpoint("checkpointdir")// 用於讀取持久化到記憶體或者磁碟的資料

// processing logic: for each person, compute the mean, max and min price over their records

val resultData = streamRDD.map(x => ((x._1, x._2, x._3), x._4))      // DStream has no keyBy, so build the pair directly: key (id, name, age), value price
  .groupByKey()
  .map { x => (x._1, x._2.sum / x._2.size, x._2.max, x._2.min) }     // (key, mean, max, min)

// write this batch's results to HBase

// required HBase imports
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

resultData.foreachRDD { rdd =>
  rdd.foreachPartition { partitionRecords =>
    // create the HBase config and connection once per partition, not once per record
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop5,hadoop7,hadoop6,hadoop2,hadoop1")
    val conn = ConnectionFactory.createConnection(conf)
    // look up the table created beforehand; hbase shell: create 'historyData','cf'
    val table = conn.getTable(TableName.valueOf("historyData"))
    try {
      partitionRecords.foreach { s =>
        val id = s._1._1
        val name = s._1._2
        val age = s._1._3
        val mean = s._2
        val max = s._3
        val min = s._4
        // store the columns above in HBase: build a Put for the chosen rowkey
        val put: Put = new Put(Bytes.toBytes(id + "_" + mean + "_" + min))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("mean"), Bytes.toBytes(mean))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("max"), Bytes.toBytes(max))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("min"), Bytes.toBytes(min))
        table.put(put)
      }
    } catch {
      case e: Exception => println("Error writing partition to HBase: " + e.getMessage)
    } finally {
      table.close()   // close the table and connection after the partition has been written
      conn.close()
    }
  }
}