1. 程式人生 > >hive與mysql兩種數據源之間的join

hive與mysql兩種數據源之間的join

alt mar tab tom hql sql 行數 成功 use

  這篇文章是基於上一篇文章的續集

一:程序

1.程序、

 1 package com.scala.it
 2 
 3 import java.util.Properties
 4 
 5 import org.apache.spark.sql.SaveMode
 6 import org.apache.spark.sql.hive.HiveContext
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 
 9 object HiveToMysql {
10   def main(args: Array[String]): Unit = {
11 val conf = new SparkConf() 12 .setMaster("local[*]") 13 .setAppName("hive-yo-mysql") 14 val sc = SparkContext.getOrCreate(conf) 15 val sqlContext = new HiveContext(sc) 16 val (url, username, password) = ("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09", "root", "123456")
17 val props = new Properties() 18 props.put("user", username) 19 props.put("password", password) 20 21 // ================================== 22 // 第一步:同步hive的dept表到mysql中 23 sqlContext 24 .read 25 .table("hadoop09.dept") // database.tablename 26 .write 27 .mode(SaveMode.Overwrite) //
存在覆蓋 28 .jdbc(url, "mysql_dept", props) 29 30 // 第二步:hive表和mysql表進行數據join操作 ==> 采用HQL語句實現 31 // 2.1 將mysql的數據註冊成為臨時表 32 sqlContext 33 .read 34 .jdbc(url, "mysql_dept", props) 35 .registerTempTable("temp_mysql_dept") // 臨時表中不要出現"." 36 37 // 第三步數據join 38 sqlContext.sql( 39 """ 40 |SELECT a.*,b.dname,b.loc 41 |FROM hadoop09.emp a join temp_mysql_dept b on a.deptno = b.deptno 42 """.stripMargin) 43 .write 44 .format("org.apache.spark.sql.execution.datasources.parquet") 45 .mode(SaveMode.Overwrite) 46 .save("/spark/join/parquet") 47 48 // 檢測數據是否join成功 49 sqlContext 50 .read 51 .format("parquet") 52 .load("/spark/join/parquet") 53 .show() 54 55 } 56 }

2.效果

  技術分享圖片

二:知識點

1.format

  可以寫包名。

  技術分享圖片

hive與mysql兩種數據源之間的join