
Reading data from MySQL in Spark (Redis/MongoDB/HBase are similar: just swap in the corresponding RDD)

package com.ws.jdbc

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Read data from MySQL with Spark directly, so Sqoop is no longer needed
  * (plain Hadoop relies on Sqoop to import/export data to and from MySQL).
  */
object JdbcRDDTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JdbcRDDTest").setMaster("local[4]")

    val sc = new SparkContext(conf)

    //arg 1: SparkContext; arg 2: connection factory; arg 3: SQL with two bound placeholders; args 4,5: lower/upper bound values; arg 6: number of partitions; arg 7: function mapping each ResultSet row to a result
    val result: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc, getConnection, "select * from ipcount where id >=? and id <= ?", 1, 5, 2,
      rs => {
        val id = rs.getInt(1)
        val province = rs.getString(2)
        val count = rs.getInt(3)
        (id, province, count)
      })

    println(result.collect().toBuffer)
    println(result.count())

    sc.stop()

  }

  //connection factory: returns a new JDBC connection (the URL's characterEncoding parameter was misspelled in the original)
  val getConnection = () => {
    DriverManager.getConnection("jdbc:mysql://192.168.127.13/ip?characterEncoding=utf-8", "root", "root")
  }
  }
}
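
The example above assumes a MySQL database named ip containing a table ipcount(id, province, count), where id is an integer column suitable for range partitioning. To compile and run it locally, spark-core and the MySQL JDBC driver must be on the classpath. A minimal build.sbt sketch is shown below; the artifact versions are assumptions, not taken from the original post, so pick the ones that match your cluster and Scala version.

// build.sbt -- minimal sketch; the Spark and MySQL connector versions below are assumptions
name := "spark-jdbc-demo"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0",
  "mysql" % "mysql-connector-java" % "5.1.47"
)

With the dependencies in place, the job can be run directly from the IDE (setMaster("local[4]") keeps everything on the local machine) or packaged and submitted with spark-submit.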