Several ways to write from Spark to MySQL, for different scenarios

Method 1: all the columns are fixed in advance

import java.util.Properties
import org.apache.spark.sql.SaveMode

// JDBC connection properties
val prop = new Properties
prop.setProperty("user", "root")
prop.setProperty("password", "123456")

// Append each DataFrame to its target table
df1.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_code_phone", prop)
df2.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_params_phone", prop)
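
If the append itself needs tuning, Spark's JDBC writer accepts extra options; a minimal sketch (the batchsize value here is illustrative, not from the original post):

// Optional: tune the JDBC write (the value 1000 is illustrative)
df1.write
  .mode(SaveMode.Append)
  .option("batchsize", "1000") // rows sent per JDBC batch
  .jdbc("jdbc:mysql://192.168.1.97:3306/xiang_log", "nginx_code_phone", prop)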

Method 2: columns can be added or removed freely

    // Requires: import java.util.UUID
    df.foreachPartition(p => {
      // Borrow one connection per partition from the pool
      val conn = ConnectionPool.getConnection
      // A PreparedStatement avoids per-row SQL string concatenation
      // (and the SQL-injection risk that comes with it)
      val stmt = conn.prepareStatement(
        "insert into app_id(id, date, appid, num) values (?, ?, ?, ?)")
      p.foreach(x => {
        stmt.setString(1, UUID.randomUUID.toString)
        stmt.setInt(2, x.getInt(0))
        stmt.setString(3, x.getString(1))
        stmt.setLong(4, x.getLong(2))
        stmt.executeUpdate()
      })
      stmt.close()
      ConnectionPool.returnConnection(conn)
    })
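
If throughput matters, the same write can be batched so each partition makes far fewer round-trips to MySQL; a minimal sketch (the batch size of 500 and the manual commit handling are illustrative assumptions, not from the original post):

    // Batched variant: group rows and flush each group in one JDBC batch
    df.foreachPartition(p => {
      val conn = ConnectionPool.getConnection
      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement(
        "insert into app_id(id, date, appid, num) values (?, ?, ?, ?)")
      p.grouped(500).foreach(batch => {
        batch.foreach(x => {
          stmt.setString(1, UUID.randomUUID.toString)
          stmt.setInt(2, x.getInt(0))
          stmt.setString(3, x.getString(1))
          stmt.setLong(4, x.getLong(2))
          stmt.addBatch()
        })
        stmt.executeBatch()
        conn.commit()
      })
      stmt.close()
      ConnectionPool.returnConnection(conn)
    })
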
The connection pool:
package com.prince.spark.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

/**
 * A minimal fixed-size connection pool. Note that getConnection()
 * returns null once all five connections are checked out.
 */
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;

    static {
        try {
            // Register the MySQL JDBC driver
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public synchronized static Connection getConnection() {
        try {
            // Lazily create five connections on first use
            if (connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0; i < 5; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://192.168.1.97:3306/xiang_log?characterEncoding=utf8",
                            "root",
                            "123456"
                    );
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    // Must also be synchronized: partitions return connections concurrently
    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
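
This hand-rolled pool is fine for a demo, but it has sharp edges (for example, getConnection returns null once the pool is exhausted). In production one would typically reach for an established pool instead; a minimal sketch assuming HikariCP (the com.zaxxer:HikariCP dependency and the MysqlPool object are my additions, not part of the original post):

// Sketch only: a HikariCP-backed pool as a drop-in alternative
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

object MysqlPool {
  private val config = new HikariConfig()
  config.setJdbcUrl("jdbc:mysql://192.168.1.97:3306/xiang_log?characterEncoding=utf8")
  config.setUsername("root")
  config.setPassword("123456")
  config.setMaximumPoolSize(5)
  val dataSource = new HikariDataSource(config)
}

// Inside foreachPartition: val conn = MysqlPool.dataSource.getConnection
// Calling conn.close() hands the connection back to the pool rather than closing it.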

Method 3: sometimes you are writing computed results, and first have to assemble a DataFrame

import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Assemble the result into an RDD
val arrayRDD = sc.parallelize(List((num, log_date)))
// Map the result RDD to an RDD of Rows
val resultRowRDD = arrayRDD.map(p => Row(
  p._1.toInt,
  p._2.toString,
  new Timestamp(new java.util.Date().getTime)
))
// Specify each column's schema directly via StructType
val resultSchema = StructType(
  List(
    StructField("verify_num", IntegerType, true),
    StructField("log_date", StringType, true), // which day's logs produced this result
    StructField("create_time", TimestampType, true) // when the analysis result was created
  )
)
// Assemble the new DataFrame
val resultDF = spark.createDataFrame(resultRowRDD, resultSchema)
// Write the result to MySQL
resultDF.write.mode("append")
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.97:3306/xiang_log")
  .option("dbtable", "verify") // table name
  .option("user", "root")
  .option("password", "123456")
  .save()
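
In Spark 2.x the same DataFrame can usually be assembled more directly through the implicits, without going via an RDD[Row] and an explicit schema; a minimal sketch, assuming num is an Int and log_date a String as above:

// Shortcut: build the same three-column DataFrame with toDF
import spark.implicits._
val resultDF = Seq((num, log_date, new Timestamp(System.currentTimeMillis)))
  .toDF("verify_num", "log_date", "create_time")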

--------------------- This article is from the CSDN blog of 放開那個產品經理; for the full text see: https://blog.csdn.net/qq_39869388/article/details/80423151?utm_source=copy