1. 程式人生 > 《Spark商業案例與效能調優實戰100課》第3課：商業案例之通過RDD分析大資料電影點評系統各種類型的最喜愛電影TopN及效能優化技巧

《Spark商業案例與效能調優實戰100課》第3課：商業案例之通過RDD分析大資料電影點評系統各種類型的最喜愛電影TopN及效能優化技巧

《Spark商業案例與效能調優實戰100課》第3課：商業案例之通過RDD分析大資料電影點評系統各種類型的最喜愛電影TopN及效能優化技巧

原始碼

package com.dt.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
 * RDD-based analysis of a MovieLens-style rating data set.
 *
 * Reads `::`-separated text files from a data directory and prints several reports to stdout.
 * Record layouts as consumed by the code below:
 *   - users.dat       UserID::Gender::Age::OccupationID::Zip-code
 *   - ratings.dat     UserID::MovieID::Rating::Timestamp
 *   - occupation.dat  OccupationID::OccupationName
 */
object Movie_Users_Analyzer {

  /** Number of rows printed by each Top-N report. */
  private val TopN = 10

  /**
   * Entry point.
   *
   * @param args optional positional arguments:
   *             args(0) = Spark master URL (default "local[4]");
   *             args(1) = data directory including a trailing separator
   *                       (default "data/movielens/medium/").
   */
  def main(args: Array[String]): Unit = {
    // The two optional arguments are independent. The original `if / else if` never assigned
    // the data path when both were supplied, because the first branch always won.
    val masterUrl = if (args.length > 0) args(0) else "local[4]"
    val dataPath = if (args.length > 1) args(1) else "data/movielens/medium/"

    val sc = new SparkContext(
      new SparkConf().setMaster(masterUrl).setAppName("Movie_Users_Analyzer"))
    try {
      val usersRDD = sc.textFile(dataPath + "users.dat")
      val occupationsRDD = sc.textFile(dataPath + "occupation.dat")
      val ratingsRDD = sc.textFile(dataPath + "ratings.dat")

      // Shared, self-contained function values. They capture nothing from the enclosing scope,
      // so they serialize cleanly into Spark tasks.
      // Combiner for (totalScore, ratingCount) pairs.
      val addScoreAndCount = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
      // (MovieID, (totalScore, ratingCount)) => (averageRating, MovieID)
      val toAverage = (entry: (String, (Int, Int))) =>
        (entry._2._1.toDouble / entry._2._2, entry._1)

      // --- Report 1: every user joined with the name of their occupation ---------------------
      // (OccupationID, (UserID, Gender, Age))
      val usersBasic = usersRDD
        .map(_.split("::"))
        .map(user => (user(3), (user(0), user(1), user(2))))
      // (OccupationID, OccupationName)
      val occupations = occupationsRDD.map(_.split("::")).map(job => (job(0), job(1)))
      // (OccupationID, ((UserID, Gender, Age), OccupationName)); cached because report 2 reuses it.
      val userInformation = usersBasic.join(occupations)
      userInformation.cache()
      userInformation.collect().foreach(println)

      // --- Report 2: information about every user who rated one specific movie --------------
      val targetMovieId = "1193"
      // (UserID, MovieID) for each rating of the target movie.
      val targetMovie = ratingsRDD
        .map(_.split("::"))
        .map(x => (x(0), x(1)))
        .filter(_._2 == targetMovieId)
      // Re-key by UserID: (UserID, ((UserID, Gender, Age), OccupationName)).
      val targetUsers = userInformation.map(x => (x._2._1._1, x._2))
      targetMovie.join(targetUsers).collect().foreach(println)

      // (UserID, MovieID, Rating); cached because reports 3-5 all start from it.
      val ratings = ratingsRDD.map(_.split("::")).map(x => (x(0), x(1), x(2))).cache()

      // --- Report 3: Top-N movies by average rating, printed as (averageRating, MovieID) -----
      // `top` keeps at most N candidates per partition instead of shuffling and fully sorting
      // the whole data set as `sortByKey(false).take(N)` does.
      ratings
        .map(x => (x._2, (x._3.toInt, 1)))
        .reduceByKey(addScoreAndCount)
        .map(toAverage)
        .top(TopN)
        .foreach(println)

      // --- Report 4: Top-N most-rated movies, printed as (ratingCount, MovieID) --------------
      // Keyed by MovieID (x._2). The original keyed by x._1 (UserID) and therefore ranked users,
      // not movies. Printed with println so each entry lands on its own line.
      ratings
        .map(x => (x._2, 1))
        .reduceByKey(_ + _)
        .map(_.swap)
        .top(TopN)
        .foreach(println)

      // --- Report 5: Top-N movies by average rating per gender, as (MovieID, averageRating) -
      val male = "M"
      val female = "F"
      // (UserID, ((UserID, MovieID, Rating), Gender))
      val genderRatings = ratings
        .map(x => (x._1, (x._1, x._2, x._3)))
        .join(usersRDD.map(_.split("::")).map(x => (x(0), x(1))))
        .cache()
      genderRatings.take(10).foreach(println) // a few sample rows for inspection

      // (UserID, MovieID, Rating) restricted to one gender.
      val maleRatings = genderRatings.filter(_._2._2 == male).map(_._2._1)
      val femaleRatings = genderRatings.filter(_._2._2 == female).map(_._2._1)

      // Male ranking first, then female.
      for (ratingsOfGender <- Seq(maleRatings, femaleRatings)) {
        ratingsOfGender
          .map(x => (x._2, (x._3.toInt, 1)))
          .reduceByKey(addScoreAndCount)
          .map(toAverage)
          .top(TopN)
          .map(_.swap)
          .foreach(println)
      }
    } finally {
      // Release the SparkContext even if a report fails (e.g. a missing input file).
      sc.stop()
    }
  }
}