1. 程式人生 > >第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作’學習筆記

第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作’學習筆記

第59課:使用JavaScalaIDE中實戰RDDDataFrame轉換操作’學習筆記

本期內容:

1  RDDDataFrame轉換的重大意義

2  使用Java實戰RDDDaraFrame轉換

3  使用Scala實戰RDDDataFrame轉換

一.  RDDDataFrame轉換的重大意義

SparkRDD可以直接轉換成DataFrameSparkCore的核心是RDD,所有的排程都是基於RDD完成的,對RDD的操作都可以轉換成基於DataFrame使用SparkSQL來操作。RDD可能接上資料庫,接上NoSQL,其他檔案系統等各種資料來源,然後將資料轉換為DataFrame

極大簡化了大資料的開發,原來寫Scala\Java,現在只需要寫SparkSQL

同時對DataFrame的操作又可以轉換成RDD,基於DataFrame對資料進行SQL或機器學習等操作後又可以轉換為RDD,這對於儲存資料、格式化非常方便。

RDDDataFrame有兩種方式:

1.通過反射,推斷RDD元素中的元資料。

RDD中的資料本身是沒有元資料的,例如一個Person的資訊裡有id/name/ageRDDRecord不知道id/name/age這些資訊,但如果變成DataFrame的話,DataFrame必須知道這些資訊。如何在RDDDataFrame轉換時擁有這些元資料資訊呢?最簡單的就是通過反射。

Scala中就是Case Class對映。寫一個Case Class,描述RDD中不同列的元資料是什麼。

Java中就是通過JavaBean

Scalacase class對映。

JavaBean(但不能支援巢狀的JavaBean,也不能有List/Map等複雜的資料結構。只能用簡單的資料型別:String/Int等。Scala就沒有這些限制)

使用反射的前提:已經知道元資料資訊了(靜態的)。但有些場景下只有在執行時才能知道元資料資訊(動態的)

2. 建立DataFrame時事先不知道元資料資訊,只能在執行時動態構建元資料。然後再把這些元資料資訊應用於RDD上。這種情況是比較常見的情況,即動態獲取

Schema

class Person{

private int id;

private String name;

private int age;

}

點選右鍵,選擇source -> Generate Getters and Setters

 

 

選擇ageidname後點擊OK

 

即可自動生成gettersetter程式碼:

class Person{

private int id;

private String name;

private int age;

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getAge() {

return age;

}

public void setAge(int age) {

this.age = age;

}

}

資料:

D:\DT-IMF\testdata目錄下建立persons.txt檔案,內容如下:

1,Spark,7

2,Hadoop,11

3,Flink,5

下面是實戰程式碼:

package SparkSQLByJava;

//使用反射的方式將RDD轉換成為DataFrame

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SQLContext;

public class RDD2DataFrameByReflection {

public static void main(String[] args) {

//建立SparkConf物件

SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");

//建立SparkContext物件

JavaSparkContext sc = new JavaSparkContext(conf);

//建立SQLContext上下文物件用於SQL分析

SQLContext sqlContext = new SQLContext(sc);

//建立RDD,讀取textFile

JavaRDD<String> lines = sc.textFile("D://DT-IMF//testdata//persons.txt");

JavaRDD<Person> persons = lines.map(new Function<String, Person>() {

@Override

public Person call(String line) throws Exception {

String[] splited = line.split(",");

Person p = new Person();

p.setId(Integer.valueOf(splited[0].trim()));

p.setName(splited[1]);

p.setAge(Integer.valueOf(splited[2].trim()));

return p;

}

});

/*

*reateDataFrame方法來自於sqlContext,有兩個引數,第一個是RDD,這裡就是lines.map之後的persons

*這個RDD裡的型別是person,即每條記錄都是person,person其實是有id,name,age的,

*JavaRDD本身並不知道id,name,age資訊,所以要建立DataFrame,DataFrame需要知道id,name,age資訊,

*DataFrame怎麼知道的呢?這裡用createDataFrame時傳入兩個引數,第一個的RDD本身,第二個引數是

*對RDD中每條資料的元資料的描述,這裡就是java bean class,即person.class

*實際上工作原理是:person.class傳入時本身會用反射的方式建立DataFrame,

*在底層通過反射的方式獲得Person的所有fields,結合RDD本身,就生成了DataFrame

*/

DataFrame df = sqlContext.createDataFrame(persons, Person.class);

//將DataFrame變成一個TempTable。

df.registerTempTable("persons");

//在記憶體中就會生成一個persons的表,在這張臨時表上就可以寫SQL語句了。

DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");

//轉過來就可以把查詢後的結果變成 RDD。返回的是JavaRDD<Row>

//注意:這裡需要匯入org.apache.spark.sql.Row

JavaRDD<Row> bigDataRDD = bigDatas.javaRDD();

//再對RDD進行map操作。元素是一行一行的資料(SQL的Row),結果是Person,再次還原成Person。

//這裡返回的是具體的每條RDD的元素。

JavaRDD<Person> result = bigDataRDD.map(new Function<Row, Person>() {

@Override

public Person call(Row row) throws Exception {

Person p = new Person();

//p.setId(row.getInt(0));

//p.setName(row.getString(1));

//p.setAge(row.getInt(2));

//資料讀進來時第一列是id,第二列是name,第三列是age,生成的RDD也是這個順序,

//變成DataFrame後,DataFrame有自己的優化引擎,優化(資料結構優化等)之後再進行處理,

//處理後再變成RDD時就不能保證第一列是id,第二列是name,第三列是age了。

//原因是DataFrame對資料進行了排序。

p.setId(row.getInt(1));

p.setName(row.getString(2));

p.setAge(row.getInt(0));

return p;

}

});

List<Person> personList = result.collect();

for(Person p : personList){

System.out.println(p);

}

}

}

package SparkSQLByJava;

import java.io.Serializable;

public class Person implements Serializable {

/**

 *

 */

private static final long serialVersionUID = 1L;

public int id;

public String name;

public int age;

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getAge() {

return age;

}

public void setAge(int age) {

this.age = age;

}

@Override

public String toString() {

return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";

}

}

下面是在eclipse中執行的console資訊:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/03/29 01:24:54 INFO SparkContext: Running Spark version 1.6.0

16/03/29 01:24:57 INFO SecurityManager: Changing view acls to: think

16/03/29 01:24:57 INFO SecurityManager: Changing modify acls to: think

16/03/29 01:24:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)

16/03/29 01:25:00 INFO Utils: Successfully started service 'sparkDriver' on port 56818.

16/03/29 01:25:01 INFO Slf4jLogger: Slf4jLogger started

16/03/29 01:25:02 INFO Remoting: Starting remoting

16/03/29 01:25:02 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:56831]

16/03/29 01:25:02 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 56831.

16/03/29 01:25:02 INFO SparkEnv: Registering MapOutputTracker

16/03/29 01:25:03 INFO SparkEnv: Registering BlockManagerMaster

16/03/29 01:25:03 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-2307f4ac-3fca-4e65-ad95-99802c35dffc

16/03/29 01:25:03 INFO MemoryStore: MemoryStore started with capacity 1773.8 MB

16/03/29 01:25:03 INFO SparkEnv: Registering OutputCommitCoordinator

16/03/29 01:25:04 INFO Utils: Successfully started service 'SparkUI' on port 4040.

16/03/29 01:25:04 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040

16/03/29 01:25:05 INFO Executor: Starting executor ID driver on host localhost

16/03/29 01:25:06 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56838.

16/03/29 01:25:06 INFO NettyBlockTransferService: Server created on 56838

16/03/29 01:25:06 INFO BlockManagerMaster: Trying to register BlockManager

16/03/29 01:25:06 INFO BlockManagerMasterEndpoint: Registering block manager localhost:56838 with 1773.8 MB RAM, BlockManagerId(driver, localhost, 56838)

16/03/29 01:25:06 INFO BlockManagerMaster: Registered BlockManager

16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)

16/03/29 01:25:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)

16/03/29 01:25:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56838 (size: 13.9 KB, free: 1773.7 MB)

16/03/29 01:25:10 INFO SparkContext: Created broadcast 0 from textFile at RDD2DataFrameByReflection.java:24

16/03/29 01:25:17 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth8, but we couldn't find any external IP address!

16/03/29 01:25:18 INFO FileInputFormat: Total input paths to process : 1

16/03/29 01:25:18 INFO SparkContext: Starting job: collect at RDD2DataFrameByReflection.java:77

16/03/29 01:25:19 INFO DAGScheduler: Got job 0 (collect at RDD2DataFrameByReflection.java:77) with 1 output partitions

16/03/29 01:25:19 INFO DAGScheduler: Final stage: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77)

16/03/29 01:25:19 INFO DAGScheduler: Parents of final stage: List()

16/03/29 01:25:19 INFO DAGScheduler: Missing parents: List()

16/03/29 01:25:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58), which has no missing parents

16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.8 KB, free 150.1 KB)

16/03/29 01:25:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.6 KB, free 154.6 KB)

16/03/29 01:25:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56838 (size: 4.6 KB, free: 1773.7 MB)

16/03/29 01:25:19 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/03/29 01:25:19 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at map at RDD2DataFrameByReflection.java:58)

16/03/29 01:25:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/03/29 01:25:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2138 bytes)

16/03/29 01:25:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/03/29 01:25:19 INFO HadoopRDD: Input split: file:/D:/DT-IMF/testdata/persons.txt:0+33

16/03/29 01:25:19 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/03/29 01:25:19 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/03/29 01:25:19 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/03/29 01:25:19 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/03/29 01:25:19 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/03/29 01:25:20 INFO GeneratePredicate: Code generated in 479.023467 ms

16/03/29 01:25:20 INFO GenerateUnsafeProjection: Code generated in 97.650998 ms

16/03/29 01:25:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2443 bytes result sent to driver

16/03/29 01:25:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1507 ms on localhost (1/1)

16/03/29 01:25:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/03/29 01:25:20 INFO DAGScheduler: ResultStage 0 (collect at RDD2DataFrameByReflection.java:77) finished in 1.573 s

16/03/29 01:25:20 INFO DAGScheduler: Job 0 finished: collect at RDD2DataFrameByReflection.java:77, took 1.890426 s

Person [id=1, name=Spark, age=7]

Person [id=2, name=Hadoop, age=11]

16/03/29 01:25:20 INFO SparkContext: Invoking stop() from shutdown hook

16/03/29 01:25:21 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040

16/03/29 01:25:21 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/03/29 01:25:21 INFO MemoryStore: MemoryStore cleared

16/03/29 01:25:21 INFO BlockManager: BlockManager stopped

16/03/29 01:25:21 INFO BlockManagerMaster: BlockManagerMaster stopped

16/03/29 01:25:21 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/03/29 01:25:21 INFO SparkContext: Successfully stopped SparkContext

16/03/29 01:25:21 INFO ShutdownHookManager: Shutdown hook called

16/03/29 01:25:21 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-481db032-91d6-4ced-a94c-b38dd0b9033c

16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/03/29 01:25:21 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

執行時報錯解決1:

16/03/29 00:33:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class SparkSQLByJava.Person with modifiers "public"

這是許可權的問題,說明反射時需要類為public。

執行時報錯解決2:

16/03/29 00:53:43 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)

java.io.NotSerializableException: SparkSQLByJava.Person

Serialization stack:

- object not serializable (class: SparkSQLByJava.Person, value: [email protected])

- element of array (index: 0)

- array (class [Ljava.lang.Object;, size 2)

原因是Person沒有序列化,需要改成public class Person implements Serializable

以上內容是王家林老師DT大資料夢工廠《 IMF傳奇行動》第59課的學習筆記。
王家林老師是Spark、Flink、DockerAndroid技術中國區佈道師。Spark亞太研究院院長和首席專家,DT大資料夢工廠創始人,Android軟硬整合原始碼級專家,英語發音魔術師,健身狂熱愛好者。

微信公眾賬號:DT_Spark

電話:18610086859

QQ:1740415547

微訊號:18610086859  

新浪微博:ilovepains

相關推薦

59使用JavaScala在IDE實戰RDDDataFrame轉換操作學習筆記

第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作’學習筆記 本期內容: 1  RDD與DataFrame轉換的重大意義 2  使用Java實戰RDD與DaraFrame轉換 3  使用Scala實戰RDD與DataFrame轉換 一.  

59使用JavaScala在IDE實戰RDDDataFrame轉換操作

內容:     1.RDD與DataFrame轉換的重大意義     2.使用Java實戰RDD與DataFrame轉換     3.使用Scala實戰RDD與dataFrame轉換 一.  RDD與DataFrame轉換的重大意義     1.在Spark中RDD可以

java開發hdfs

node 執行 需要 public conf iss import lean logging (1)關於hdfs小結 hadoop由hdfs + yarn + map/reduce組成, hdfs是數據庫存儲模塊,主要由1臺namenode和n臺datanode組成的一個集

hadoopjava開發Map/Reduce

pla tool @override val code 項目 ssp ava ram 配置系統環境變量HADOOP_HOME,指向hadoop安裝目錄(如果你不想招惹不必要的麻煩,不要在目錄中包含空格或者中文字符)把HADOOP_HOME/bin加到PATH環境變量(非必要

6Java Spring Boot 2.0實戰MyBatis與優化(Java面試題)

《阿里巴巴Java Spring Boot 2.0開發實戰課程》06課本期分享專家:徐雷—阿里巴巴特邀Java講師,MongoDB講師 本期分享主題:Spring Boot2.0實戰MyBatis與優化 (Java面試題)Java Spring Boot 2.0是最新的開發平臺,Mybatis是高效能ORM

7Java Spring Boot 2.0安全機制、漏洞與MVC身份驗證實戰

《阿里巴巴Java Spring Boot 2.0開發實戰課程》07課本期分享專家:徐雷—阿里巴巴特邀Java講師,MongoDB講師 本期分享主題:Java Spring Boot2.0實戰MyBatis與優化 (Java面試題)Java Spring Boot 2.0是最新的開發平臺,深入介紹Sprin

Android學習Java程式碼實現XML佈局

權重屬性layout_height 在佈局控制元件中(如LinearLayout),子控制元件可以根據權重值(預設為0)來分配所佔據的空間,這需要結合layout_width或layout_height的值進行分類: 1)如果子控制元件layout_weight都為“ma

35節Java面向物件的多執行緒

Java面向物件中的多執行緒 多執行緒 在Java面向物件中的多執行緒中,要理解多執行緒的知識點,首先要掌握什麼是程序,什麼是執行緒?為什麼有多執行緒呢?多執行緒存在的意義有什麼什麼呢?執行緒的建立方式又有哪些?以及要理解多執行緒的特點等。

Scala實戰高手****16Scala implicits程式設計徹底實戰及Spark原始碼鑑賞

隱式轉換:當某個類沒有具體的方法時,可以在該類的伴生物件或上下文中查詢是否存在隱式轉換,將其轉換為可以呼叫該方法的類,通過程式碼簡單的描述下 一:隱式轉換 1、定義類Man class Man(val name: String) 2、定義類SuperMan,並在類中定義一個方法 class Supe

73Spark SQL Thrift Server實戰

內容:     1.SparkSQL Thrift解析與測試     2.SparkSQL Thrift Server JDBC程式設計 一、SparkSQL Thrift解析與測試     ThriftServer是一個JDBC/ODBC介面,使用者可以通過JDBC/

16Scala implicits程式設計徹底實戰及Spark原始碼鑑賞

本節課主要講的內容: 1、函式隱式轉換 2、隱式引數 3、隱式類 4、隱式物件 本節課搜狐視訊地址:http://my.tv.sohu.com/us/299637343/84785657.shtml隱式轉換:當某個類沒有具體的方法時,可以在該類的伴生物件或上下文中查詢是否存在隱式轉換,將其轉換為可以呼叫該方法

通過配置文件獲取對象(Spring框架的IOCDI的底層就是基於這樣的機制)

ted const dex generate stat clas name 必須 nbsp 首先在D盤創建一個文件hero.txt,內容為:com.hero.Hero(此處必須是Hero的完整路徑) 接下來是Hero類 package com.hero; publi

C#程式設計基礎C#的運算子分支語句

知識點:運算子、if結構、if-else語句 一、運算子 運算子用於執行程式程式碼運算,會針對一個以上運算元專案來進行運算。例如:2+3,其運算元是2和3,而運算子則是“+”。在vb2005中運算子大致可以分為5種類型:算術運算子、連線運算子、關係運算符、賦值運算子和邏輯運算子。。

C#程式設計基礎C#三元運算子的初級使用巢狀

知識點:三元運算子的使用。 1、三元運算子 三元運算子的初級使用: 符號: ?: 舉例:int c=bool ? a : b 當bool=true,c=表示式a,當bool=false,c=表示式b。 三元運算子?:是 if~else 語句的簡寫形式 書寫格式

67Spark SQL下采用JavaScala實現Join的案例綜合實戰(鞏固前面學習的Spark SQL知識)

內容:     1.SparkSQL案例分析     2.SparkSQL下采用Java和Scala實現案例 一、SparkSQL下采用Java和Scala實現案例 學生成績: {"name":"Michael","score":98} {"name":"Andy"

九章Java的final的使用Java的super的使用

final關鍵字 使用final關鍵字做標識有“最終的”含義 final可以修飾類、方法、屬性和變數 final修飾類的時候,則該類不允許被繼承 final修飾方法,則該方法不允許被覆蓋 final修飾,則該類的屬性不會進行隱式的初始化(類的初始化屬性必須有值)或在構造方法中賦值(但只能

91SparkStreaming基於Kafka Direct案例實戰內幕原始碼解密 java.lang.ClassNotFoundException 踩坑解決問題詳細內幕版本

第91課:SparkStreaming基於Kafka Direct案例實戰和內幕原始碼解密    /* * *王家林老師授課http://weibo.com/ilovepains */  每天晚上20:00YY頻道現場授課頻道68917580 1、作業內容:SparkS

四天JAVA的迴圈語句詳解,常用例子

1:switch語句(掌握)(1)格式:switch(表示式) {case 值1:語句體1;break;case 值2:語句體2;break;...default:語句體n+1;break;}格式解釋說明:switch:說明這是switch語句。表示式:可以是byte,short,int,charJDK5以後

編寫高質量程式碼改善Java程式的151個建議(1章Java開發通用的方法準則___建議11~15)

    序列化Serializable是Java提供的通用資料儲存和讀取的介面。任何類只要實現了Serializable介面,就可以被儲存到檔案中,或者作為資料流通過網路傳送到別的地方。 package OSChina.Serializable; import java.io.

爬蟲互聯網網頁的解析

iso 來看 指向 應該 pri tro conn 路徑 獲取 基本步驟 這節課們們的目的就是使用Requests模塊+BeautifulSoup模塊爬取網站上的信息 首先爬取一個網站主要分兩步 1、第一步我們要了解服務器與本地交換機制,選擇正確的辦法我們才能獲取正確的信息