1. 程式人生 > >SparkSQL建立RDD:建立DataFrame的方式,配置Spark on Hive【文字說明+關鍵程式碼】

SparkSQL建立RDD:建立DataFrame的方式,配置Spark on Hive【文字說明+關鍵程式碼】

建立DataFrame的方式 

建立DataFrame的方式
		1).讀取json格式的檔案
			a).json檔案不能巢狀
			b).讀取的兩種方式:
				DataFrame df = sqlContext.read().format("json").load("./sparksql/json");
				DataFrame df2 = sqlContext.read().json("sparksql/json");
			c).載入過來的DataFrame 列會按照Ascii碼排序
			d).可以使用DataFrame的API操作DataFrame,也可以將DataFrame註冊成臨時表
				df.registerTempTable("jtable");
			
		2).讀取json格式的RDD
		
		3).讀取普通的RDD載入成DataFrame
			a).反射的方式(少)	
				JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Person call(String line) throws Exception {
						Person p = new Person();
						p.setId(line.split(",")[0]);
						p.setName(line.split(",")[1]);
						p.setAge(Integer.valueOf(line.split(",")[2]));
						return p;
					}
				});
				DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
				i).自定類要實現序列化介面
				ii).自定義類的訪問級別是public
				iii).載入過來的DataFrame列也會按照Ascii碼排序
			b).動態建立Schema(多)
				List<StructField> asList =Arrays.asList(
					DataTypes.createStructField("id", DataTypes.StringType, true),
					DataTypes.createStructField("name", DataTypes.StringType, true),
					DataTypes.createStructField("age", DataTypes.IntegerType, true)
				);
				
				StructType schema = DataTypes.createStructType(asList);
				
				DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
				
				i).載入過來的DataFrame列不會按照Ascii碼排序
		
		4).讀取parquent檔案載入成DataFrame
			讀取:
				DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
				load = sqlContext.read().parquet("./sparksql/parquet");
			儲存:
				df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
				df.write().mode(SaveMode.Ignore).parquet("./sparksql/parquet");
			
		5).讀取Mysql中的資料載入成DataFrame
			讀取:
				a).
					Map<String, String> options = new HashMap<String,String>();
					options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
					options.put("driver", "com.mysql.jdbc.Driver");
					options.put("user", "root");
					options.put("password", "123456");
					options.put("dbtable", "person");
					
					DataFrame person = sqlContext.read().format("jdbc").options(options).load();
				b).
					DataFrameReader reader = sqlContext.read().format("jdbc");
					reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
					reader.option("driver", "com.mysql.jdbc.Driver");
					reader.option("user", "root");
					reader.option("password", "123456");
					reader.option("dbtable", "score");
					DataFrame score = reader.load();
			儲存:
				Properties properties = new Properties();
				properties.setProperty("user", "root");
				properties.setProperty("password", "123456");
				result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);
				
		6).讀取Hive中的資料載入成DataFrame
			要配置Spark on Hive,如果SparkSQL要讀取資料是Hive中資料,要使用HiveContext,HiveContext是SQLContext的子類。
			讀取:
				HiveContext hiveContext = new HiveContext(sc);
				hiveContext.sql("USE spark");
				DataFrame df = hiveContext.table("good_student_infos");
			儲存:
				hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
				goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");

配置spark on Hive:

5.配置Spark on Hive
		1).在客戶端建立../conf/hive-site.xml
			<configuration>
			   <property>
					<name>hive.metastore.uris</name>
					<value>thrift://node1:9083</value>
			   </property>
			</configuration>
		2).啟動Hive,在服務端啟動metaStore服務,hive --service metastore
		3).spark-shell 測試

謝謝你的鼓勵,繼續加油。