1. 程式人生 > >(dataframe)利用dataframe來操作MySQL資料庫【Java版純程式碼】

(dataframe)利用dataframe來操作MySQL資料庫【Java版純程式碼】

package com.bjsxt;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.collections.map.HashedMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.SQLContext;
import org.dmg.pmml.Scorecard;
import org.apache.spark.sql.SaveMode;

public class CreateDFFromMysql {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setAppName("mysql").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        /**
         * 配置join或者聚合操作shuffle資料時分割槽的數量
         */
        conf.set("spark.sql.shuffle.partitions", "1");
        SQLContext sqlContext=new SQLContext(sc);
        /**
         * 第一種方式:
         * 讀取MySQL資料庫,載入為DataFrame
         */
        Map<String, String> options = new HashMap<String,String>();
        options.put("url", "jdbc:mysql://192.168.198.21:3306/spark");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "123");
        options.put("dbtable", "person");
        DataFrame person = sqlContext.read().format("jdbc").options(options).load();
        person.show();
        person.registerTempTable("person1");
        /**
         * 第二種方式:
         * 讀取MySQL資料表載入為:dataframe
         */
        DataFrameReader reader = sqlContext.read().format("jdbc");
        reader.option("url", "jdbc:mysql://192.168.198.21:3306/spark");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("user", "root");
        reader.option("password", "123");
        reader.option("dbtable", "score");
        DataFrame score = reader.load();          
        score.show();
        score.registerTempTable("score1");
        DataFrame result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score from person1,score1 where person1.name=score1.name");
        result.show();
        /**
         * 將結果儲存在mysql中
         */
        Properties properties = new Properties();
        properties.setProperty("user", "root");
        properties.setProperty("password", "123");
        /**
         * SaveMode
         * overwrite:覆蓋
         * append:追加
         * errorIfExist:如果存在就報錯
         * Ignore:如果存在就忽略
         */
        result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.198.21:3306/spark","result",properties);
        System.out.println("————————————finish——————————");
        sc.stop();
    }

}