RDD轉換成DataFrame的2種方式
DataFrame 與 RDD 的互動
Spark SQL它支援兩種不同的方式轉換已經存在的RDD到DataFrame
方法一
第一種方式是使用反射的方式,用反射去推倒出來RDD裡面的schema。這個方式簡單,但是不建議使用,因為在工作當中,使用這種方式是有限制的。
對於以前的版本來說,case class最多支援22個欄位如果超過了22個欄位,我們就必須要自己開發一個類,實現product接口才行。因此這種方式雖然簡單,但是不通用;因為生產中的欄位是非常非常多的,是不可能只有20來個欄位的。
示例:
/**
* convert rdd to dataframe 1
* @param spark
*/
private def runInferSchemaExample(spark:SparkSession): Unit ={
import spark.implicits._
val rdd = spark.sparkContext.textFile("E:/大資料/data/people.txt")
val df = rdd.map(_.split(","))
.map(x => People(x(0), x(1).trim.toInt)) //將rdd的每一行都轉換成了一個people
.toDF //必須先匯入import spark.implicits._ 不然這個方法會報錯
df.show()
df.createOrReplaceTempView("people")
// 這個DF包含了兩個欄位name和age
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// teenager(0)代表第一個欄位
// 取值的第一種方式:index from zero
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// 取值的第二種方式:byName
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name") + "," + teenager.getAs[Int]("age")).show()
}
// 注意:case class必須定義在main方法之外;否則會報錯
case class People(name:String, age:Int)
方法二
建立一個DataFrame,使用程式設計的方式 這個方式用的非常多。通過程式設計方式指定schema ,對於第一種方式的schema其實定義在了case class裡面了。
官網解讀:
當我們的case class不能提前定義(因為業務處理的過程當中,你的欄位可能是在變化的),因此使用case class很難去提前定義。
使用該方式建立DF的三大步驟:
- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via createDataFrame method provided by SparkSession.
示例:
/**
* convert rdd to dataframe 2
* @param spark
*/
private def runProgrammaticSchemaExample(spark:SparkSession): Unit ={
// 1.轉成RDD
val rdd = spark.sparkContext.textFile("E:/大資料/data/people.txt")
// 2.定義schema,帶有StructType的
// 定義schema資訊
val schemaString = "name age"
// 對schema資訊按空格進行分割
// 最終fileds裡包含了2個StructField
val fields = schemaString.split(" ")
// 欄位型別,欄位名稱判斷是不是為空
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// 3.把我們的schema資訊作用到RDD上
// 這個RDD裡面包含了一些行
// 形成Row型別的RDD
val rowRDD = rdd.map(_.split(","))
.map(x => Row(x(0), x(1).trim))
// 通過SparkSession建立一個DataFrame
// 傳進來一個rowRDD和schema,將schema作用到rowRDD上
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.show()
}
【擴充套件】生產上建立DataFrame的程式碼舉例
在實際生產環境中,我們其實選擇的是方式二這種進行建立DataFrame的,這裡將展示部分程式碼:
Schema的定義
object AccessConvertUtil {
val struct = StructType(
Array(
StructField("url",StringType),
StructField("cmsType",StringType),
StructField("cmsId",LongType),
StructField("traffic",LongType),
StructField("ip",StringType),
StructField("city",StringType),
StructField("time",StringType),
StructField("day",StringType)
)
)
/**
* 根據輸入的每一行資訊轉換成輸出的樣式
*/
def parseLog(log:String) = {
try {
val splits = log.split("\t")
val url = splits(1)
val traffic = splits(2).toLong
val ip = splits(3)
val domain = "http://www.imooc.com/"
val cms = url.substring(url.indexOf(domain) + domain.length)
val cmsTypeId = cms.split("/")
var cmsType = ""
var cmsId = 0l
if (cmsTypeId.length > 1) {
cmsType = cmsTypeId(0)
cmsId = cmsTypeId(1).toLong
}
val city = IpUtils.getCity(ip)
val time = splits(0)
val day = time.substring(0,10).replace("-","")
//這個Row裡面的欄位要和struct中的欄位對應上
Row(url, cmsType, cmsId, traffic, ip, city, time, day)
} catch {
case e: Exception => Row(0)
}
}
}
建立DataFrame
object SparkStatCleanJob {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SparkStatCleanJob")
.master("local[2]").getOrCreate()
val accessRDD = spark.sparkContext.textFile("/Users/lemon/project/data/access.log")
//accessRDD.take(10).foreach(println)
//RDD ==> DF,建立生成DataFrame
val accessDF = spark.createDataFrame(accessRDD.map(x => AccessConvertUtil.parseLog(x)),
AccessConvertUtil.struct)
accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite)
.partitionBy("day").save("/Users/lemon/project/clean")
spark.stop()
}
}
相關推薦
Spark中RDD轉換成DataFrame的兩種方式(分別用Java和scala實現)
一:準備資料來源 在專案下新建一個student.txt檔案,裡面的內容為: print? <code class="language-java">1,zhangsan,20 2,lisi,21 3,wanger,1
RDD轉換成DataFrame的2種方式
DataFrame 與 RDD 的互動 Spark SQL它支援兩種不同的方式轉換已經存在的RDD到DataFrame 方法一 第一種方式是使用反射的方式,用反射去推倒出來RDD裡面的schema。這個方式簡單,但是不建議使用,因為在工作當中,使用
C 物件與JSON字串互相轉換的幾種方式
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!  
Json字串與Object物件相互轉換的幾種方式
Json-Lib、Org.Json、Jackson、Gson、FastJson五種方式轉換json型別 只列舉了最省事的方式。不涉及複製情況和速度。 測試用例,一個User類,屬性name,age,location。重寫toString()。 public class User {
java中byte陣列與int型別的轉換(兩種方式)
java中byte陣列與int型別的轉換,在網路程式設計中這個演算法是最基本的演算法,我們都知道,在socket傳輸中,傳送、者接收的資料都是 byte陣列,但是int型別是4個byte組成的,如何把一個整形int轉換成byte陣列,同時如何把一個長度為4的byte陣列轉
配置sparksql讀hive,dataframe和RDD,將RDD轉換成Dataframe,檢視,withcolumn
文章目錄 退出spark-shell 使用spark自帶檔案建立dataframe 退出安全模式 配置spark讀hive 1.pom檔案增加 2.resource下加檔案 3.修改h
Spark RDD轉換成其他資料結構
在Spark推薦系統程式設計中,一般都是通過檔案載入成RDD: //在這裡預設 (userId, itemId, preference) val fields = sparkContext.textFile("").split("\t").map{ fie
時間戳與時間的相互轉換的幾種方式
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-
php 資料型別轉換強制轉換的三種方式
(int)、(integer):轉換成整形 (float)、(double)、(real):轉換成浮點型 (string):轉換成字串 (bool)、(boolean):轉換成布林型別 (arr
String轉換Long兩種方式
Long.ValueOf("String")與Long.parseLong("String")的區別 Long.ValueOf("String")返回Long包裝型別 Long.parseLong("
sparksql 動態設置schema將rdd轉換成dataset/dataframe
cde exce session types creat unit nes HERE ext java 1 public class DynamicDemo { 2 private static SparkConf conf = new SparkConf()
Android監聽APP前後臺轉換的兩種方式
應用場景:很多時候我們需要去監聽使用者有沒有將APPs放置在後臺,比如常用的銀行類APP、音視訊播放類APP,那麼這些APP為什麼要去監聽應用程式在不在前臺?銀行當然是為了安全,必須保證一些資料的傳輸一定是在前臺,音視訊播放APP呢,是為了更好的使用者體驗,使用者按下home
C 對象與JSON字符串互相轉換的幾種方式
pty reader font clear time nor one returns type 隨著 Rest 風格 API 的應用越來越廣泛,對象與JSON字符串互相轉換的功能也用的越來越多。這裏介紹三種方法,期中兩種為DotNet Framework .NET 3.
JavaScript:將類陣列轉換成陣列的幾種方式
首先說說什麼是類陣列,類陣列有幾個組成部分: 屬性要為索引(數字)。 必須有length屬性, 最好加上push和splice方法 對於一個普通的物件來說,如果它的所有property名均為正整數,同時也有相應的length屬性,那麼雖然該物件並不是由A
2018-12-14轉換成一天的開始和一天的結束 三種方式
三種方式 String start=cond.getCreatedTimeStart(); String end=cond.getCreatedTimeEnd(); String stratTime=start+" "+"00:00:00"; String
詳解Go開發Struct轉換成map兩種方式比較
詳解Go開發Struct轉換成map兩種方式比較 本篇文章主要介紹了詳解Go開發Struct轉換成map兩種方式比較,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧 最近做Go開發的時候接觸到了一個新的orm第
json轉換成time的兩種方式
//yyyy-MM-dd function JsonToDate(jsondate) { var date = new Date(parseInt(jsondate.replace("/Date(", "").replace(")/", ""), 10));
powershell 將文字轉換成表格的另一種方式
$text=" 1 夢幻西遊 216406 2014-01-21 資料片 2 爐石傳說 15905 2014-01-24 公測 3 新大話西遊 214465 2002-08-01 公測 4 問道 11
js中將字串轉換成json的三種方式
ECMA-262(E3) 中沒有將JSON概念寫到標準中,還好在中JSON的概念被正式引入了,包括全域性的JSON物件和Date的toJSON方法。 1,eval方式解析,恐怕這是最早的解析方式了。如下: 複製程式碼 程式碼如下: function strToJson(str){ var json =
將網頁轉換成PDF檔案的N種方式
有時候不知道為什麼,有些人總會想要把看到的各種東西弄成其他格式。這完全是句廢話,寫這句廢話是因為我突發奇想來到久違的網咖上網,腦子暫時還有點混亂。我來網咖是為了解決一點個人問題,現在問題解決完了或者無法解決了,無聊之中就只好來更新一下部落格了。 現在腦子清醒過來,我知道,