1. 程式人生 > >Spark中讀寫mysql資料庫

Spark中讀寫mysql資料庫

Spark中讀寫MySQL資料庫

一.使用Intellij編寫Spark程式讀取MySQL資料庫

1.在windows系統中,安裝有mysql資料庫。主要情況如下:

mysql> show databases;
+------------------------+
| Database               |
+------------------------+
| information_schema     |
| dbgirl                 |
| mydatabase             |
| mysql                  |
| performance_
schema | | runoob | | sakila | | test | +------------------------+ 10 rows in set (0.09 sec) mysql> use mydatabase; Database changed mysql> show tables; +----------------------+ | Tables_in_mydatabase | +----------------------+ | country | | course | | grade | | myworld | | score | | student | | teacher | | view_grade | +----------------------+
8 rows in set (0.00 sec)

2.現在準備讀取mydatabase.score中的資料
3.程式碼如下:

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}

object ReadFromMysql {
  //define a function to get an Connection of mysql
  def getConnection ():Connection={
    //you should the database'name not the database and table'
name DriverManager.getConnection("jdbc:mysql://localhost:3306/mydatabase","root","") } def main(args:Array[String]):Unit={ val conf = new SparkConf().setAppName("ReadFromMysql").setMaster("local") val sc = new SparkContext(conf) val connection = getConnection() val preparedStatement: PreparedStatement = connection.prepareStatement( "select * from score" ) val result: ResultSet = preparedStatement.executeQuery() println("sno\tcno\tdegree") while(result.next()){ print(result.getString("sno")+" ") print(result.getString("cno")+" ") print(result.getString("degree")+" ") println() } } }

4.執行結果如下:
這裡寫圖片描述

Intellij編寫Spark程式,往MySQL資料庫中寫入資料
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 1.insert data into table by programs
  */
object WriteToMysql {
  //get a connection to mysql database
  def getConnection():Connection={
    DriverManager.getConnection("jdbc:mysql://localhost:3306/mydatabase","root","")
  }

  def main(args:Array[String]): Unit ={
    val conf = new SparkConf().setAppName("WriteToMysql").setMaster("local")
    val sc = new SparkContext(conf)

    //the data should be inserted into MySQL
    //the database is named "mydatabase",the table's name is employee
    //the database and table should be created in advance
    val connection = getConnection()//invoke a function to get a connection
    val prepareSta: PreparedStatement = connection.prepareStatement("insert into employee values('LittleLawson',22, 'Enmoster')," +
      "('Ltt',22,'baidu'),('dinglei',40,'NetEase'),('Jack',52,'alibaba');");
    }
}
  • 執行結果如下:
    這裡寫圖片描述
    並且語句執行帶來的改變row有如下行數:
    這裡寫圖片描述