1. 程式人生 > >spark 連線mysql資料庫 讀取、寫入資料

spark 連線mysql資料庫 讀取、寫入資料

 資料庫連線並獲取資料:

 JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("sparkApp").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);

 Properties Properties = new Properties();
            Properties.put("user", "資料庫使用者名稱");
            Properties.put("password", "資料庫密碼");
            Properties.put("driver", "資料庫驅動");

DataFrame DF = sqlContext.read().jdbc("資料庫地址","表名",Properties).;
        

DF: 從資料庫中獲取的資料

資料庫寫入資料:

       /**
         * 1、建立型別為Row的RDD
         */
        JavaRDD<List<String>> logDateRdd = sparkContext.parallelize(logDate);
        JavaRDD<Row> RDD = logDateRdd.map(new Function<List<String>,Row>(){
            @Override
            public Row call(List<String> logDate) throws Exception {
                return RowFactory.create(
                        logDate.get(0),
                        logDate.get(1)
                );
            }
        });

        /**
         * 2、動態構造DataFrame的元資料。
         */
        List structFields = new ArrayList();
        structFields.add(DataTypes.createStructField("col1",DataTypes.StringType,false));
        structFields.add(DataTypes.createStructField("col2",DataTypes.StringType,true));

        //構建StructType,用於最後DataFrame元資料的描述
        StructType structType = DataTypes.createStructType(structFields);

        /**
         * 3、基於已有的元資料以及RDD<Row>來構造DataFrame
         */
        DataFrame DF = sqlContext.createDataFrame(RDD,structType);

        /**
         * 4、將資料寫入到e_trade_acct_data表中
         */
        DF.write().mode("append").jdbc("資料庫地址","表名","存有使用者名稱、密碼、驅動的Properties類");

 sparkContext.parallelize(logDate): 將資料轉成RDD

structFields : 裡面的col1 、col2為資料庫欄位名,DateTypes 表示資料型別,資料型別要保持一致。false:表示不能為null .true表示可為null