
Spark: Finding the Top N Within Each Group


Create the test data source (one "group score" pair per line, separated by a space):

c1 85
c2 77
c3 88
c1 22
c1 66
c3 95
c3 54
c2 91
c2 66
c1 54
c1 65
c2 41
c4 65

Spark Scala implementation:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object GroupTopN1 {
  // On Windows, Hadoop uses hadoop.home.dir to locate winutils.exe.
  System.setProperty("hadoop.home.dir", "D:\\Java_Study\\hadoop-common-2.2.0-bin-master")

  case class Rating(userId: String, rating: Long)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("GroupTopN1")
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .master("local")
      .config("spark.sql.warehouse.dir", "/")
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    // Parse each "group score" line into a Rating.
    val lines = spark.read.textFile("C:\\Users\\Administrator\\Desktop\\group.txt")
    val classScores = lines.map { line =>
      val fields = line.split(" ")
      Rating(fields(0), fields(1).toLong)
    }
    classScores.createOrReplaceTempView("tb_test")

    // Rank rows inside each group, then keep the top 3 in an outer query.
    // Filtering rn with HAVING and no GROUP BY relies on version-specific
    // Spark behavior, so a subquery with WHERE is used instead.
    val df = sql(
      """|select userId, rating, rn
         |from (
         |  select
         |    userId,
         |    rating,
         |    row_number() over (partition by userId order by rating desc) rn
         |  from tb_test
         |) t
         |where rn <= 3
         |""".stripMargin)
    df.show()

    spark.stop()
  }
}
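
The same per-group ranking can also be written with the DataFrame API instead of a SQL string. The following is a minimal sketch, not the original author's code: the object name GroupTopNWindow and the helper topNPerGroup are hypothetical names introduced here, and the test data is built in memory rather than read from group.txt. It should return the same rows as the SQL version, although the order in which groups are printed may differ.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object GroupTopNWindow {
  // Hypothetical helper: keep the n highest orderCol values per groupCol.
  def topNPerGroup(df: DataFrame, groupCol: String, orderCol: String, n: Int): DataFrame = {
    // Window spec equivalent to: partition by groupCol order by orderCol desc
    val w = Window.partitionBy(col(groupCol)).orderBy(col(orderCol).desc)
    df.withColumn("rn", row_number().over(w))
      .where(col("rn") <= n)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupTopNWindow")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Same test data as above, built in memory for the sketch.
    val scores = Seq(
      ("c1", 85L), ("c2", 77L), ("c3", 88L), ("c1", 22L), ("c1", 66L),
      ("c3", 95L), ("c3", 54L), ("c2", 91L), ("c2", 66L), ("c1", 54L),
      ("c1", 65L), ("c2", 41L), ("c4", 65L)
    ).toDF("userId", "rating")

    topNPerGroup(scores, "userId", "rating", 3).show()
    spark.stop()
  }
}
```

Note that row_number() breaks ties arbitrarily; if tied scores should share a rank, rank() or dense_rank() from the same functions package could be used in its place.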

Printed output:

+------+------+---+
|userId|rating| rn|
+------+------+---+
|    c1|    85|  1|
|    c1|    66|  2|
|    c1|    65|  3|
|    c4|    65|  1|
|    c3|    95|  1|
|    c3|    88|  2|
|    c3|    54|  3|
|    c2|    91|  1|
|    c2|    77|  2|
|    c2|    66|  3|
+------+------+---+
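
To cross-check the table above without starting Spark, the same top-3-per-group logic can be sketched with plain Scala collections. This is an illustrative check only: TopNCheck and topN are hypothetical names, and the approach holds all rows in memory, so it suits small test data rather than real workloads.

```scala
object TopNCheck {
  // Group rows by key, sort each group's scores descending, keep the first n.
  def topN(rows: Seq[(String, Long)], n: Int): Map[String, Seq[Long]] =
    rows.groupBy(_._1).map { case (key, group) =>
      key -> group.map(_._2).sorted(Ordering[Long].reverse).take(n)
    }

  def main(args: Array[String]): Unit = {
    // Same test data as the group.txt file above.
    val data = Seq(
      "c1" -> 85L, "c2" -> 77L, "c3" -> 88L, "c1" -> 22L, "c1" -> 66L,
      "c3" -> 95L, "c3" -> 54L, "c2" -> 91L, "c2" -> 66L, "c1" -> 54L,
      "c1" -> 65L, "c2" -> 41L, "c4" -> 65L
    )

    // Print groups in key order, one line per group.
    topN(data, 3).toSeq.sortBy(_._1).foreach { case (key, scores) =>
      println(s"$key: ${scores.mkString(", ")}")
    }
    // Prints:
    // c1: 85, 66, 65
    // c2: 91, 77, 66
    // c3: 95, 88, 54
    // c4: 65
  }
}
```

These values match the rating column of the Spark output for each userId; group c4 has only one row, so it contributes a single entry.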
