1. 程式人生 > >Spark(六):SparkSQLAndDataFrames對結構化資料集與非結構化資料的處理

Spark(六):SparkSQLAndDataFrames對結構化資料集與非結構化資料的處理

Spark(六):SparkSQLAndDataFrames對結構化資料集與非結構化資料的處理

如上轉載的這篇文章寫得不錯!!!


一:簡單瞭解SparkSQL。

Spark SQL 是結構化的資料處理一個Spark模組。與基本的Spark RDD API不同,Spark SQL 所提供的介面為Spark 提供有關資料和正在執行的計算的結構的詳細資訊。Spark SQL內部使用這些額外的資訊來執行額外的優化。有幾種方法與Spark SQL 包括 SQL、 DataFrames API 和資料集 API 進行互動。計算結果相同的執行引擎在使用時,獨立的 API/語言使用的表達計算。這種統一意味著開發人員很容易可以提供最自然的方式來表達一個給定的轉換基於各種 Api 之間來回切換。

Spark SQL是Spark中的一個模組,主要用於進行結構化資料的處理。它提供的最核心的程式設計抽象,就是DataFrame。同時Spark SQL還可以作為分散式的SQL查詢引擎。Spark SQL最重要的功能之一,就是從Hive中查詢資料。

二:簡單瞭解DataFrame。

DataFrame是一個以命名列方式組織的分散式資料集,等同於關係型資料庫中的一個表,也相當於R/Python中的data frames(但是進行了更多的優化)。DataFrame可以通過很多來源進行構建,包括:結構化的資料檔案,Hive中的表,外部的關係型資料庫,以及RDD。

接下來是對 結構化資料集 與 非結構化資料集 的操作。

三:結構化資料集: 如何把JSON檔案轉化為DataFrame

3.1.在HDFS上放置了兩個JSON檔案,即

people.json, 檔案內容如下:

{"id": "19","name": "berg","sex": "male","age": 19}
{"id": "20","name": "cccc","sex": "female","age": 20}
{"id": "21","name": "xxxx","sex": "male","age": 21}
{"id": "22","name": "jjjj","sex": "female","age": 21}

student.json,檔案內容如下:

{"id": "1","name": "china","sex": "female","age": 100}
{"id": "19","name": "xujun","sex": "male","age": 22}
3.2 通過DataFrame的API來操作資料,熟悉下DataFrame中方法的使用:
public class SparkSqlDemo {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//本地的JSON檔案轉化為DataFrame
		DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");

		//輸出表結構
		df.printSchema();

		//顯示DataFrame的內容。
		df.show();

		//選擇name
		df.select(df.col("name")).show();

		// 選擇所有年齡大於21歲的人,只保留name欄位
		df.filter(df.col("age").lt(21)).select("name").show();

		// 選擇name,並把age欄位自增 1
		df.select(df.col("name"), df.col("age").plus(1)).show();

		//按年齡分組計數:
		df.groupBy("age").count().show();  // 應該有一條資料記錄為  2 

		//把另個JSON檔案轉化為DataFrame
		DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json");

		df2.show();
		
		//表的關聯。
		df.join(df2,df.col("id").equalTo(df2.col("id"))).show();
		
		
		//以程式設計方式執行SQL:
		//把DataFrame物件轉化為一個虛擬的表
		df.registerTempTable("people");
		sqlContext.sql("select age,count(*) from people group by age").show();

		System.out.println(  "-------------" );
		sqlContext.sql("select * from people").show();

	}
}
3.3 以程式設計方式執行 SQL 查詢並返回作為綜合結果,通過登錄檔,操作sql的方式來操作資料:
public class SparkSqlDemo1 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//本地的JSON檔案轉化為DataFrame
		DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");

		//把另一個JSON檔案轉化為DataFrame
		DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json");		

		//以程式設計方式執行SQL:
		//把DataFrame物件轉化為一個虛擬的表
		df.registerTempTable("people");
		df2.registerTempTable("student");

		// 查詢虛擬表 people 中所有資料
		sqlContext.sql("select * from people").show();

		//檢視某個欄位  
		sqlContext.sql("select name from people ").show();

		//檢視多個欄位  
		sqlContext.sql("select name,age+1 from people ").show();  

		//過濾某個欄位的值  
		sqlContext.sql("select name, age from people where age>=21").show();

		//count group 某個欄位的值  
		sqlContext.sql("select age,count(*) countage from people group by age").show();


		//關聯: 內聯 。 
		sqlContext.sql("select * from people inner join student on people.id = student.id ").show();
		/*
	    +---+---+----+----+---+---+-----+----+
		|age| id|name| sex|age| id| name| sex|
		+---+---+----+----+---+---+-----+----+
		| 19| 19|berg|male| 22| 19|xujun|male|
		+---+---+----+----+---+---+-----+----+ 
		 */
	}
}

四:非結構化資料集:

第一種方法使用反射來推斷架構 RDD 包含特定型別的物件。
這種基於反射方法導致更簡潔的程式碼和工程好當您已經知道該Schema編寫Spark應用程式時。

建立 DataFrames 的第二個方法是通過允許您構建一個Schema,然後將它應用於現有 RDD 的程式設計介面。
雖然這種方法更為詳細,它允許您構建 DataFrames 時直到執行時才知道的列和它們的型別。
4.1 非結構化的資料集檔案,user.txt,內容如下:
1,"Hadoop",20
2,"HBase", 21
3,"Zookeeper",22
4,"Hive",23
5,"Spark",24
6,"Berg",22
7,"Xujun",23
4.2 通過 class反射來註冊一張表。
    Spark SQL 支援 JavaBeans RDD 自動轉換分散式資料集。BeanInfo,使用反射來獲取定義表的架構。目前,Spark SQL 不支援包含巢狀的 JavaBeans 或包含複雜的型別,例如列表或陣列。您可以通過建立一個類,實現可序列化並有 getter 和 setter 方法的所有其欄位建立 JavaBean。
public class SparkSqlDemo2 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//把載入的文字檔案 並 每行轉換 JavaBean
		JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt");

		JavaRDD<User> userRDD = rdd.map( new Function<String, User>() {

			private static final long serialVersionUID = 1L;

			public User call(String line) throws Exception {
				String[] parts = line.split(",");
				User user = new User();
				user.setId(Integer.parseInt(parts[0].trim()));
				user.setName(parts[1].trim());
				user.setAge(Integer.parseInt(parts[2].trim()));
				return user;
			}
		});

		// collect 屬於行動運算元Action 提交作業並觸發運算。
		List<User> list = userRDD.collect();
		for (User user : list) {
			System.out.println(  user );
		}

		//通過 class 反射註冊一張表
		DataFrame df = sqlContext.createDataFrame(userRDD, User.class);
		df.registerTempTable("user");

		DataFrame df1 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23");

		// 通過sql 查詢的結果是 DataFrame 即df1 它還是支援 RDD的所有正常操作。
		df1.show();
		
		//並且 結果中的行列可以按序號訪問。
		List<String> listString = df1.javaRDD().map(new Function<Row, String>() {
			
			private static final long serialVersionUID = 1L;

			public String call(Row row) {
				return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2);
			}
		}).collect();
		
		for (String string : listString) {
			System.out.println(  string );
		}
	}
}
4.3 以程式設計方式指定 schema, 通過欄位反射來對映註冊臨時表
在某些情況下不能提前定義 JavaBean 類 (例如,記錄的結構編碼的字串,或將解析文字資料集和領域預計將以不同的方式為不同的使用者),
三個步驟,可以以程式設計方式建立分散式資料集。

1. 從原始 RDD; 建立行 RDD

2. 建立由 StructType 中 RDD 在步驟 1 中建立的行結構相匹配的schema。

3.適用於行 RDD 通過 createDataFrame 方法由 SQLContext 提供的schema。
public class SparkSqlDemo3 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//建立了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//把載入的文字檔案 並 每行轉換 JavaBean
		JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt");

		// schema 以字串形式編碼
		String schemaString = "id name age";

		// 基於 字串的schema生成 schema。
		List<StructField> fields = new ArrayList<StructField>();
		
		String[] str = schemaString.split(" ");
		fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true));
		fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true));
		fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true));

		StructType schema = DataTypes.createStructType(fields);  //  id name age

		JavaRDD<Row> rowRDD = rdd.map( new Function<String, Row>() {

			private static final long serialVersionUID = 1L;
			public Row call(String record) throws Exception {
				String[] fields = record.split(",");
				return RowFactory.create(Integer.parseInt(fields[0].trim()), fields[1].trim(),Integer.parseInt(fields[2].trim()));
			}
		});

		List<Row> list = rowRDD.collect();
		for (Row row : list) {
			System.out.println(  row.getInt(0) + "\t"+ row.getString(1) + "\t"+row.getInt(2)  );
		}

		//對RDD應用schema 並註冊一張表:
		DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
		System.out.println( "df : " + df);
		df.registerTempTable("user");

		df.show();
		DataFrame df2 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23");

		// 通過sql 查詢的結果是 DataFrame 即df1 它還是支援 RDD的所有正常操作。
		df2.show();
		// 並且 結果中的行列可以按序號訪問。
		List<String> listString = df2.javaRDD().map(new Function<Row, String>() {

			private static final long serialVersionUID = 1L;
			public String call(Row row) {
				System.out.println( row );
				return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2);
			}
		}).collect();

		for (String string : listString) {
			System.out.println(  string );
		}

	}
}

注意如果將上述程式碼段中的一段,即:

        String[] str = schemaString.split(" ");

        fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true));

        fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true));

        fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true));

改為下面這段程式碼:

   for (String fieldName: schemaString.split(" ")) {


          fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));

      }

將會出現以下錯誤:

        Caused by: scala.MatchError: 1 (of class java.lang.Integer)

那就來認識下 區域性套用和部分應用 : http://www.ibm.com/developerworks/cn/java/j-jn9/


轉載 https://my.oschina.net/gently?tab=newest&catalogId=3583988