1. 程式人生 > >spark讀取hive資料-java

spark讀取hive資料-java

需求:將hive中的資料讀取出來,寫入es中。

環境:spark 2.0.2

1. SparkSession裡設定enableHiveSupport()
		SparkConf conf = new SparkConf().setAppName("appName").setMaster("local[*]");

		SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example hive")
                .config(conf)
                .enableHiveSupport()  //支援hive
                .getOrCreate();

2. pom 新增依賴 ( 對hive版本沒要求 )
		<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.2.1</version>
        </dependency>

或者

		<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

3. 將配置檔案放在spark的conf下

參考官方文件

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.
4. spark.sql讀取資料
		SparkSession spark = ESMysqlSpark.getSession();
		String querySql = "SELECT * FROM test.table";
		spark.sql(querySql);

5. hive sql 語句

需求:合併兩個欄位,組成一個新的字串。

可以先用udf註冊一個函式

		spark.udf().register("mode", new UDF2<String, Long, String>() {
                public String call(String types, Long time) throws Exception {
                    return types.replace(".", "") + String.valueOf(time);
                }}, DataTypes.StringType);

求某欄位的平均值(輸出為int型)、某欄位的最大/最小值、日期欄位格式化輸出等等。這種需求則都可以在hive語句中實現。

			String querySql = String.format("SELECT mode(ip, unix_timestamp()) id," +
                    " ip, " +
                    "cast(avg(t1) as bigint) f1, " +
                    "cast(avg(t2) as bigint) f2, " +
                    "min(t3) minSpeed, " +
                    "max(t4) maxSpeed, " +
                    "from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss') time " +
                    "FROM test.table " +
                    "where time > %s " +
                    "group by ip ", timeLimit);
  • unix_timestamp 獲取當前時間戳
  • cast(expression AS data_type) 資料型別轉換
  • from_unixtime(unix_timestamp(),‘yyyy-MM-dd HH:mm:ss’) 日期格式化輸出
6. 寫es

通過 ds.show()檢視資料是否正確

			Dataset ds = spark.sql(querySql);
            EsSparkSQL.saveToEs(ds, "sha_parking/t_speedInformation");

專案打包

  • mvn package 普通打包
  • mvn assembly:assembly 包含依賴包

遇到的問題

如果讀取不到資料。先確認以下配置:

  1. /etc/hosts中確保 127.0.0.1 hostname 已新增
  2. 檢視 $SPARK_HOME/conf/spark-env.sh, 確保ip地址是否正確
問題: Hive Schema version 2.1.0 does not match metastore
mysql -uroot -p  
use hive;
select * from VERSION;
update VERSION set SCHEMA_VERSION='2.1.1' where  VER_ID=1;