pvuv的程式碼開發及提交spark程式jar包執行讀取資料來源並將結果寫入MySQL中
目錄
PvUvToMysql類
package com.fuyun.bigdate.spark import java.sql.{Connection, PreparedStatement} import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * 統計PVUV將結果儲存到MySQL中. */ object PvUvToMysql { def main(args: Array[String]): Unit = { /** * 初始化 */ // 例項化sparkconf並設定程式執行名稱 val sparkConf = new SparkConf().setAppName("pvuv") // 例項化sparkcontext並將sparkconf傳入 val sc = new SparkContext(sparkConf) // 設定日誌級別 sc.setLogLevel("WARN") /** * 輸入 */ // 讀取輸入檔案的內容,並設定分割槽數為4 val inputRdd = sc.textFile("hdfs://192.168.xxx.xx/datas/2015082818.data",4) // 列印檔案內容的第一行 println(s"first = ${inputRdd.first()}") // 統計檔案的行數 println(s"count = ${inputRdd.count()}") /** * 轉換 */ // 先將輸入的每行去除首尾空格,再保留長度大於0的元素 val filterRdd = inputRdd.filter(_.trim.length > 0) .map(line => { // 將每行資料按"\t"分割 val arr = line.split("\t") // 返回date、url、guid (arr(17).substring(0, 10),arr(1), arr(5)) }) // 快取 filterRdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2) // pv val pvRdd = filterRdd.map{ // 取出date、url case (date, url, guid) => (date, url) } // 先將每個元素去除首尾空格,然後只保留長度大於0的元素 .filter(_._2.trim.length > 0) // 返回(date,1)的元組 .map(tuple => (tuple._1, 1)) // 按日期分組求和統計每天的pv數 .reduceByKey(_ + _) println("====================pv output======================") // foreachPartition比foreach更優化 pvRdd.foreachPartition(iter => iter.foreach(println)) // uv // 先得到(date, guid) val uvRdd = filterRdd.map{case (date, url, guid) => (date, guid)} // 去重 .distinct() // 先將元素去除首尾空格再保留長度大於0 的元素 .filter(_._2.trim.length > 0) // 構造(date, 1)的元組 .map(tuple => (tuple._1, 1)) // 按date分組再根據value求和統計每天的uv數 .reduceByKey(_ + _) println("======================uv output========================") uvRdd.foreachPartition(iter => iter.foreach(println)) /** * union * def union[T: ClassTag](first: RDD[T], rest: RDD[T]*) * 注意:union合併時要求兩個rdd的型別必須一致 */ val unionRdd = pvRdd.union(uvRdd) unionRdd.foreach(println) /** * join * def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] * 注意:join時要求所有的RDD必須為二元組型別的rdd * */ val joinRdd = pvRdd.join(uvRdd) joinRdd.foreach{case (date, (pv, uv)) => println(s"${date}\t${pv}\t${uv}")} /** * 輸出 */ // 合併後設置分割槽數為1 joinRdd.coalesce(1).foreachPartition(iter => { // conn初始化為null var conn:Connection = null // 初始化ps_create為null var ps_create:PreparedStatement =null // 初始化ps_tru為null var ps_tru:PreparedStatement =null // 初始化ps_insert為null var ps_insert:PreparedStatement =null try{ // 得到connection conn = ConnectionUtils.getConnection // 如果表不存在,則建立表 val sql_create = "create table if not exists pvuv_tb (date varchar(66), pv int(10), uv int(10))" ps_create = conn.prepareStatement(sql_create) ps_create.execute() // 清空表格內容 /*val sql_tru = "truncate pvuv_tb" ps_tru = conn.prepareStatement(sql_tru) ps_tru.execute()*/ // 插入資料 val sql_insert = "insert into pvuv_tb values (?, ?, ?)" ps_insert = conn.prepareStatement(sql_insert) iter.foreach{ case (date, (pv, uv)) => println(s"${date}\t${pv}\t${uv}") ps_insert.setString(1, date) // 將date新增到第一個? ps_insert.setInt(2, uv) // 將pv新增到第二個? ps_insert.setInt(3, uv) // 將uv新增到第三個? ps_insert.executeUpdate() // 提交SQL語句 } }catch{ case e:Exception => println("MySQL Exception") }finally{ // 關流 if (ps_create != null) { ps_create.close() } if (ps_tru != null) { ps_tru.close() } if (ps_insert != null) { ps_insert.close() } ConnectionUtils.closeConnection(conn) } }) // 關閉快取 filterRdd.unpersist() /** * 關閉資源 */ // 設定執行緒睡眠時間 Thread.sleep(10000000000L) sc.stop() } }
ConnectionUtils類
package com.fuyun.bigdate.spark
import java.io.InputStream
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
/**
* Created by lenovo on 2018/9/26.
*/
object ConnectionUtils {
// 讀取properties檔案
val inputStream:InputStream = ConnectionUtils. getClass.getClassLoader.getResourceAsStream("jdbc.properties")
// 例項化properties物件
val prop = new Properties()
// 載入properties檔案
prop.load(inputStream)
// 載入driver.class.name
Class.forName(prop.getProperty("driver.class.name"))
// 載入mysql.url
val url = prop.getProperty("mysql.url")
// 載入mysql.user
val username = prop.getProperty("mysql.user")
// 載入mysql.password
val password = prop.getProperty("mysql.password")
def getConnection:Connection = {
DriverManager.getConnection(url, username, password)
}
def closeConnection(conn: Connection){
if (conn != null) {
conn.close()
}
}
}
jdbc.properties檔案
此檔案在main中的resources中建立,方便以後修改此檔案而實現不同的需求
driver.class.name=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://192.168.xxx.xx:3306/spark_db
mysql.user=root
mysql.password=123456
在IDEA中打jar包的兩種方式
IDEA打jar包
1,在專案上滑鼠右鍵 --> Open Module Settings
2, Artifacts --> + --> JAR --> From modules with dependencies…
3, Main Class是你這個專案(指令碼)的主方法,就是要執行的類,選一個
4,如下圖,設定 META-INF/MANIFEST.MF
5,選中你這個專案的根目錄,一定要放在根目錄下
6,設定完是這樣子的,關於 JAR files from libraries的兩個選項:
選中第一個的話,打完包後是一個jar包
選中第二個的話,打完包後是一個jar包,外帶你專案所用的jar包,個人推薦第二個
7,設定完後就可以點OK了
8,這個頁面, Build on make打上勾,其他的不一樣也沒事
9,最後一步, Build Artifacts… --> XXX.jar --> Build
10,複製這裡的路徑去找jar包就行
IDEA中maven方式打jar包
- 先清除,在右側的Maven Projects-> Lifecycle -> clean
- 編譯專案,選單欄Build -> Make Project
- 打jar包,在右側的Maven Projects-> Lifecycle -> package
- 然後到左側專案的target中尋找jar包
提交spark程式jar包執行
先將jar包放入linux的/opt/datas/目錄
將mysql-connector-java-5.1.27-bin.jar放入/opt/cdh-5.7.6/spark-2.2.0-cdh5.7.6/jars/目錄中
引數解釋
--master 設定模式:本地、叢集、yarn
--class 包含main方法的driver類的地址
--driver-memory 設定driver的記憶體
--executor-memory 設定executor的記憶體
--executor-cores 設定每個executor的核心數
--total-executor-cores 設定executor的總核心數
又上兩個引數可以算出executor的個數total-executor-cores/executor-cores
--num-executors 設定executor的個數(只有在yarn上執行才有這個引數)
本地模式執行
bin/spark-submit
–master local[2]
–class com.fuyun.bigdate.spark.PvUvToMysql
–driver-memory 512M
–executor-memory 512M
–executor-cores 1
–total-executor-cores 2
/opt/datas/sparkLearning.jar
叢集上執行
叢集執行
bin/spark-submit
–master spark://192.168.xxx.xx:7077
–class com.fuyun.bigdate.spark.PvUvToMysql
–driver-memory 512M
–executor-memory 512M
–executor-cores 1
–total-executor-cores 2
/opt/datas/sparkLearning.jar
yarn上執行
yarn上執行
bin/spark-submit
–master yarn
–class com.fuyun.bigdate.spark.PvUvToMysql
–driver-memory 512M
–executor-memory 512M
–num-executors 2
/opt/datas/sparkLearning.jar