
Implementing common Hadoop algorithms in Spark

import org.apache.spark.{SparkConf, SparkContext}

object MRInSpark {
  /**
    * Finding the maximum and minimum value is a classic Hadoop exercise.
    * Implementing it in Spark gives a feel for how the MapReduce idea is
    * expressed with Spark's API.
    */
  def maxMin = {
    val sconf = new SparkConf().setAppName("avgTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(1, 6, 4, 22))
    val max = foo.reduce((a, b) => Math.max(a, b))
    val min = foo.reduce((a, b) => Math.min(a, b))
    print(s"max=$max, min=$min")
  }
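
  /**
    * (Sketch added for illustration, not part of the original post; the method
    * and app names are made up.) The two reduce calls above scan the RDD twice;
    * max and min can also be found in a single pass with aggregate, which
    * mirrors the combiner/reducer split in MapReduce.
    */
  def maxMinOnePass = {
    val sconf = new SparkConf().setAppName("maxMinOnePass").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(1, 6, 4, 22))
    // zero value holds (max so far, min so far)
    val (max, min) = foo.aggregate((Int.MinValue, Int.MaxValue))(
      // within a partition: fold each element into the running pair
      (acc, v) => (Math.max(acc._1, v), Math.min(acc._2, v)),
      // across partitions: merge the partial pairs
      (a, b) => (Math.max(a._1, b._1), Math.min(a._2, b._2))
    )
    print(s"max=$max, min=$min")
  }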

  /**
    * Averages per key.
    * Computing the average value for each key is another common case. In Spark
    * this kind of problem is usually handled with combineByKey (see the Spark
    * API docs for the details); the code below shows how:
    */
  def avg = {
    val sconf = new SparkConf().setAppName("avgTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))
    val result = foo.combineByKey(
      // for the first value of a key in a partition, create a (sum, count) pair; the 1 counts this occurrence
      (v) => (v, 1),
      // acc is the (sum, count) pair built so far; further values of the same key are folded into it
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      // merge the partial (sum, count) pairs from different partitions
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map { case (k, v) => (k, v._1 / v._2.toDouble) }
    result.collect().foreach(println)
  }
  
  def avgTwo = {
    val sconf = new SparkConf().setAppName("avgTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))
    // groupByKey collects every value of a key, then the average is computed from the full list
    val result = foo.groupByKey().map { case (k, vs) => (k, vs.sum / vs.size.toDouble) }
    result.collect().foreach(println)
  }
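
  /**
    * (Sketch added for illustration, not part of the original post; the method
    * name is made up.) The same per-key average can be written with reduceByKey
    * by first mapping each value to a (sum, count) pair; like combineByKey, this
    * pre-aggregates on the map side instead of shuffling raw values as
    * groupByKey does.
    */
  def avgReduceByKey = {
    val sconf = new SparkConf().setAppName("avgTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))
    val result = foo
      .mapValues(v => (v, 1))                                  // value -> (sum, count)
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))       // merge pairs per key
      .mapValues { case (sum, count) => sum / count.toDouble } // final average
    result.collect().foreach(println)
  }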

  /**
    * Top N is another classic case for the MapReduce mindset in Hadoop.
    * In Spark it can be solved quite concisely:
    */
  def topn = {
    val sconf = new SparkConf().setAppName("avgTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("a", 2), ("b", 1), ("b", 4), ("a", 4), ("b", 2)))
    val groupSort = foo.groupByKey().map {
      case (k, values) =>
        // n is 2 here
        val sortValues = values.toList.sortWith(_ > _).take(2)
        (k, sortValues)
    }
    groupSort.flatMap { case (k, vs) => vs.map(k -> _) }.collect().foreach(println)
  }
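
  /**
    * (Sketch added for illustration, not part of the original post; the method
    * name is made up.) groupByKey above materialises every value of a key before
    * sorting; aggregateByKey can keep only the current top n per key while
    * aggregating, much like a MapReduce combiner would.
    */
  def topnAggregate = {
    val sconf = new SparkConf().setAppName("topnTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("a", 2), ("b", 1), ("b", 4), ("a", 4), ("b", 2)))
    val n = 2
    val topPerKey = foo.aggregateByKey(List.empty[Int])(
      // within a partition: add the value and keep only the n largest
      (acc, v) => (v :: acc).sortWith(_ > _).take(n),
      // across partitions: merge the candidate lists and keep the n largest
      (a, b) => (a ++ b).sortWith(_ > _).take(n)
    )
    topPerKey.collect().foreach(println)
  }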

  def main(args: Array[String]): Unit = {
    // pick the routine to run
    avg
  }

}