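SparkSQL DataFrame and MySQL CRUD: the ins and outs
After computing metrics in Spark with its various operators, you usually need to store the results in a relational database such as MySQL or PostgreSQL and then connect that database to a dashboard platform, which turns the numbers into colorful charts. Below I explain how to perform CRUD operations on MySQL from Spark through a c3p0 connection pool: Create, Retrieve, Update, and Delete.
Project GitHub repository: spark-mysql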
1. Create
    import java.sql.{Date, Timestamp}
    import hiveContext.implicits._ // required for toDF()

    case class CardMember(m_id: String, card_type: String, expire: Timestamp, duration: Int,
                          is_sale: Boolean, date: Date, user: Long, salary: Float)

    val memberSeq = Seq(
      // "月卡" is a monthly card, "季卡" is a quarterly card
      CardMember("member_2", "月卡", new Timestamp(System.currentTimeMillis()), 31, false, new Date(System.currentTimeMillis()), 123223, 0.32f),
      CardMember("member_1", "季卡", new Timestamp(System.currentTimeMillis()), 93, false, new Date(System.currentTimeMillis()), 124224, 0.362f)
    )
    val memberDF = memberSeq.toDF()
    // Save the DataFrame to MySQL; the table is created automatically if it does not exist
    MySQLUtils.saveDFtoDBCreateTableIfNotExist("member_test", memberDF)
2. Retrieve
    // Map a MySQL table directly to a DataFrame by table name (the third argument is an optional query condition)
    MySQLUtils.getDFFromMysql(hiveContext, "member_test", null)
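getDFFromMysql applies the optional condition with where() after the table has been mapped to a DataFrame. Spark's JDBC source also accepts a parenthesized subquery with an alias in place of a table name, which makes the filter part of the query that MySQL runs. The helper below is a hypothetical sketch (tableOrSubquery is not part of the project) that only builds that string:

```scala
object JdbcSubquerySketch {
  // Hypothetical helper, not part of MySQLUtils.
  // Returns the plain table name when there is no condition; otherwise a
  // subquery with an alias, usable wherever the JDBC reader expects a table.
  def tableOrSubquery(table: String, condition: String): String =
    if (condition == null || condition.trim.isEmpty) table
    else s"(SELECT * FROM $table WHERE $condition) AS t"
}
```

Assuming the same jdbcURL and prop values that getDFFromMysql builds, the result would be passed as sqlContext.read.jdbc(jdbcURL, JdbcSubquerySketch.tableOrSubquery("member_test", "duration > 31"), prop).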
3. Update
    // Update the given columns by primary key; rows whose key does not exist yet are inserted
    MySQLUtils.insertOrUpdateDFtoDBUsePool("member_test", memberDF, Array("user", "salary"))
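To make the statement behind this call concrete, here is a minimal sketch of an upsert builder. It is a variant, not the project's own getInsertOrUpdateSql: it refers to the inserted values with VALUES(col) so that no extra parameters need to be bound for the update part, whereas the article's version emits col=? and binds each update value a second time. The name buildUpsertSql and the backtick quoting are assumptions made for illustration. VALUES() in this position works on MySQL 5.x and is deprecated as of MySQL 8.0.20.

```scala
object UpsertSqlSketch {
  // Hypothetical helper: builds an INSERT ... ON DUPLICATE KEY UPDATE statement.
  // columns       - every column of the row, in insert order
  // updateColumns - columns to overwrite when the primary key already exists
  def buildUpsertSql(table: String, columns: Seq[String], updateColumns: Seq[String]): String = {
    val columnList = columns.map(c => s"`$c`").mkString(", ")
    val placeholders = Seq.fill(columns.length)("?").mkString(", ")
    val updates = updateColumns.map(c => s"`$c` = VALUES(`$c`)").mkString(", ")
    s"INSERT INTO `$table` ($columnList) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $updates"
  }
}
```

As with the article's method, the table needs a primary key or unique index for the update branch to be taken.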
4. Delete
    // Delete the rows that match a condition (the condition is passed without the WHERE keyword)
    MySQLUtils.deleteMysqlTableData(hiveContext, "member_test", "m_id='member_1'")
    // Drop the given table
    MySQLUtils.dropMysqlTable(hiveContext, "member_test")
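Both delete methods splice the table name straight into the SQL text, and JDBC placeholders cannot stand in for identifiers. If table names ever come from configuration or user input, one possible precaution is to check them first. This is a sketch under that assumption; safeIdentifier is a hypothetical helper and is not part of the project:

```scala
object SqlIdentifierSketch {
  // Letters, digits and underscores only, not starting with a digit.
  private val Identifier = "[A-Za-z_][A-Za-z0-9_]*".r

  // Hypothetical helper: returns the name unchanged if it is a plain identifier,
  // otherwise throws IllegalArgumentException (via require).
  def safeIdentifier(name: String): String = {
    require(name != null && Identifier.pattern.matcher(name).matches(),
      s"unsafe SQL identifier: $name")
    name
  }
}
```

A call such as s"drop table ${SqlIdentifierSketch.safeIdentifier(mysqlTableName)}" would then reject names containing spaces, quotes, or semicolons.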
The detailed steps are as follows:
Add the MySQL connector and c3p0 dependencies to pom.xml, then import the changes so the project picks them up
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.mchange</groupId>
        <artifactId>c3p0</artifactId>
        <version>0.9.5</version>
    </dependency>
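Acquiring the connection pool and the DDL and DML operations are wrapped in the three utility classes below
PropertyUtils reads the configuration from the conf/mysql-user.properties file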
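    package utils

    import java.util.Properties

    /**
      * Created with IntelliJ IDEA.
      * Author: [email protected]
      * Description: PropertyUtils utility class
      * Date: created 2018-11-17 11:43
      */
    object PropertyUtils {
      // Looks up one key in a properties file found on the classpath
      def getFileProperties(fileName: String, propertyKey: String): String = {
        val result = this.getClass.getClassLoader.getResourceAsStream(fileName)
        try {
          val prop = new Properties
          prop.load(result)
          prop.getProperty(propertyKey)
        } finally {
          if (result != null) result.close() // release the stream after reading
        }
      }
    }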
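The article does not show the contents of mysql-user.properties. The sketch below lists the keys that MySQLPoolManager reads, with placeholder values that are assumptions (host, database name, credentials and pool sizes are all made up), and loads them from a string so that it runs without a file on the classpath. intProperty is a hypothetical helper that falls back to a default when a key is missing, instead of failing on null.toInt.

```scala
import java.io.StringReader
import java.util.Properties

object PropertiesSketch {
  // Keys are the ones used in the article; every value is a placeholder.
  val sample: String =
    """mysql.jdbc.url=jdbc:mysql://localhost:3306/test
      |mysql.jdbc.username=root
      |mysql.jdbc.password=secret
      |mysql.pool.jdbc.driverClass=com.mysql.jdbc.Driver
      |mysql.pool.jdbc.minPoolSize=5
      |mysql.pool.jdbc.maxPoolSize=20
      |mysql.pool.jdbc.acquireIncrement=5
      |mysql.pool.jdbc.maxStatements=180
      |""".stripMargin

  // Parses properties text into a java.util.Properties instance.
  def load(text: String): Properties = {
    val prop = new Properties
    prop.load(new StringReader(text))
    prop
  }

  // Hypothetical helper: integer lookup with a default for missing keys.
  def intProperty(prop: Properties, key: String, default: Int): Int =
    Option(prop.getProperty(key)).map(_.trim.toInt).getOrElse(default)
}
```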
MySQLPoolManager wraps the acquisition of the database connection pool
    package utils

    import java.sql.Connection

    import com.mchange.v2.c3p0.ComboPooledDataSource

    /**
      * Created with IntelliJ IDEA.
      * Author: [email protected]
      * Description: MySQL connection pool manager
      * Date: created 2018-11-17 12:43
      */
    object MySQLPoolManager {
      var mysqlManager: MysqlPool = _

      // Create the pool on first use; synchronized so that concurrent tasks share one instance
      def getMysqlManager: MysqlPool = {
        synchronized {
          if (mysqlManager == null) {
            mysqlManager = new MysqlPool
          }
        }
        mysqlManager
      }

      class MysqlPool extends Serializable {
        private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
        try {
          cpds.setJdbcUrl(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.url"))
          cpds.setDriverClass(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.driverClass"))
          cpds.setUser(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.username"))
          cpds.setPassword(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.password"))
          cpds.setMinPoolSize(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.minPoolSize").toInt)
          cpds.setMaxPoolSize(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.maxPoolSize").toInt)
          cpds.setAcquireIncrement(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.acquireIncrement").toInt)
          cpds.setMaxStatements(PropertyUtils.getFileProperties("mysql-user.properties", "mysql.pool.jdbc.maxStatements").toInt)
        } catch {
          case e: Exception => e.printStackTrace()
        }

        // Returns a pooled connection, or null if none could be obtained
        def getConnection: Connection = {
          try {
            cpds.getConnection()
          } catch {
            case ex: Exception =>
              ex.printStackTrace()
              null
          }
        }

        def close(): Unit = {
          try {
            cpds.close()
          } catch {
            case ex: Exception => ex.printStackTrace()
          }
        }
      }
    }
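The null check inside synchronized is one way to create the pool exactly once per JVM. Scala's lazy val gives the same once-only, thread-safe initialization with less code. The sketch below shows the idea with a stand-in class (FakePool and the created counter are assumptions added for illustration, so that the example runs without c3p0 or a database):

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyPoolSketch {
  // Stand-in for MysqlPool; the real class would configure ComboPooledDataSource.
  final class FakePool

  // Counts constructions so that the once-only behaviour can be observed.
  val created = new AtomicInteger(0)

  // A lazy val is initialized on first access, at most once, and the
  // compiler-generated initialization is thread-safe.
  lazy val pool: FakePool = {
    created.incrementAndGet()
    new FakePool
  }
}
```

Every access to LazyPoolSketch.pool returns the same instance. In the article's class this would correspond to declaring the manager as a lazy val of type MysqlPool.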
MySQLUtils wraps the CRUD methods and can be used directly
    package utils

    import java.sql.{Date, PreparedStatement, ResultSet, Timestamp}
    import java.util.Properties

    import org.apache.log4j.Logger
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}

    /**
      * Created with IntelliJ IDEA.
      * Author: [email protected]
      * Description: MySQL DDL and DML utility class
      * Date: created 2018-11-17 12:43
      */
    object MySQLUtils {
      val logger: Logger = Logger.getLogger(getClass.getSimpleName)

      /**
        * Binds one field of a Row to a statement parameter according to its Spark DataType.
        * A null value is bound as a SQL NULL of the column's JDBC type, read from the table metadata.
        *
        * @param paramIndex 1-based index of the "?" placeholder
        * @param fieldIndex 0-based index of the field in the Row
        */
      private def setParameter(ps: PreparedStatement, paramIndex: Int, record: Row, fieldIndex: Int,
                               dataType: DataType, metaData: ResultSet): Unit = {
        if (record.isNullAt(fieldIndex)) {
          // metadata rows are 1-based and follow the table's column order
          metaData.absolute(fieldIndex + 1)
          ps.setNull(paramIndex, metaData.getInt("DATA_TYPE"))
        } else {
          dataType match {
            // getAs[Int] on a Byte or Short value fails with a ClassCastException, so widen explicitly
            case _: ByteType => ps.setInt(paramIndex, record.getByte(fieldIndex).toInt)
            case _: ShortType => ps.setInt(paramIndex, record.getShort(fieldIndex).toInt)
            case _: IntegerType => ps.setInt(paramIndex, record.getAs[Int](fieldIndex))
            case _: LongType => ps.setLong(paramIndex, record.getAs[Long](fieldIndex))
            case _: BooleanType => ps.setBoolean(paramIndex, record.getAs[Boolean](fieldIndex))
            case _: FloatType => ps.setFloat(paramIndex, record.getAs[Float](fieldIndex))
            case _: DoubleType => ps.setDouble(paramIndex, record.getAs[Double](fieldIndex))
            case _: StringType => ps.setString(paramIndex, record.getAs[String](fieldIndex))
            case _: TimestampType => ps.setTimestamp(paramIndex, record.getAs[Timestamp](fieldIndex))
            case _: DateType => ps.setDate(paramIndex, record.getAs[Date](fieldIndex))
            case _ => throw new RuntimeException(s"nonsupport ${dataType} !!!")
          }
        }
      }

      /**
        * Writes a DataFrame to MySQL through the c3p0 connection pool,
        * binding every field according to its Spark type.
        *
        * @param tableName       table name
        * @param resultDateFrame DataFrame
        */
      def saveDFtoDBUsePool(tableName: String, resultDateFrame: DataFrame): Unit = {
        val colNumbers = resultDateFrame.columns.length
        val sql = getInsertSql(tableName, colNumbers)
        val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
        resultDateFrame.foreachPartition(partitionRecords => {
          val conn = MySQLPoolManager.getMysqlManager.getConnection // take a connection from the pool
          val preparedStatement = conn.prepareStatement(sql)
          // column metadata of the target table, fetched through the connection
          val metaData = conn.getMetaData.getColumns(null, "%", tableName, "%")
          try {
            conn.setAutoCommit(false)
            partitionRecords.foreach(record => {
              // note: JDBC parameter indexes start at 1, Row field indexes start at 0
              for (i <- 1 to colNumbers) {
                setParameter(preparedStatement, i, record, i - 1, columnDataTypes(i - 1), metaData)
              }
              preparedStatement.addBatch()
            })
            preparedStatement.executeBatch()
            conn.commit()
          } catch {
            case e: Exception => println(s"@@ saveDFtoDBUsePool ${e.getMessage}")
            // do some logging
          } finally {
            preparedStatement.close()
            conn.close()
          }
        })
      }

      /**
        * Builds the insert SQL
        * @param tableName
        * @param colNumbers
        * @return
        */
      def getInsertSql(tableName: String, colNumbers: Int): String = {
        val placeholders = Seq.fill(colNumbers)("?").mkString(",")
        s"insert into $tableName values($placeholders)"
      }

      /** Returns the MySQL connection settings as a tuple **/
      def getMySQLInfo: (String, String, String) = {
        val jdbcURL = PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.url")
        val userName = PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.username")
        val passWord = PropertyUtils.getFileProperties("mysql-user.properties", "mysql.jdbc.password")
        (jdbcURL, userName, passWord)
      }

      /**
        * Reads a DataFrame from the MySQL database
        *
        * @param sqlContext     sqlContext
        * @param mysqlTableName table name
        * @param queryCondition query condition (optional)
        * @return DataFrame
        */
      def getDFFromMysql(sqlContext: SQLContext, mysqlTableName: String, queryCondition: String): DataFrame = {
        val (jdbcURL, userName, passWord) = getMySQLInfo
        val prop = new Properties()
        prop.put("user", userName)
        prop.put("password", passWord)
        if (null == queryCondition || "" == queryCondition)
          sqlContext.read.jdbc(jdbcURL, mysqlTableName, prop)
        else
          sqlContext.read.jdbc(jdbcURL, mysqlTableName, prop).where(queryCondition)
      }

      /**
        * Drops a table
        * @param sqlContext
        * @param mysqlTableName
        * @return true if the statement ran without error
        */
      def dropMysqlTable(sqlContext: SQLContext, mysqlTableName: String): Boolean = {
        val conn = MySQLPoolManager.getMysqlManager.getConnection // take a connection from the pool
        val statement = conn.createStatement()
        try {
          // execute() returns false for statements without a result set, so report success explicitly
          statement.execute(s"drop table $mysqlTableName")
          true
        } catch {
          case e: Exception =>
            println(s"mysql dropMysqlTable error:${e.getMessage}")
            false
        } finally {
          statement.close()
          conn.close()
        }
      }

      /**
        * Deletes rows from a table
        * @param sqlContext
        * @param mysqlTableName
        * @param condition WHERE condition, without the WHERE keyword
        * @return true if the statement ran without error
        */
      def deleteMysqlTableData(sqlContext: SQLContext, mysqlTableName: String, condition: String): Boolean = {
        val conn = MySQLPoolManager.getMysqlManager.getConnection // take a connection from the pool
        val statement = conn.createStatement()
        try {
          statement.execute(s"delete from $mysqlTableName where $condition")
          true
        } catch {
          case e: Exception =>
            println(s"mysql deleteMysqlTableData error:${e.getMessage}")
            false
        } finally {
          statement.close()
          conn.close()
        }
      }

      /**
        * Saves a DataFrame to MySQL; the table is created automatically if it does not exist
        * @param tableName
        * @param resultDateFrame
        */
      def saveDFtoDBCreateTableIfNotExist(tableName: String, resultDateFrame: DataFrame): Unit = {
        // create the table from the DataFrame if it does not exist
        createTableIfNotExist(tableName, resultDateFrame)
        // check that table and DataFrame agree on the number, names and order of fields
        verifyFieldConsistency(tableName, resultDateFrame)
        // save the DataFrame
        saveDFtoDBUsePool(tableName, resultDateFrame)
      }

      /**
        * Builds the insertOrUpdate SQL statement
        * @param tableName
        * @param cols
        * @param updateColumns
        * @return
        */
      def getInsertOrUpdateSql(tableName: String, cols: Array[String], updateColumns: Array[String]): String = {
        val placeholders = Seq.fill(cols.length)("?").mkString(",")
        val updates = updateColumns.map(c => s"$c=?").mkString(",")
        s"insert into $tableName values($placeholders) ON DUPLICATE KEY UPDATE $updates"
      }

      /**
        * Writes a DataFrame to MySQL with insert-or-update semantics.
        * Note: the table must have a primary key for this to work.
        * @param tableName
        * @param resultDateFrame
        * @param updateColumns
        */
      def insertOrUpdateDFtoDBUsePool(tableName: String, resultDateFrame: DataFrame, updateColumns: Array[String]): Unit = {
        val colNumbers = resultDateFrame.columns.length
        val sql = getInsertOrUpdateSql(tableName, resultDateFrame.columns, updateColumns)
        val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
        println("############## sql = " + sql)
        resultDateFrame.foreachPartition(partitionRecords => {
          val conn = MySQLPoolManager.getMysqlManager.getConnection // take a connection from the pool
          val preparedStatement = conn.prepareStatement(sql)
          // column metadata of the target table, fetched through the connection
          val metaData = conn.getMetaData.getColumns(null, "%", tableName, "%")
          try {
            conn.setAutoCommit(false)
            partitionRecords.foreach(record => {
              // note: JDBC parameter indexes start at 1, Row field indexes start at 0
              for (i <- 1 to colNumbers) {
                setParameter(preparedStatement, i, record, i - 1, columnDataTypes(i - 1), metaData)
              }
              // bind the values of the columns to update; their placeholders follow the insert placeholders
              for (i <- 1 to updateColumns.length) {
                val fieldIndex = record.fieldIndex(updateColumns(i - 1))
                // the original looked up metadata row colNumbers + i for nulls, which is past the last column
                setParameter(preparedStatement, colNumbers + i, record, fieldIndex, columnDataTypes(fieldIndex), metaData)
              }
              preparedStatement.addBatch()
            })
            preparedStatement.executeBatch()
            conn.commit()
          } catch {
            case e: Exception => println(s"@@ insertOrUpdateDFtoDBUsePool ${e.getMessage}")
            // do some logging
          } finally {
            preparedStatement.close()
            conn.close()
          }
        })
      }

      /**
        * Creates the table from the DataFrame's fields if it does not exist; column order follows the DataFrame.
        * A field named id becomes the primary key (int, auto-increment); all other fields are mapped
        * to MySQL column types from their Spark DataType.
        *
        * @param tableName table name
        * @param df        dataFrame
        */
      def createTableIfNotExist(tableName: String, df: DataFrame): Unit = {
        val con = MySQLPoolManager.getMysqlManager.getConnection
        try {
          val colResultSet = con.getMetaData.getColumns(null, "%", tableName, "%")
          // no column metadata means the table does not exist yet
          if (!colResultSet.next()) {
            // one column definition per DataFrame field
            val columnDefs = df.schema.fields.map(x =>
              if (x.name.equalsIgnoreCase("id")) {
                s"`${x.name}` int(255) NOT NULL AUTO_INCREMENT PRIMARY KEY" // id: integer, auto-increment primary key
              } else {
                x.dataType match {
                  case _: ByteType => s"`${x.name}` int(100) DEFAULT NULL"
                  case _: ShortType => s"`${x.name}` int(100) DEFAULT NULL"
                  case _: IntegerType => s"`${x.name}` int(100) DEFAULT NULL"
                  case _: LongType => s"`${x.name}` bigint(100) DEFAULT NULL"
                  case _: BooleanType => s"`${x.name}` tinyint DEFAULT NULL"
                  case _: FloatType => s"`${x.name}` float(50) DEFAULT NULL"
                  // MySQL accepts DOUBLE or DOUBLE(M,D) but not DOUBLE(M), so no length is given here
                  case _: DoubleType => s"`${x.name}` double DEFAULT NULL"
                  case _: StringType => s"`${x.name}` varchar(50) DEFAULT NULL"
                  case _: TimestampType => s"`${x.name}` timestamp DEFAULT current_timestamp"
                  case _: DateType => s"`${x.name}` date DEFAULT NULL"
                  case _ => throw new RuntimeException(s"nonsupport ${x.dataType} !!!")
                }
              }
            )
            val sql_createTable = s"CREATE TABLE `$tableName` (${columnDefs.mkString(",")}) ENGINE=InnoDB DEFAULT CHARSET=utf8"
            println(sql_createTable)
            val statement = con.createStatement()
            try {
              statement.execute(sql_createTable)
            } finally {
              statement.close()
            }
          }
        } finally {
          con.close() // return the connection to the pool
        }
      }

      /**
        * Checks that the table and the DataFrame agree on the number, names and order of fields
        *
        * @param tableName table name
        * @param df        dataFrame
        */
      def verifyFieldConsistency(tableName: String, df: DataFrame): Unit = {
        val con = MySQLPoolManager.getMysqlManager.getConnection
        try {
          val colResultSet = con.getMetaData.getColumns(null, "%", tableName, "%")
          colResultSet.last()
          val tableFiledNum = colResultSet.getRow
          val dfFiledNum = df.columns.length
          if (tableFiledNum != dfFiledNum) {
            throw new Exception(s"Table and DataFrame have different numbers of fields!! table--$tableFiledNum but dataFrame--$dfFiledNum")
          }
          for (i <- 1 to tableFiledNum) {
            colResultSet.absolute(i)
            val tableFileName = colResultSet.getString("COLUMN_NAME")
            val dfFiledName = df.columns.apply(i - 1)
            if (!tableFileName.equals(dfFiledName)) {
              throw new Exception(s"Table and DataFrame field names differ!! table--'$tableFileName' but dataFrame--'$dfFiledName'")
            }
          }
          colResultSet.beforeFirst()
        } finally {
          con.close() // return the connection to the pool
        }
      }
    }
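Both write methods add every row of a partition to a single JDBC batch before calling executeBatch(). For large partitions, one possible refinement is to flush in fixed-size chunks. The sketch below shows only the chunking logic with Iterator.grouped; writeInBatches, batchSize and the flush callback are assumptions made for illustration, and in MySQLUtils the callback would be the place to call addBatch() per row followed by executeBatch().

```scala
object BatchSketch {
  // Hypothetical helper: feeds rows to `flush` in chunks of at most batchSize
  // and returns the number of chunks that were flushed.
  def writeInBatches[A](rows: Iterator[A], batchSize: Int)(flush: Seq[A] => Unit): Int = {
    var batches = 0
    // grouped() consumes the iterator lazily, one chunk at a time
    rows.grouped(batchSize).foreach { chunk =>
      flush(chunk)
      batches += 1
    }
    batches
  }
}
```

With ten rows and a batch size of four, the callback receives chunks of four, four and two rows.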