
Spark SQL learning (case study: computing daily UV)

 

Requirement: compute the daily UV (unique visitors) from a user access log.
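
For example, in the sample log below, 2017-01-01 is visited by the distinct users 1121, 1122, 1123 and 1124 (UV = 4), while 2017-01-02 is visited only by 1122 (UV = 1).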

package wujiadong_sparkSQL


import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
/**
  * Created by Administrator on 2017/3/6.
  */
object DailyUV {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dailyuv")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val userAccesslog = Array(
      "2017-01-01,1122",
      "2017-01-01,1122",
      "2017-01-01,1123",
      "2017-01-01,1124",
      "2017-01-01,1124",
      "2017-01-02,1122",
      "2017-01-01,1121",
      "2017-01-01,1123",
      "2017-01-01,1123"

    )
    val accessLogRDD = sc.parallelize(userAccesslog, 2)
    //val accessLogRDD = sc.textFile("hdfs://master:9000/student/2016113012/data/userAccesslog.txt")
    // Specify the schema of each field directly via StructType
    val schema = StructType(
      Array(
        StructField("date", StringType, true),
        StructField("userid", IntegerType, true)
      )
    )

    // Map the plain RDD to an RDD of Rows
    val rowRDD = accessLogRDD.map { log =>
      val fields = log.split(",")
      Row(fields(0), fields(1).toInt)
    }
    // Attach the schema to the row RDD, i.e. create the DataFrame
    val df = sqlContext.createDataFrame(rowRDD, schema)
    // The 'colName (Symbol) syntax below relies on the implicit conversions from SQLContext;
    // countDistinct itself comes from org.apache.spark.sql.functions._
    import sqlContext.implicits._
    df.groupBy("date")                      // group by date
      .agg('date, countDistinct('userid))   // count distinct user ids per date; note the 'quote (Symbol) syntax
      .map(row => Row(row(1), row(2)))      // row(0) is the grouping column, so keep the date and the distinct count
      .collect().foreach(println)


    // UV in business terms: a site has many users each day, and each user may visit many
    // times in one day; UV is the access count after de-duplicating by user, i.e. the
    // number of distinct users per day.




  }

}
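
The same aggregation can also be written as a plain SQL query. A minimal sketch, assuming the `sqlContext` and `df` built above (Spark 1.x API); the temp table name `access_log` is chosen here for illustration:

// Register the DataFrame so it can be queried with SQL
df.registerTempTable("access_log")

// COUNT(DISTINCT userid) per date is exactly the UV computed above
sqlContext.sql(
  "SELECT date, COUNT(DISTINCT userid) AS uv FROM access_log GROUP BY date"
).collect().foreach(println)

Compared with the Symbol-based agg call, the SQL form returns only the date and the count, so there is no need to re-map the rows afterwards.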



Run result

[email protected]:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.DailyUV  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar 
17/03/06 21:01:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/06 21:01:53 WARN SparkConf: 
SPARK_CLASSPATH was detected (set to ':/home/hadoop/bigdata/hive/lib/mysql-connector-java-5.1.26-bin.jar').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
        
17/03/06 21:01:53 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/home/hadoop/bigdata/hive/lib/mysql-connector-java-5.1.26-bin.jar' as a work-around.
17/03/06 21:01:53 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/home/hadoop/bigdata/hive/lib/mysql-connector-java-5.1.26-bin.jar' as a work-around.
17/03/06 21:01:55 INFO Slf4jLogger: Slf4jLogger started
17/03/06 21:01:55 INFO Remoting: Starting remoting
17/03/06 21:01:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:57493]
17/03/06 21:01:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/03/06 21:01:58 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[2017-01-01,4]
[2017-01-02,1]
17/03/06 21:02:21 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/03/06 21:02:21 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
17/03/06 21:02:21 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
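
As a cross-check, the same numbers can be computed without Spark SQL at all, using plain RDD operations. A minimal sketch, assuming the same `sc` and `userAccesslog` array as in the code above:

// Parse each line into a (date, userid) pair, drop duplicate pairs,
// then count the remaining distinct users per date
val uv = sc.parallelize(userAccesslog)
  .map(_.split(","))
  .map(fields => (fields(0), fields(1)))
  .distinct()
  .countByKey()

uv.foreach(println) // expected: (2017-01-01,4) and (2017-01-02,1)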