1. 程式人生 > >pvuv的程式碼開發及提交spark程式jar包執行讀取資料來源並將結果寫入MySQL中

pvuv的程式碼開發及提交spark程式jar包執行讀取資料來源並將結果寫入MySQL中

目錄


PvUvToMysql類

package com.fuyun.bigdate.spark

import java.sql.{Connection, PreparedStatement}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 統計PVUV將結果儲存到MySQL中.
  */
object PvUvToMysql {

  def main(args: Array[String]): Unit = {
    /**
      * 初始化
      */
    // 例項化sparkconf並設定程式執行名稱
    val sparkConf = new SparkConf().setAppName("pvuv")
    //  例項化sparkcontext並將sparkconf傳入
    val sc = new SparkContext(sparkConf)
    // 設定日誌級別
    sc.setLogLevel("WARN")
    /**
      * 輸入
      */
    //  讀取輸入檔案的內容,並設定分割槽數為4
    val inputRdd = sc.textFile("hdfs://192.168.xxx.xx/datas/2015082818.data",4)
    //  列印檔案內容的第一行
    println(s"first = ${inputRdd.first()}")
    //  統計檔案的行數
    println(s"count = ${inputRdd.count()}")
    /**
      * 轉換
      */
    // 先將輸入的每行去除首尾空格,再保留長度大於0的元素
    val filterRdd = inputRdd.filter(_.trim.length > 0)
      .map(line => {
        //  將每行資料按"\t"分割
        val arr = line.split("\t")
        // 返回date、url、guid
        (arr(17).substring(0, 10),arr(1), arr(5))
      })
    // 快取
    filterRdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
    // pv
    val pvRdd = filterRdd.map{
      //  取出date、url
      case (date, url, guid) => (date, url)
    }
      // 先將每個元素去除首尾空格,然後只保留長度大於0的元素
      .filter(_._2.trim.length > 0)
      // 返回(date,1)的元組
      .map(tuple => (tuple._1, 1))
      // 按日期分組求和統計每天的pv數
      .reduceByKey(_ + _)
    println("====================pv output======================")
    // foreachPartition比foreach更優化
    pvRdd.foreachPartition(iter => iter.foreach(println))

    // uv
    // 先得到(date, guid)
    val uvRdd = filterRdd.map{case (date, url, guid) => (date, guid)}
      // 去重
        .distinct()
      // 先將元素去除首尾空格再保留長度大於0 的元素
        .filter(_._2.trim.length > 0)
      // 構造(date, 1)的元組
        .map(tuple => (tuple._1, 1))
      // 按date分組再根據value求和統計每天的uv數
        .reduceByKey(_ + _)
    println("======================uv output========================")
    uvRdd.foreachPartition(iter => iter.foreach(println))

    /**
      * union
      *   def union[T: ClassTag](first: RDD[T], rest: RDD[T]*)
      *   注意:union合併時要求兩個rdd的型別必須一致
      */
    val unionRdd = pvRdd.union(uvRdd)
    unionRdd.foreach(println)
    /**
      * join
      *  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
      *  注意:join時要求所有的RDD必須為二元組型別的rdd
      *
      */
    val joinRdd = pvRdd.join(uvRdd)
    joinRdd.foreach{case (date, (pv, uv)) => println(s"${date}\t${pv}\t${uv}")}

    /**
      * 輸出
      */
    // 合併後設置分割槽數為1
    joinRdd.coalesce(1).foreachPartition(iter => {
      // conn初始化為null
      var conn:Connection = null
      // 初始化ps_create為null
      var ps_create:PreparedStatement =null
      // 初始化ps_tru為null
      var ps_tru:PreparedStatement =null
      // 初始化ps_insert為null
      var ps_insert:PreparedStatement =null
      try{
        // 得到connection
        conn = ConnectionUtils.getConnection
        // 如果表不存在,則建立表
        val sql_create = "create table if not exists pvuv_tb (date varchar(66), pv int(10), uv int(10))"
        ps_create = conn.prepareStatement(sql_create)
        ps_create.execute()
        // 清空表格內容
        /*val sql_tru = "truncate pvuv_tb"
        ps_tru = conn.prepareStatement(sql_tru)
        ps_tru.execute()*/
        // 插入資料
        val sql_insert = "insert into pvuv_tb values (?, ?, ?)"
        ps_insert = conn.prepareStatement(sql_insert)
        iter.foreach{
          case (date, (pv, uv)) => println(s"${date}\t${pv}\t${uv}")
            ps_insert.setString(1, date) // 將date新增到第一個?
            ps_insert.setInt(2, uv) // 將pv新增到第二個?
            ps_insert.setInt(3, uv) // 將uv新增到第三個?
            ps_insert.executeUpdate() // 提交SQL語句
        }
      }catch{
        case e:Exception => println("MySQL Exception")
      }finally{
        // 關流
        if (ps_create != null) {
          ps_create.close()
        }
        if (ps_tru != null) {
          ps_tru.close()
        }
        if (ps_insert != null) {
          ps_insert.close()
        }
        ConnectionUtils.closeConnection(conn)
      }
    })

    // 關閉快取
    filterRdd.unpersist()
    /**
      * 關閉資源
      */
    // 設定執行緒睡眠時間
    Thread.sleep(10000000000L)
    sc.stop()
  }
}


ConnectionUtils類

package com.fuyun.bigdate.spark

import java.io.InputStream
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

/**
  * Created by lenovo on 2018/9/26.
  */
object ConnectionUtils {
  // 讀取properties檔案
  val inputStream:InputStream = ConnectionUtils.
getClass.getClassLoader.getResourceAsStream("jdbc.properties") // 例項化properties物件 val prop = new Properties() // 載入properties檔案 prop.load(inputStream) // 載入driver.class.name Class.forName(prop.getProperty("driver.class.name")) // 載入mysql.url val url = prop.getProperty("mysql.url") // 載入mysql.user
val username = prop.getProperty("mysql.user") // 載入mysql.password val password = prop.getProperty("mysql.password") def getConnection:Connection = { DriverManager.getConnection(url, username, password) } def closeConnection(conn: Connection){ if (conn != null) { conn.close() } } }

jdbc.properties檔案

此檔案在main中的resources中建立,方便以後修改此檔案而實現不同的需求

driver.class.name=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://192.168.xxx.xx:3306/spark_db
mysql.user=root
mysql.password=123456

在IDEA中打jar包的兩種方式

IDEA打jar包

idea打包java可執行jar包

1,在專案上滑鼠右鍵 --> Open Module Settings
在這裡插入圖片描述
2, Artifacts --> + --> JAR --> From modules with dependencies…
在這裡插入圖片描述
3, Main Class是你這個專案(指令碼)的主方法,就是要執行的類,選一個
在這裡插入圖片描述
4,如下圖,設定 META-INF/MANIFEST.MF
在這裡插入圖片描述
5,選中你這個專案的根目錄,一定要放在根目錄下
在這裡插入圖片描述
6,設定完是這樣子的,關於 JAR files from libraries的兩個選項:

選中第一個的話,打完包後是一個jar包

選中第二個的話,打完包後是一個jar包,外帶你專案所用的jar包,個人推薦第二個
在這裡插入圖片描述
7,設定完後就可以點OK了

8,這個頁面, Build on make打上勾,其他的不一樣也沒事
在這裡插入圖片描述

9,最後一步, Build Artifacts… --> XXX.jar --> Build
在這裡插入圖片描述
在這裡插入圖片描述

10,複製這裡的路徑去找jar包就行
在這裡插入圖片描述

IDEA中maven方式打jar包

  1. 先清除,在右側的Maven Projects-> Lifecycle -> clean
    在這裡插入圖片描述
  2. 編譯專案,選單欄Build -> Make Project
    在這裡插入圖片描述
  3. 打jar包,在右側的Maven Projects-> Lifecycle -> package
    在這裡插入圖片描述
  4. 然後到左側專案的target中尋找jar包
    在這裡插入圖片描述

提交spark程式jar包執行

先將jar包放入linux的/opt/datas/目錄

將mysql-connector-java-5.1.27-bin.jar放入/opt/cdh-5.7.6/spark-2.2.0-cdh5.7.6/jars/目錄中

引數解釋

--master 設定模式:本地、叢集、yarn
--class  包含main方法的driver類的地址
--driver-memory  設定driver的記憶體
--executor-memory  設定executor的記憶體
--executor-cores  設定每個executor的核心數
--total-executor-cores  設定executor的總核心數
又上兩個引數可以算出executor的個數total-executor-cores/executor-cores
--num-executors  設定executor的個數(只有在yarn上執行才有這個引數)

本地模式執行

bin/spark-submit
–master local[2]
–class com.fuyun.bigdate.spark.PvUvToMysql
–driver-memory 512M
–executor-memory 512M
–executor-cores 1
–total-executor-cores 2
/opt/datas/sparkLearning.jar
在這裡插入圖片描述

在這裡插入圖片描述
在這裡插入圖片描述

叢集上執行

叢集執行
bin/spark-submit
–master spark://192.168.xxx.xx:7077
–class com.fuyun.bigdate.spark.PvUvToMysql
–driver-memory 512M
–executor-memory 512M
–executor-cores 1
–total-executor-cores 2
/opt/datas/sparkLearning.jar
在這裡插入圖片描述

在這裡插入圖片描述
在這裡插入圖片描述

yarn上執行

yarn上執行
bin/spark-submit
–master yarn
–class com.fuyun.bigdate.spark.PvUvToMysql
–driver-memory 512M
–executor-memory 512M
–num-executors 2
/opt/datas/sparkLearning.jar
在這裡插入圖片描述

在這裡插入圖片描述

在這裡插入圖片描述