Environment used in this article

  1. CentOS server
  2. Jupyter + Scala (spylon-kernel)
  3. spark-2.4.0
  4. scala-2.11.12
  5. hadoop-2.6.0

What this article covers

  • Reading Hive table data with Spark: querying Hive tables directly with SQL, reading Hive tables through their underlying HDFS files, and reading Hive partitioned tables.
  • Initializing the SparkSession from a Jupyter cell.
  • A complete, runnable example of extracting HDFS files with Spark at the end of the article.

Jupyter configuration

  • We can initialize the Spark session directly in a Jupyter cell (spylon-kernel's %%init_spark magic); see the example below.
  %%init_spark
  launcher.master = "local[*]"
  launcher.conf.spark.app.name = "BDP-xw"
  launcher.conf.spark.driver.cores = 2
  launcher.conf.spark.num_executors = 3
  launcher.conf.spark.executor.cores = 4
  launcher.conf.spark.driver.memory = '4g'
  launcher.conf.spark.executor.memory = '4g'
  // launcher.conf.spark.serializer = "org.apache.spark.serializer.KryoSerializer"
  // launcher.conf.spark.kryoserializer.buffer.max = '4g'
  import org.apache.spark.sql.SparkSession
  var NumExecutors = spark.conf.getOption("spark.num_executors").repr
  var ExecutorMemory = spark.conf.getOption("spark.executor.memory").repr
  var AppName = spark.conf.getOption("spark.app.name").repr
  var max_buffer = spark.conf.getOption("spark.kryoserializer.buffer.max").repr
  println(f"Config as follows: \nNumExecutors: $NumExecutors, \nExecutorMemory: $ExecutorMemory, \nAppName: $AppName,\nmax_buffer: $max_buffer")

  • We can also inspect the configuration of the SparkSession we just initialized, as sketched below.
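  A minimal sketch (assuming the spylon-kernel has already bound spark and sc in the notebook):
  // dump every option that is set on the running SparkSession
  spark.conf.getAll.foreach { case (k, v) => println(s"$k = $v") }
  // the same information is also available from the SparkContext
  sc.getConf.getAll.sortBy(_._1).foreach { case (k, v) => println(s"$k = $v") }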

Reading data from Hive

Querying with Spark SQL directly
  import org.apache.spark.sql.SparkSession
  import spark.sql  // make sql(...) available if the kernel has not already imported it
  val sql_1 = """select * from tbs limit 4 """
  var df = sql(sql_1)
  df.show(5, false)
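  If the Hive table is partitioned, the same SQL route applies and Spark prunes partitions for you; a minimal sketch, where the table name tb_part and its partition column dt are hypothetical:
  // only the requested partition directories are scanned (partition pruning)
  val df_part = spark.sql("select city_name, province_name from tb_part where dt = '2020-01-01'")
  df_part.show(5, false)
  // list the partitions of the table
  spark.sql("show partitions tb_part").show(false)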

Reading via the table's HDFS files
  object LoadingData_from_hdfs_base extends mylog{ // with Logging
  ...
  def main(args: Array[String]=Array("tb1", "3", "\001", "cols", "")): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: LoadingData_from_hdfs <tb_name, parts, sep_line, cols, paths>")
      System.exit(1)
    }
    log.warn("Job scheduling started")
    val tb_name = args(0)
    val parts = args(1)
    val sep_line = args(2)
    val select_col = args(3)
    val save_paths = args(4)
    val select_cols = select_col.split("#").toSeq
    log.warn(s"Loading cols are : \n $select_cols")
    val gb_sql = s"DESCRIBE FORMATTED ${tb_name}"
    val gb_desc = sql(gb_sql)
    // the "Location" row of DESCRIBE FORMATTED gives the table's HDFS path
    val hdfs_address = gb_desc.filter($"col_name".contains("Location")).take(1)(0).getString(1)
    val hdfs_address_cha = s"$hdfs_address/*/"
    val Cs = new DataProcess_base(spark)
    val tb_desc = Cs.get_table_desc(tb_name)
    val raw_data = Cs.get_hdfs_data(hdfs_address)
    val len1 = raw_data.map(item => item.split(sep_line)).first.length
    val names = tb_desc.filter(!$"col_name".contains("#")).dropDuplicates(Seq("col_name")).sort("id").select("col_name").take(len1).map(_(0)).toSeq.map(_.toString)
    val schema1 = StructType(names.map(fieldName => StructField(fieldName, StringType)))
    val rawRDD = raw_data.map(_.split(sep_line).map(_.toString)).map(p => Row(p: _*)).filter(_.length == len1)
    val df_data = spark.createDataFrame(rawRDD, schema1)//.filter("custommsgtype = '1'")
    val df_desc = select_cols.toDF.join(tb_desc, $"value"===$"col_name", "left")
    val df_gb_result = df_data.select(select_cols.map(df_data.col(_)): _*)//.limit(100)
    df_gb_result.show(5, false)
    ...
    // spark.stop()
  }
  }
  val cols = "area_name#city_name#province_name"
  val tb_name = "tb1"
  val sep_line = "\u0001"
  // run the script
  LoadingData_from_hdfs_base.main(Array(tb_name, "4", sep_line, cols, ""))


Checking whether a path is a directory

  • Method 1 (HDFS): via the Hadoop FileSystem API. Note that exists() only tells you the path exists; see the sketch after Method 2 for an explicit directory check.
  def pathIsExist(spark: SparkSession, path: String): Boolean = {
    val filePath = new org.apache.hadoop.fs.Path( path )
    val fileSystem = filePath.getFileSystem( spark.sparkContext.hadoopConfiguration )
    fileSystem.exists( filePath )
  }
  pathIsExist(spark, hdfs_address)
  // output:
  // pathIsExist: (spark: org.apache.spark.sql.SparkSession, path: String)Boolean
  // res4: Boolean = true
  • Method 2 (local filesystem): java.io.File only works for paths on the driver's local disk, not for HDFS.
  import java.io.File
  val d = new File("/usr/local/xw")
  d.isDirectory
  // output:
  // d: java.io.File = /usr/local/xw
  // res3: Boolean = true
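  • A minimal sketch of an explicit directory check on HDFS (my own addition, reusing the same Hadoop FileSystem API as Method 1):
  import org.apache.spark.sql.SparkSession
  import org.apache.hadoop.fs.Path
  def pathIsDir(spark: SparkSession, path: String): Boolean = {
    val p = new Path(path)
    val fs = p.getFileSystem(spark.sparkContext.hadoopConfiguration)
    // getFileStatus throws if the path is missing, so guard with exists()
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }
  // pathIsDir(spark, hdfs_address)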

Reading the source data of a partitioned table

  • Be careful with partitioned tables: check whether the raw files on HDFS actually contain the partition column values. Usually they do not (the partition value only lives in the directory name), so it has to be recovered from the file path, as sketched below.
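  A minimal sketch of recovering the partition value from the file path with Spark SQL functions (the table path and the dt partition column here are hypothetical, not from the original scripts):
  import org.apache.spark.sql.functions.{input_file_name, regexp_extract}
  // read the raw files of the table, then derive the partition value from each row's source file path
  val raw = spark.read.option("sep", "\u0001").csv("hdfs:///warehouse/tb_part/*/")
  val with_dt = raw
    .withColumn("path", input_file_name())
    .withColumn("dt", regexp_extract(input_file_name(), "dt=([^/]+)", 1))
  with_dt.select("path", "dt").show(5, false)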

Reading a single file
  object LoadingData_from_hdfs_onefile_with_path extends mylog{
  def main(args: Array[String]=Array("tb_name", "hdfs:/", "3","\n", "\001", "cols", "")): Unit = {
    ...
    val hdfs_address = args(1)
    val len1 = raw_data.map(item => item.split(sep_line)).first.length
    // split the file into lines, split each line into fields, and append the file path as an extra field
    val rawRDD = raw_data.flatMap(line => line.split(sep_text)).map(word => (word.split(sep_line):+hdfs_address)).map(p => Row(p: _*))
    println(rawRDD.take(2))
    val names = tb_desc.filter(!$"col_name".contains("#")).dropDuplicates(Seq("col_name")).sort("id").select("col_name").take(len1).map(_(0)).toSeq.map(_.toString)
    import org.apache.spark.sql.types.StructType
    val schema1 = StructType(names.map(fieldName => StructField(fieldName, StringType)))
    val new_schema1 = schema1.add(StructField("path", StringType))
    val df_data = spark.createDataFrame(rawRDD, new_schema1)
    val df_desc = select_cols.toDF.join(tb_desc, $"value"===$"col_name", "left")
    // df_desc.show(false)
    val df_gb_result = df_data.select(select_cols.map(df_data.col(_)): _*)//.limit(100)
    df_gb_result.show(5, false)
    ...
    // spark.stop()
  }
  }
  val file1 = "hdfs:file1.csv"
  val tb_name = "tb_name"
  val sep_text = "\n"
  val sep_line = "\001"
  val cols = "city#province#etl_date#path"
  // run the script
  LoadingData_from_hdfs_onefile_with_path.main(Array(tb_name, file1, "4", sep_line, sep_text, cols, ""))

Reading multiple files: attempt 1
  object LoadingData_from_hdfs_wholetext_with_path extends mylog{ // with Logging
  ...
  def main(args: Array[String]=Array("tb1", "hdfs:/", "3","\n", "\001", "cols", "")): Unit = {
    ...
    val tb_name = args(0)
    val hdfs_address = args(1)
    val parts = args(2)
    val sep_line = args(3)
    val sep_text = args(4)
    val select_col = args(5)
    val save_paths = args(6)
    val select_cols = select_col.split("#").toSeq
    val Cs = new DataProcess_get_data(spark)
    val tb_desc = Cs.get_table_desc(tb_name)
    // wholeTextFiles returns (file path, file content) pairs
    val rddWhole = spark.sparkContext.wholeTextFiles(s"$hdfs_address", 10)
    rddWhole.foreach(f=>{
      println(f._1 + " row count => " + f._2.split("\n").length)
    })
    val files = rddWhole.collect
    val len1 = files.flatMap(item => item._2.split(sep_text)).take(1).flatMap(items=>items.split(sep_line)).length
    val names = tb_desc.filter(!$"col_name".contains("#")).dropDuplicates(Seq("col_name")).sort("id").select("col_name").take(len1).map(_(0)).toSeq.map(_.toString)
    import org.apache.spark.sql.types.StructType
    // parse the wholeTextFiles result into rows and append the file name as a "path" column
    val wordCount = files.map(f=>f._2.split(sep_text).map(g=>g.split(sep_line):+f._1.split("/").takeRight(1)(0))).flatMap(h=>h).map(p => Row(p: _*))
    val schema1 = StructType(names.map(fieldName => StructField(fieldName, StringType)))
    val new_schema1 = schema1.add(StructField("path", StringType))
    val rawRDD = sc.parallelize(wordCount)
    val df_data = spark.createDataFrame(rawRDD, new_schema1)
    val df_desc = select_cols.toDF.join(tb_desc, $"value"===$"col_name", "left")
    //df_desc.show(false)
    val df_gb_result = df_data.select(select_cols.map(df_data.col(_)): _*)
    df_gb_result.show(5, false)
    println("Resulting dataframe grouped by the path column:")
    df_gb_result.groupBy("path").count().show(false)
    ...
    // spark.stop()
  }
  }
  val file1 = "hdfs:file1_1[01].csv"
  val tb_name = "tb_name"
  val sep_text = "\n"
  val sep_line = "\001"
  val cols = "city#province#etl_date#path"
  // run the script
  LoadingData_from_hdfs_wholetext_with_path.main(Array(tb_name, file1, "4", sep_line, sep_text, cols, ""))

Reading multiple files while keeping the file name as a column
  • What the snippet below implements (see the test case after this list)

    • Split an Array[(String, String)] into one element per (String, String) pair;
    • Split the second element of each (String, String) into multiple rows on the "\n" separator and into multiple columns on the "?" separator;
    • Append the first element of each (String, String) to every row produced in the previous step; in the DataFrame this shows up as an extra column.
  • Business scenario

    • Reading several files in one go while still being able to tell, in the merged data set, which file each row came from.
  // test case: convert the kind of result wholeTextFiles produces into a DataFrame
  val test1 = Array(("abasdfsdf", "a?b?c?d\nc?d?d?e"), ("sdfasdf", "b?d?a?e\nc?d?e?f"))
  val test2 = test1.map(line=>line._2.split("\n").map(line1=>line1.split("\\?"):+line._1)).flatMap(line2=>line2).map(p => Row(p: _*))
  val cols = "cn1#cn2#cn3#cn4#path"
  val names = cols.split("#")
  val schema1 = StructType(names.map(fieldName => StructField(fieldName, StringType)))
  val rawRDD = sc.parallelize(test2)
  val df_data = spark.createDataFrame(rawRDD, schema1)
  df_data.show(4, false)
  test1  // echo the original input (Jupyter displays the value of the last expression)
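  For reference, given the test1 input above, df_data.show(4, false) should print something close to the following (my expected output, not copied from the original run):
  // +---+---+---+---+---------+
  // |cn1|cn2|cn3|cn4|path     |
  // +---+---+---+---+---------+
  // |a  |b  |c  |d  |abasdfsdf|
  // |c  |d  |d  |e  |abasdfsdf|
  // |b  |d  |a  |e  |sdfasdf  |
  // |c  |d  |e  |f  |sdfasdf  |
  // +---+---+---+---+---------+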

Reading multiple files with a for loop
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.functions.monotonically_increasing_id
  import org.apache.log4j.{Level, Logger}
  import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
  import org.apache.hadoop.fs.{FileSystem, Path}
  Logger.getLogger("org").setLevel(Level.WARN)
  // val log = Logger.getLogger(this.getClass)
  @transient lazy val log:Logger = Logger.getLogger(this.getClass)
  class DataProcess_get_data_byfor (ss: SparkSession) extends java.io.Serializable{
  import ss.implicits._
  import ss.sql
  import org.apache.spark.sql.types.DataTypes
  ...
  // union two RDD[String] of raw lines, appending each RDD's file name (sc.textFile sets an RDD's name to its path) as an extra field
  def union_dataframe(df_1:RDD[String], df_2:RDD[String]):RDD[String] ={
    val count1 = df_1.map(item=>item.split(sep_line)).take(1)(0).length
    val count2 = df_2.map(item=>item.split(sep_line)).take(1)(0).length
    val name2 = df_2.name.split("/").takeRight(1)(0)
    val arr2 = df_2.map(item=>item.split(sep_line):+name2).map(p => Row(p: _*))
    println(s"reached the union step")
    var name1 = ""
    var arr1 = ss.sparkContext.makeRDD(List().map(p => Row(p: _*)))
    // var arr1 = Array[org.apache.spark.sql.Row]
    if (count1 == count2){
      // df_1 has not been unioned yet, so it still needs its file name appended as well
      name1 = df_1.name.split("/").takeRight(1)(0)
      arr1 = df_1.map(item=>item.split(sep_line):+name1).map(p => Row(p: _*))
      // arr1.foreach(f=>print(s"arr1: $f" + f.length + "\n"))
      println(s"column counts match: $count1~$count2 $name1/$name2")
      arr1
    }
    else{
      // df_1 is already the result of a previous union and already carries the path field
      println(s"column counts differ: $count1~$count2 $name1/$name2")
      arr1 = df_1.map(item=>item.split(sep_line)).map(p => Row(p: _*))
    }
    var rawRDD = arr1.union(arr2)
    // arr3.foreach(f=>print(s"$f" + f.length + "\n"))
    // var rawRDD = sc.parallelize(arr3)
    var sourceRdd = rawRDD.map(_.mkString(sep_line))
    // var count31 = arr1.take(1)(0).length
    // var count32 = arr2.take(1)(0).length
    // var count3 = sourceRdd.map(item=>item.split(sep_line)).take(1)(0).length
    // var nums = sourceRdd.count
    // print(s"arr1: $count31, arr2: $count32, arr3: $count3, row count: $nums")
    sourceRdd
  }
  }
  object LoadingData_from_hdfs_text_with_path_byfor extends mylog{ // with Logging
  ...
  def main(args: Array[String]=Array("tb1", "hdfs:/", "3","\n", "\001", "cols","data1", "test", "")): Unit = {
    ...
    val hdfs_address = args(1)
    ...
    val pattern = args(6)
    val pattern_no = args(7)
    val select_cols = select_col.split("#").toSeq
    log.warn(s"Loading cols are : \n $select_cols")
    // list the files under hdfs_address and keep those matching `pattern` but not `pattern_no`
    val files = FileSystem.get(spark.sparkContext.hadoopConfiguration).listStatus(new Path(s"$hdfs_address"))
    val files_name = files.toList.map(f=> f.getPath.getName)
    val file_filter = files_name.filter(_.contains(pattern)).filterNot(_.contains(pattern_no))
    val df_1 = file_filter.map(item=> sc.textFile(s"$hdfs_address$item"))
    df_1.foreach(f=>{
      println(f + " row count: " + f.count)
    })
    val df2 = df_1.reduce(_ union _)
    println("row count after union: " + df2.count)
    val Cs = new DataProcess_get_data_byfor(spark)
    ...
    // merge the per-file RDDs read in the loop
    val result = df_1.reduce((a, b) => Cs.union_dataframe(a, b))
    val result2 = result.map(item=>item.split(sep_line)).map(p => Row(p: _*))
    val df_data = spark.createDataFrame(result2, new_schema1)
    val df_desc = select_cols.toDF.join(tb_desc, $"value"===$"col_name", "left")
    println("\n")
    //df_desc.show(false)
    val df_gb_result = df_data.select(select_cols.map(df_data.col(_)): _*)
    df_gb_result.show(5, false)
    println("Resulting dataframe grouped by the path column:")
    df_gb_result.groupBy("path").count().show(false)
    ...
    // spark.stop()
  }
  }
  val path1 = "hdfs:202001/"
  val tb_name = "tb_name"
  val sep_text = "\n"
  val sep_line = "\001"
  val cols = "city#province#etl_date#path"
  val pattern = "result_copy_1"
  val pattern_no = "1.csv"
  // val file_filter = List("file1_10.csv", "file_12.csv", "file_11.csv")
  // run the script
  LoadingData_from_hdfs_text_with_path_byfor.main(Array(tb_name, path1, "4", sep_line, sep_text, cols, pattern, pattern_no, ""))

Complete runnable example

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.functions.monotonically_increasing_id
  import org.apache.log4j.{Level, Logger}
  import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
  Logger.getLogger("org").setLevel(Level.WARN)
  val log = Logger.getLogger(this.getClass)
  class DataProcess_base (ss: SparkSession) extends java.io.Serializable{
  import ss.implicits._
  import ss.sql
  import org.apache.spark.sql.types.DataTypes
  // read the table's column metadata ("desc <table>") and attach an increasing id to preserve the original column order
  def get_table_desc(tb_name:String="tb"):DataFrame ={
    val gb_sql = s"desc ${tb_name}"
    val gb_desc = sql(gb_sql)
    val names = gb_desc.filter(!$"col_name".contains("#")).withColumn("id", monotonically_increasing_id())
    names
  }
  // load the raw text files backing the table from HDFS
  def get_hdfs_data(hdfs_address:String="hdfs:"):RDD[String]={
    val gb_data = ss.sparkContext.textFile(hdfs_address)
    gb_data.cache()
    val counts1 = gb_data.count
    println(f"the rows of origin hdfs data is $counts1%-1d")
    gb_data
  }
  }
  object LoadingData_from_hdfs_base extends mylog{ // with Logging; mylog is a user-defined logging trait (definition not shown)
  Logger.getLogger("org").setLevel(Level.WARN)
  val conf = new SparkConf()
  conf.setMaster("yarn")
  conf.setAppName("LoadingData_From_hdfs")
  conf.set("spark.home", System.getenv("SPARK_HOME"))
  val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  import spark.implicits._
  import spark.sql
  var UIAddress = spark.conf.getOption("spark.driver.appUIAddress").repr
  var yarnserver = spark.conf.getOption("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES").repr
  println(f"Config as follows: \nUIAddress: $UIAddress, \nyarnserver: $yarnserver")
  def main(args: Array[String]=Array("tb1", "3", "\001", "cols", "")): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: LoadingData_from_hdfs <tb_name, parts, sep_line, cols, paths>")
      System.exit(1)
    }
    log.warn("Job scheduling started")
    val tb_name = args(0)
    val parts = args(1)
    val sep_line = args(2)
    val select_col = args(3)
    val save_paths = args(4)
    val select_cols = select_col.split("#").toSeq
    log.warn(s"Loading cols are : \n $select_cols")
    val gb_sql = s"DESCRIBE FORMATTED ${tb_name}"
    val gb_desc = sql(gb_sql)
    // the "Location" row of DESCRIBE FORMATTED gives the table's HDFS path
    val hdfs_address = gb_desc.filter($"col_name".contains("Location")).take(1)(0).getString(1)
    println(s"the HDFS path of $tb_name is $hdfs_address")
    val hdfs_address_cha = s"$hdfs_address/*/"
    val Cs = new DataProcess_base(spark)
    val tb_desc = Cs.get_table_desc(tb_name)
    val raw_data = Cs.get_hdfs_data(hdfs_address)
    val len1 = raw_data.map(item => item.split(sep_line)).first.length
    val names = tb_desc.filter(!$"col_name".contains("#")).dropDuplicates(Seq("col_name")).sort("id").select("col_name").take(len1).map(_(0)).toSeq.map(_.toString)
    val schema1 = StructType(names.map(fieldName => StructField(fieldName, StringType)))
    val rawRDD = raw_data.map(_.split(sep_line).map(_.toString)).map(p => Row(p: _*)).filter(_.length == len1)
    val df_data = spark.createDataFrame(rawRDD, schema1)//.filter("custommsgtype = '1'")
    val df_desc = select_cols.toDF.join(tb_desc, $"value"===$"col_name", "left")
    val df_gb_result = df_data.select(select_cols.map(df_data.col(_)): _*)//.limit(100)
    df_gb_result.show(5, false)
    // optionally write the result back out as CSV
    // val part = parts.toInt
    // df_gb_result.repartition(part).write.mode("overwrite").option("header","true").option("sep","#").csv(save_paths)
    // log.warn(f"the rows of origin data compare to mysql results is $ncounts1%-1d VS $ncounts3%-4d")
    // spark.stop()
  }
  }
  val cols = "area_name#city_name#province_name"
  val tb_name = "tb1"
  val sep_line = "\u0001"
  // run the script
  LoadingData_from_hdfs_base.main(Array(tb_name, "4", sep_line, cols, ""))