
Spark: Writing a DataFrame to MySQL


After applying a series of transformations to a DataFrame in Spark, you often need to write the result to MySQL. The implementation is walked through below.

1. MySQL connection info

I keep the MySQL connection info in an external configuration file, which makes later configuration changes easier.

// Example configuration file:
[hdfs@iptve2e03 tmp_lillcol]$ cat job.properties 
# MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456


2. Required jar dependencies (sbt format; adapt accordingly for Maven)

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

3. Full implementation

import java.io.FileInputStream
import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author Administrator
  *         2018/10/16-10:15
  */
object SaveDataFrameASMysql {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)
    // Read without a filter
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
    dim_sys_city_dict.show(10)

    // Save to MySQL
    saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
  }

  /**
    * Save a DataFrame as a MySQL table
    *
    * @param dataFrame the DataFrame to save
    * @param tableName the target MySQL table name
    * @param saveMode  save mode: Append, Overwrite, ErrorIfExists or Ignore
    * @param proPath   path to the configuration file
    */
  def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String) = {
    var table = tableName
    val properties: Properties = getProPerties(proPath)
    // The keys in the config file differ from the keys Spark expects,
    // so build a new Properties object in Spark's format
    val prop = new Properties
    prop.setProperty("user", properties.getProperty("mysql.username"))
    prop.setProperty("password", properties.getProperty("mysql.password"))
    prop.setProperty("driver", properties.getProperty("mysql.driver"))
    prop.setProperty("url", properties.getProperty("mysql.url"))
    if (saveMode == SaveMode.Overwrite) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        table = table.toUpperCase
        // Truncate first, then Append, so the table structure is preserved
        stmt.execute(s"truncate table $table")
        conn.close()
      }
      catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      }
    }
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)
  }

  /**
    * Read a MySQL table
    *
    * @param sqlContext
    * @param tableName the MySQL table to read
    * @param proPath   path to the configuration file
    * @return the MySQL table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      //        .option("dbtable", tableName.toUpperCase)
      .option("dbtable", tableName)
      .load()
  }

  /**
    * Read a MySQL table with a filter condition
    *
    * @param sqlContext
    * @param table           the MySQL table to read
    * @param filterCondition the filter condition
    * @param proPath         path to the configuration file
    * @return the MySQL table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    val tableName = "(select * from " + table + " where " + filterCondition + ") as t1"
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
    * Load the configuration file
    *
    * @param proPath
    * @return
    */
  def getProPerties(proPath: String) = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
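The second readMysqlTable overload pushes the WHERE clause down to MySQL as a subquery, so only matching rows cross the network. A usage sketch (the filter condition here is illustrative, not from the original job):

// Hedged usage sketch of the filtered reader defined above; the predicate is
// evaluated inside MySQL via the "(select ... where ...) as t1" subquery.
val filteredDict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", "city_id >= '240'", proPath)
filteredDict.show(10)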

4. Test

def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // Read without a filter
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)

  // Save to MySQL
  saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
}

5. Run results (sensitive data has been masked)

+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code|           bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|      1|    249|       **|    **_ab|     100|      **按時|    **-查到|           xcaasd...|  21|    張三公司|
|      2|    240|       **|    **_ab|     300|      **按時|    **-查到|           xcaasd...|  21|    張三公司|
|      3|    240|       **|    **_ab|     100|      **按時|    **-查到|           xcaasd...|  21|    張三公司|
|      4|    242|       **|    **_ab|     300|      **按時|    **-查到|           xcaasd...|  01|    張三公司|
|      5|    246|       **|    **_ab|     100|      **按時|    **-查到|           xcaasd...|  01|    張三公司|
|      6|    246|       **|    **_ab|     300|      **按時|    **-查到|           xcaasd...|  01|    張三公司|
|      7|    248|       **|    **_ab|     200|      **按時|    **-查到|           xcaasd...|  01|    張三公司|
|      8|    242|       **|    **_ab|     400|      **按時|    **-查到|           xcaasd...|  01|    張三公司|
|      9|    247|       **|    **_ab|     200|      **按時|    **-查到|           xcaasd...|  01|    張三公司|
|      0|    243|       **|    **_ab|     400|      **按時|    **-查到|           xcaasd...|  01|    張三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+

mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id     | varchar(32) | YES  |     | NULL    |       |
| city_id     | varchar(32) | YES  |     | NULL    |       |
| city_name   | varchar(32) | YES  |     | NULL    |       |
| city_code   | varchar(32) | YES  |     | NULL    |       |
| group_id    | varchar(32) | YES  |     | NULL    |       |
| group_name  | varchar(32) | YES  |     | NULL    |       |
| area_code   | varchar(32) | YES  |     | NULL    |       |
| bureau_id   | varchar(64) | YES  |     | NULL    |       |
| sort        | varchar(32) | YES  |     | NULL    |       |
| bureau_name | varchar(32) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id     | text | YES  |     | NULL    |       |
| city_id     | text | YES  |     | NULL    |       |
| city_name   | text | YES  |     | NULL    |       |
| city_code   | text | YES  |     | NULL    |       |
| group_id    | text | YES  |     | NULL    |       |
| group_name  | text | YES  |     | NULL    |       |
| area_code   | text | YES  |     | NULL    |       |
| bureau_id   | text | YES  |     | NULL    |       |
| sort        | text | YES  |     | NULL    |       |
| bureau_name | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> select count(1) from TestMysqlTble1;
+----------+
| count(1) |
+----------+
|       21 |
+----------+
1 row in set (0.00 sec)

mysql> select count(1) from TestMysqlTble2;
+----------+
| count(1) |
+----------+
|       21 |
+----------+
1 row in set (0.00 sec)

6. Performance

Used this way, small datasets were fine, but once the data volume grew the writes became unacceptably slow. I tried several optimizations with little effect, then went into the source code and found two key fragments:

/**
 * Saves the content of the [[DataFrame]] to a external database table via JDBC. In the case the
 * table already exists in the external database, behavior of this function depends on the
 * save mode, specified by the `mode` function (default to throwing an exception).
 *
 * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
 * your external database systems.
 *
 * @param url JDBC database url of the form `jdbc:subprotocol:subname`
 * @param table Name of the table in the external database.
 * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
 *                             tag/value. Normally at least a "user" and "password" property
 *                             should be included.
 *
 * @since 1.4.0
 */
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()

  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)

    if (mode == SaveMode.Ignore && tableExists) {
      return
    }

    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }

    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table)
      tableExists = false
    }

    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }

  JdbcUtils.saveTable(df, url, table, props) //----------------------------- key point 1
}


/**
 * Saves the RDD to the database in a single transaction.
 */
def saveTable(
    df: DataFrame,
    url: String,
    table: String,
    properties: Properties) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
    getJdbcType(field.dataType, dialect).jdbcNullType
  }

  val rddSchema = df.schema
  val getConnection: () => Connection = createConnectionFactory(url, properties)
  val batchSize = properties.getProperty("batchsize", "1000").toInt
  df.foreachPartition { iterator => //------------------------------------ key point 2
    savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
  }
}

In other words, the built-in method already writes partition by partition, with each partition opening its own MySQL connection. So the simplest optimization is to repartition the DataFrame before saving. Watch out for data skew, though, otherwise the repartition may bring no speedup.
The fastest approach I have tested so far is to pull the files down and import them into MySQL with the LOAD DATA command, but that is more cumbersome.

Here is the repartition example:

def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // Read without a filter
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)

  // Save to MySQL
  saveASMysqlTable(dim_sys_city_dict.repartition(10), "TestMysqlTble2", SaveMode.Append, proPath)
}
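Since saveTable reads a batchsize connection property (defaulting to 1000, as the source above shows), the batch size can presumably be tuned through the same Properties object; together with rewriteBatchedStatements=true, which is already in the example URL, the MySQL driver can then collapse each batch into far fewer round trips. A minimal sketch, assuming the prop object built inside saveASMysqlTable:

// Hedged sketch: enlarge the JDBC batch read by JdbcUtils.saveTable
// (the Spark 1.6 source above defaults "batchsize" to 1000 rows).
prop.setProperty("batchsize", "5000")
dataFrame
  .repartition(10) // one MySQL connection per partition, as noted above
  .write
  .mode(SaveMode.Append)
  .jdbc(prop.getProperty("url"), "TestMysqlTble2", prop)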

7. Summary

A few things to watch when writing a DataFrame to MySQL:

  • The target table should ideally be created in advance; otherwise the columns get default types, and Text columns are a real waste of resources. Compare the two tables below: the source table TestMysqlTble1 against TestMysqlTble2, the table created by the DataFrame write (a sketch of pre-creating the table follows the comparison).
mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id     | varchar(32) | YES  |     | NULL    |       |
| city_id     | varchar(32) | YES  |     | NULL    |       |
| city_name   | varchar(32) | YES  |     | NULL    |       |
| city_code   | varchar(32) | YES  |     | NULL    |       |
| group_id    | varchar(32) | YES  |     | NULL    |       |
| group_name  | varchar(32) | YES  |     | NULL    |       |
| area_code   | varchar(32) | YES  |     | NULL    |       |
| bureau_id   | varchar(64) | YES  |     | NULL    |       |
| sort        | varchar(32) | YES  |     | NULL    |       |
| bureau_name | varchar(32) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id     | text | YES  |     | NULL    |       |
| city_id     | text | YES  |     | NULL    |       |
| city_name   | text | YES  |     | NULL    |       |
| city_code   | text | YES  |     | NULL    |       |
| group_id    | text | YES  |     | NULL    |       |
| group_name  | text | YES  |     | NULL    |       |
| area_code   | text | YES  |     | NULL    |       |
| bureau_id   | text | YES  |     | NULL    |       |
| sort        | text | YES  |     | NULL    |       |
| bureau_name | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)
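If you would rather not rely on the generated schema at all, one option is to issue the DDL yourself before the first write. A sketch, under the assumption that the target should mirror the TestMysqlTble1 schema above, reusing getProPerties and proPath from the listing:

import java.sql.DriverManager

// Hedged sketch: pre-create TestMysqlTble2 with explicit varchar columns so
// the JDBC writer appends into it instead of generating TEXT columns.
val props = getProPerties(proPath)
val conn = DriverManager.getConnection(
  props.getProperty("mysql.url"),
  props.getProperty("mysql.username"),
  props.getProperty("mysql.password"))
try {
  conn.createStatement().execute(
    """CREATE TABLE IF NOT EXISTS TestMysqlTble2 (
      |  dict_id     varchar(32),
      |  city_id     varchar(32),
      |  city_name   varchar(32),
      |  city_code   varchar(32),
      |  group_id    varchar(32),
      |  group_name  varchar(32),
      |  area_code   varchar(32),
      |  bureau_id   varchar(64),
      |  sort        varchar(32),
      |  bureau_name varchar(32)
      |)""".stripMargin)
} finally {
  conn.close()
}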
  • About SaveMode.Overwrite
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()

  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)

    if (mode == SaveMode.Ignore && tableExists) {
      return
    }

    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }

    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table) //---------------------------------------- key point 1
      tableExists = false
    }

    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }

  JdbcUtils.saveTable(df, url, table, props)
}

/**
 * Drops a table from the JDBC database.
 */
def dropTable(conn: Connection, table: String): Unit = {
  val statement = conn.createStatement
  try {
    statement.executeUpdate(s"DROP TABLE $table") //------------------------------------- key point 2
  } finally {
    statement.close()
  }
}

As these two fragments show, the writer first checks whether the table exists. With SaveMode.Overwrite it calls dropTable(conn: Connection, table: String) to delete the existing table, which means you lose the table structure; the recreated table then has the default-type problem described above. That is why I added the following to the save method:

if (saveMode == SaveMode.Overwrite) {
  var conn: Connection = null
  try {
    conn = DriverManager.getConnection(
      prop.getProperty("url"),
      prop.getProperty("user"),
      prop.getProperty("password")
    )
    val stmt = conn.createStatement
    table = table.toUpperCase
    // Truncate first, then Append, so the table structure is preserved
    stmt.execute(s"truncate table $table")
    conn.close()
  }
  catch {
    case e: Exception =>
      println("MySQL Error:")
      e.printStackTrace()
  }
}

truncate only removes the data; the table structure stays intact.

If the table does not exist beforehand, there are two cases:

1. A mode other than SaveMode.Overwrite

No problem: the table is created automatically, with the default column types.

2. SaveMode.Overwrite

This fails. Below is the stack trace from using SaveMode.Overwrite when TestMysqlTble2 does not exist:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'iptv.TESTMYSQLTBLE2' doesn't exist
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2547)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2505)
        at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840)
        at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740)
        at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)
        at com.iptv.job.basedata.SaveDataFrameASMysql$.main(SaveDataFrameASMysql.scala:33)
        at com.iptv.job.basedata.SaveDataFrameASMysql.main(SaveDataFrameASMysql.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Error details:

The reported position,
at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)
corresponds to this line in the code:
stmt.execute(s"truncate table $table") // Truncate first, then Append, so the table structure is preserved
In other words, truncate requires the table to exist.
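One way to avoid the failure is to only truncate when the table actually exists, so SaveMode.Overwrite also works on a first run. A sketch using JDBC metadata, meant to replace the bare truncate inside saveASMysqlTable (my own addition, not from the original post):

// Hedged sketch: truncate only if the table exists; otherwise fall through
// and let dataFrame.write create it. Note that `table` was uppercased above,
// which matters on case-sensitive MySQL installations.
val tables = conn.getMetaData.getTables(null, null, table, Array("TABLE"))
if (tables.next()) {
  stmt.execute(s"truncate table $table")
}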

That completes writing a DataFrame to MySQL.

This article is a summary of my own work; please credit the source when reposting!
