
Lab 4: Basic RDD Programming Practice


Note: Spark decodes text input as UTF-8; files in any other encoding will come out garbled, so make sure the data files are saved as UTF-8.
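If a source file cannot simply be re-saved as UTF-8, one common workaround is to read the raw bytes and decode them yourself instead of using sc.textFile. A minimal sketch, assuming a SparkContext named sc as in the programs below, a GBK-encoded file, and a made-up path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Read (offset, raw text) pairs and decode the bytes explicitly,
// instead of relying on textFile's fixed UTF-8 decoding.
// The path and the GBK charset are assumptions for illustration.
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("C:\\data\\gbk_input.txt")
  .map { case (_, text) => new String(text.getBytes, 0, text.getLength, "GBK") }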

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gao</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>
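With this pom.xml in place, the project can be packaged into a jar with mvn package and submitted through spark-submit; since every program below hard-codes setMaster("local"), they can also be run directly from the IDE while testing.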

1. The file Data01.txt holds the department's student records, one comma-separated record per line in the form name,course,score. Write a Spark standalone application that answers the following:

(1) How many students are there in the department?

(2) How many courses does the department offer?

(3) What is Tom's average score?

(4) How many courses has each student taken?

(5) How many students have taken the DataBase course?

(6) What is the average score of each course?

(7) Use an accumulator to count how many students have taken the DataBase course.
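For reference, a few hypothetical lines in the layout the code below assumes (these are illustrative only, not the real contents of Data01.txt):

Tom,DataBase,26
Tom,Algorithm,12
Jim,DataBase,45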

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object one {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local")
      .setAppName("text1")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("C:\\Users\\Administrator\\Desktop\\Data01.txt")
    
    // (1) Total number of students in the department
    val par = rdd.map(row => row.split(",")(0))
    val count = par.distinct()
    println("Total number of students: " + count.count())

    // (2) Number of courses offered by the department
    val couse = rdd.map(row => row.split(",")(1))
    println("Number of courses: " + couse.distinct().count())

    // (3) Tom's average score: carry a (sum, count) pair through reduceByKey
    val pare = rdd.filter(row => row.split(",")(0) == "Tom")
    pare.map(row => (row.split(",")(0), row.split(",")(2).toInt))
      .mapValues(x => (x, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues(x => x._1.toDouble / x._2)  // divide as Double to avoid integer truncation
      .collect().foreach(x => println("Tom's average score: " + x._2))

    // (4) Number of courses taken by each student
    val pare2 = rdd.map(row => (row.split(",")(0), row.split(",")(1)))
    pare2.mapValues(x => (x, 1))
      .reduceByKey((x, y) => (" ", x._2 + y._2))
      .mapValues(x => x._2)
      .foreach(println)

    // (5) Number of students taking the DataBase course
    val pare3 = rdd.filter(row => row.split(",")(1) == "DataBase")
    println("Number of students taking DataBase: " + pare3.count)

    // (6) Average score of each course
    val pare4 = rdd.map(row => (row.split(",")(1), row.split(",")(2).toInt))
    pare4.mapValues(x => (x, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues(x => x._1.toDouble / x._2)
      .collect().foreach(println)

    // (7) Use an accumulator to count how many students took DataBase
    val pare5 = rdd.filter(row => row.split(",")(1) == "DataBase")
      .map(row => (row.split(",")(1), 1))
    val accum = sc.longAccumulator("My Accumulator")
    pare5.values.foreach(x => accum.add(x))
    println("Number of students who took DataBase: " + accum.value)
  }
}
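For comparison, the per-course average in step (6) can also be written with aggregateByKey, which makes the (sum, count) pair explicit; a sketch reusing the rdd defined above, not part of the original lab code:

// The zero value (0, 0) is a (sum, count) pair; the first function folds one
// score into a partition-local pair, the second merges pairs across partitions.
val courseAvg = rdd
  .map(row => (row.split(",")(1), row.split(",")(2).toInt))
  .aggregateByKey((0, 0))(
    (acc, score) => (acc._1 + score, acc._2 + 1),
    (a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, n) => sum.toDouble / n }
courseAvg.collect().foreach(println)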

2. Given two input files A and B, write a standalone Spark application that merges the two files, removes the duplicate lines, and produces a new file C.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object two
{
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local")
      .setAppName("text2")
    val sc = new SparkContext(conf)
    val dataFile = "C:\\Users\\Administrator\\Desktop\\data"
    val data = sc.textFile(dataFile,2)
    // Pair each non-empty line with a dummy value so identical lines collapse
    // into one key; sorting the keys yields the merged, de-duplicated content.
    val res = data.filter(_.trim().length > 0).map(line => (line.trim, "\t"))
      .partitionBy(new HashPartitioner(1)).groupByKey().sortByKey().keys
    res.saveAsTextFile("result")
  }
}
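The same merge-and-deduplicate result can be had more directly with distinct; a sketch assuming the same input directory as above, with a hypothetical output path so it does not collide with the result directory:

// Reading the whole directory already unions A and B; distinct() drops
// repeated lines, and sortBy keeps the output in a stable order.
val merged = sc.textFile("C:\\Users\\Administrator\\Desktop\\data", 2)
  .map(_.trim).filter(_.nonEmpty)
  .distinct()
  .sortBy(line => line)
merged.saveAsTextFile("result_distinct")  // hypothetical output path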

3. Each input file holds one class's scores for a single subject; every line consists of two fields, the student's name and the student's score. Write a standalone Spark application that computes each student's average score across all the files and writes the results to a new file.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object three {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local")
      .setAppName("text3")
    val sc = new SparkContext(conf)
    val dataFile = "C:\\Users\\Administrator\\Desktop\\data1"
    val data = sc.textFile(dataFile,3)
    val res = data.filter(_.trim().length>0)
      .map(line=>(line.split("\t")(0).trim()
        ,line.split("\t")(1).trim().toInt))
      .partitionBy(new HashPartitioner(1))
      .groupByKey().map(x => {
        // Sum this student's scores across all files and count the records,
        // then round the average to two decimal places.
        var n = 0
        var sum = 0.0
        for (i <- x._2) {
          sum = sum + i
          n = n + 1
        }
        val avg = sum / n
        val format = f"$avg%1.2f".toDouble
        (x._1, format)
      })
    res.saveAsTextFile("result1")
  }
}
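groupByKey ships every individual score across the shuffle before averaging. A sketch of the same computation with reduceByKey, which pre-aggregates (sum, count) pairs on each partition first; it reuses the data RDD from three, and the output path is hypothetical:

// Carry (sum, count) through the shuffle instead of all the raw scores.
val avgByStudent = data.filter(_.trim().nonEmpty)
  .map { line =>
    val fields = line.split("\t")
    (fields(0).trim, (fields(1).trim.toInt, 1))
  }
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, n) => f"${sum.toDouble / n}%1.2f".toDouble }
avgByStudent.saveAsTextFile("result_reduce")  // hypothetical output path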
