1. 程式人生 > >RDD轉換成DataFrame的2種方式

RDD轉換成DataFrame的2種方式

DataFrame 與 RDD 的互動

Spark SQL它支援兩種不同的方式轉換已經存在的RDD到DataFrame

方法一

第一種方式是使用反射的方式,用反射去推倒出來RDD裡面的schema。這個方式簡單,但是不建議使用,因為在工作當中,使用這種方式是有限制的
對於以前的版本來說,case class最多支援22個欄位如果超過了22個欄位,我們就必須要自己開發一個類,實現product接口才行。因此這種方式雖然簡單,但是不通用;因為生產中的欄位是非常非常多的,是不可能只有20來個欄位的。
示例:

/**
  * convert rdd to dataframe 1
  * @param
spark */
private def runInferSchemaExample(spark:SparkSession): Unit ={ import spark.implicits._ val rdd = spark.sparkContext.textFile("E:/大資料/data/people.txt") val df = rdd.map(_.split(",")) .map(x => People(x(0), x(1).trim.toInt)) //將rdd的每一行都轉換成了一個people .toDF //必須先匯入import spark.implicits._ 不然這個方法會報錯
df.show() df.createOrReplaceTempView("people") // 這個DF包含了兩個欄位name和age val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // teenager(0)代表第一個欄位 // 取值的第一種方式:index from zero teenagersDF.map(teenager => "Name: " + teenager(0)).show() // 取值的第二種方式:byName teenagersDF.map(teenager => "Name: "
+ teenager.getAs[String]("name") + "," + teenager.getAs[Int]("age")).show() } // 注意:case class必須定義在main方法之外;否則會報錯 case class People(name:String, age:Int)

方法二

建立一個DataFrame,使用程式設計的方式 這個方式用的非常多。通過程式設計方式指定schema ,對於第一種方式的schema其實定義在了case class裡面了。
官網解讀:
當我們的case class不能提前定義(因為業務處理的過程當中,你的欄位可能是在變化的),因此使用case class很難去提前定義。
使用該方式建立DF的三大步驟:

  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.

示例:

/**
  * convert rdd to dataframe 2
  * @param spark
  */
private def runProgrammaticSchemaExample(spark:SparkSession): Unit ={
  // 1.轉成RDD
  val rdd = spark.sparkContext.textFile("E:/大資料/data/people.txt")

  // 2.定義schema,帶有StructType的
  // 定義schema資訊
  val schemaString = "name age"
  // 對schema資訊按空格進行分割
  // 最終fileds裡包含了2個StructField
  val fields = schemaString.split(" ")
                            // 欄位型別,欄位名稱判斷是不是為空
                           .map(fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)

  // 3.把我們的schema資訊作用到RDD上
  //   這個RDD裡面包含了一些行
  // 形成Row型別的RDD
  val rowRDD = rdd.map(_.split(","))
                  .map(x => Row(x(0), x(1).trim))
  // 通過SparkSession建立一個DataFrame
  // 傳進來一個rowRDD和schema,將schema作用到rowRDD上
  val peopleDF = spark.createDataFrame(rowRDD, schema)

  peopleDF.show()
}

【擴充套件】生產上建立DataFrame的程式碼舉例

在實際生產環境中,我們其實選擇的是方式二這種進行建立DataFrame的,這裡將展示部分程式碼:

Schema的定義

object AccessConvertUtil {

  val struct = StructType(
    Array(
      StructField("url",StringType),
      StructField("cmsType",StringType),
      StructField("cmsId",LongType),
      StructField("traffic",LongType),
      StructField("ip",StringType),
      StructField("city",StringType),
      StructField("time",StringType),
      StructField("day",StringType)
    )
  )

  /**
    * 根據輸入的每一行資訊轉換成輸出的樣式
    */
  def parseLog(log:String) = {

    try {
      val splits = log.split("\t")

      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")

      var cmsType = ""
      var cmsId = 0l
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day = time.substring(0,10).replace("-","")

      //這個Row裡面的欄位要和struct中的欄位對應上
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0)
    }

  }

}

建立DataFrame

object SparkStatCleanJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkStatCleanJob")
      .master("local[2]").getOrCreate()

    val accessRDD = spark.sparkContext.textFile("/Users/lemon/project/data/access.log")

    //accessRDD.take(10).foreach(println)

    //RDD ==> DF,建立生成DataFrame
    val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

    accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
            .partitionBy("day").save("/Users/lemon/project/clean")

    spark.stop()
  }

}

相關推薦

Spark中RDD轉換DataFrame的兩方式(分別用Java和scala實現)

 一:準備資料來源       在專案下新建一個student.txt檔案,裡面的內容為: print? <code class="language-java">1,zhangsan,20   2,lisi,21   3,wanger,1

RDD轉換DataFrame的2方式

DataFrame 與 RDD 的互動 Spark SQL它支援兩種不同的方式轉換已經存在的RDD到DataFrame 方法一 第一種方式是使用反射的方式,用反射去推倒出來RDD裡面的schema。這個方式簡單,但是不建議使用,因為在工作當中,使用

C 物件與JSON字串互相轉換的幾方式

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Json字串與Object物件相互轉換的幾方式

Json-Lib、Org.Json、Jackson、Gson、FastJson五種方式轉換json型別 只列舉了最省事的方式。不涉及複製情況和速度。 測試用例,一個User類,屬性name,age,location。重寫toString()。 public class User {

java中byte陣列與int型別的轉換(兩方式

java中byte陣列與int型別的轉換,在網路程式設計中這個演算法是最基本的演算法,我們都知道,在socket傳輸中,傳送、者接收的資料都是 byte陣列,但是int型別是4個byte組成的,如何把一個整形int轉換成byte陣列,同時如何把一個長度為4的byte陣列轉

配置sparksql讀hive,dataframe和RDD,將RDD轉換Dataframe,檢視,withcolumn

文章目錄 退出spark-shell 使用spark自帶檔案建立dataframe 退出安全模式 配置spark讀hive 1.pom檔案增加 2.resource下加檔案 3.修改h

Spark RDD轉換其他資料結構

在Spark推薦系統程式設計中,一般都是通過檔案載入成RDD: //在這裡預設 (userId, itemId, preference) val fields = sparkContext.textFile("").split("\t").map{ fie

時間戳與時間的相互轉換的幾方式

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-

php 資料型別轉換強制轉換的三方式

(int)、(integer):轉換成整形 (float)、(double)、(real):轉換成浮點型 (string):轉換成字串 (bool)、(boolean):轉換成布林型別 (arr

String轉換Long兩方式

Long.ValueOf("String")與Long.parseLong("String")的區別 Long.ValueOf("String")返回Long包裝型別 Long.parseLong("

sparksql 動態設置schema將rdd轉換dataset/dataframe

cde exce session types creat unit nes HERE ext java 1 public class DynamicDemo { 2 private static SparkConf conf = new SparkConf()

Android監聽APP前後臺轉換的兩方式

應用場景:很多時候我們需要去監聽使用者有沒有將APPs放置在後臺,比如常用的銀行類APP、音視訊播放類APP,那麼這些APP為什麼要去監聽應用程式在不在前臺?銀行當然是為了安全,必須保證一些資料的傳輸一定是在前臺,音視訊播放APP呢,是為了更好的使用者體驗,使用者按下home

C 對象與JSON字符串互相轉換的幾方式

pty reader font clear time nor one returns type 隨著 Rest 風格 API 的應用越來越廣泛,對象與JSON字符串互相轉換的功能也用的越來越多。這裏介紹三種方法,期中兩種為DotNet Framework .NET 3.

JavaScript:將類陣列轉換陣列的幾方式

首先說說什麼是類陣列,類陣列有幾個組成部分: 屬性要為索引(數字)。 必須有length屬性, 最好加上push和splice方法 對於一個普通的物件來說,如果它的所有property名均為正整數,同時也有相應的length屬性,那麼雖然該物件並不是由A

2018-12-14轉換一天的開始和一天的結束 三方式

三種方式 String start=cond.getCreatedTimeStart(); String end=cond.getCreatedTimeEnd(); String stratTime=start+" "+"00:00:00"; String

詳解Go開發Struct轉換map兩方式比較

詳解Go開發Struct轉換成map兩種方式比較     本篇文章主要介紹了詳解Go開發Struct轉換成map兩種方式比較,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧   最近做Go開發的時候接觸到了一個新的orm第

json轉換time的兩方式

//yyyy-MM-dd function JsonToDate(jsondate) {     var date = new Date(parseInt(jsondate.replace("/Date(", "").replace(")/", ""), 10));    

powershell 將文字轉換表格的另一方式

$text=" 1 夢幻西遊 216406 2014-01-21 資料片 2 爐石傳說 15905 2014-01-24 公測 3 新大話西遊 214465 2002-08-01 公測 4 問道 11

js中將字串轉換json的三方式

ECMA-262(E3) 中沒有將JSON概念寫到標準中,還好在中JSON的概念被正式引入了,包括全域性的JSON物件和Date的toJSON方法。 1,eval方式解析,恐怕這是最早的解析方式了。如下: 複製程式碼 程式碼如下: function strToJson(str){ var json =

將網頁轉換PDF檔案的N方式

有時候不知道為什麼,有些人總會想要把看到的各種東西弄成其他格式。這完全是句廢話,寫這句廢話是因為我突發奇想來到久違的網咖上網,腦子暫時還有點混亂。我來網咖是為了解決一點個人問題,現在問題解決完了或者無法解決了,無聊之中就只好來更新一下部落格了。 現在腦子清醒過來,我知道,