1. 程式人生 > >12.Spark SQL:開窗函式以及top3銷售額統計案例實戰

12.Spark SQL:開窗函式以及top3銷售額統計案例實戰

Spark 1.4.x版本以後,為Spark SQL和DataFrame引入了開窗函式,比如最經典,最常用的,row_number(),可以讓我們實現分組取topn的邏輯。

案例:統計每個種類的銷售額排名前3的產品

java版本
package cn.spark.study.sql;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;


/**
 * 84講,row_number()開窗函式實戰
 * @author leizq120310
 *
 */


public class RowNumberWindowFunction {


	public static void main(String[] args) {
		// 建立SparkConf,叢集執行
		SparkConf conf = new SparkConf()
				.setAppName("RowNumberWindowFunction");
		
		// 建立JavaSparkContext
		JavaSparkContext sc = new JavaSparkContext(conf);
		HiveContext hiveContext = new HiveContext(sc.sc());
		
		// 建立銷售額表,sales表
		hiveContext.sql("DROP TABLE IF EXISTS sales");
		hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
				+ "product STRING,"
				+ "category STRING, "
				+ "revenue BIGINT)");
		hiveContext.sql("LOAD DATA "
				+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "
				+ "INTO TABLE sales");
		
		// 開始編寫我們的統計邏輯,使用row_number()開窗函式
		// 先說明一下,row_number()開窗函式的作用
		// 其實,就是給每個分組的數所在,按照其排序順序,打上一個分組內的行號
		// 比如說,有一個分組date=20151001, 裡面有3條資料,1122,1121,1124,
		// 那麼對這個分組的每一行使用row_number()開窗函式以後,三行,依次會獲得一個組內的行號
		// 行號從1開始遞增,比如1122 1, 1121 2, 1124, 3
		DataFrame top3SaleDF = hiveContext.sql(""
				+ "SELECT product, category,revenue "
				+ "FROM ("
					+ "SELECT "
						+ "product, "
						+ "category, "
						+ "revenue, "
						// row_number()開窗函式的語法說明
						// 首先可以,在SELECT查詢時,使用row_number()函式
						// 其次,row_number()函式後面先跟上OVER關鍵字
						// 然後括號中,是PARTITION BY,也就是說根據哪個欄位進行分組
						// 其次是可以用ORDER BY 進行組內排序
						// 然後row_number()就可以給每個組內的行,一個組內行號
						+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
						+ "FROM sales "
				+ ") tmp_sales "
				+ "WHERE rank<=3");
		// 將每組排名前3的資料,儲存到一個表中
		hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
		top3SaleDF.saveAsTable("top3_sales");
		
		// 關閉JavaSparkContext
		sc.close();
	}
}
scala版本:
package cn.spark.study.sql


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;


/**
 * 84講,row_number()開窗函式實戰
 * @author leizq120310
 *
 */


object RowNumberWindowFunction {
  def main(args:Array[String])
  {
    // 建立SparkConf,叢集執行
		val conf = new SparkConf()
				.setAppName("RowNumberWindowFunction");
		
		// 建立JavaSparkContext
		val sc = new JavaSparkContext(conf);
		val hiveContext = new HiveContext(sc);
		
		// 建立銷售額表,sales表
		hiveContext.sql("DROP TABLE IF EXISTS sales");
		hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
				+ "product STRING,"
				+ "category STRING, "
				+ "revenue BIGINT)");
		hiveContext.sql("LOAD DATA "
				+ "LOCAL INPATH '/usr/local/spark-study/resources/sales.txt' "
				+ "INTO TABLE sales");
		
		// 開始編寫我們的統計邏輯,使用row_number()開窗函式
		// 先說明一下,row_number()開窗函式的作用
		// 其實,就是給每個分組的數所在,按照其排序順序,打上一個分組內的行號
		// 比如說,有一個分組date=20151001, 裡面有3條資料,1122,1121,1124,
		// 那麼對這個分組的每一行使用row_number()開窗函式以後,三行,依次會獲得一個組內的行號
		// 行號從1開始遞增,比如1122 1, 1121 2, 1124, 3
		val top3SaleDF = hiveContext.sql(""
				+ "SELECT product, category,revenue "
				+ "FROM ("
					+ "SELECT "
						+ "product, "
						+ "category, "
						+ "revenue, "
						// row_number()開窗函式的語法說明
						// 首先可以,在SELECT查詢時,使用row_number()函式
						// 其次,row_number()函式後面先跟上OVER關鍵字
						// 然後括號中,是PARTITION BY,也就是說根據哪個欄位進行分組
						// 其次是可以用ORDER BY 進行組內排序
						// 然後row_number()就可以給每個組內的行,一個組內行號
						+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
						+ "FROM sales "
				+ ") tmp_sales "
				+ "WHERE rank<=3");
		// 將每組排名前3的資料,儲存到一個表中
		hiveContext.sql("DROP TABLE IF EXISTS top3_sales");
		top3SaleDF.saveAsTable("top3_sales");
		
		// 關閉JavaSparkContext
		sc.close();
  }
}