Code | Reading MongoDB Data with Spark and Writing It to Hive Regular and Partitioned Tables

Versions:
spark 2.2.0 
hive 1.1.0 
scala 2.11.8 
hadoop-2.6.0-cdh5.7.0 
jdk 1.8 
MongoDB 3.6.4

1. Source Data and Hive Tables

MongoDB document format

{
    "_id" : ObjectId("5af65d86222b639e0c2212f3"),
    "id" : "1",
    "name" : "lisi",
    "age" : "18",
    "deptno" : "01"
}
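
If you want to reproduce the test data, a minimal sketch with the Java driver (assuming a local mongod on the default port and the test.mgtest collection used by the Spark job below; the class name InsertSampleDoc is hypothetical) looks like this:

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class InsertSampleDoc {
    public static void main(String[] args) {
        // Assumption: mongod is running locally on the default port 27017
        MongoClient client = new MongoClient("127.0.0.1", 27017);
        MongoCollection<Document> coll = client.getDatabase("test").getCollection("mgtest");
        // Same shape as the sample document above (all values stored as strings)
        coll.insertOne(new Document("id", "1")
                .append("name", "lisi")
                .append("age", "18")
                .append("deptno", "01"));
        client.close();
    }
}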

Hive regular table

create table mg_hive_test(
id string,
name string,
age string,
deptno string
)row format delimited fields terminated by '\t';

Hive partitioned table

create table  mg_hive_external(
id string,
name string,
age string
)
partitioned by (deptno string)
row format delimited fields terminated by '\t';

 

2. IDEA + Maven + Java
Dependencies (${spark.version} should resolve to the Spark version listed above, 2.2.0)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.mongodb</groupId>
  <artifactId>mongo-java-driver</artifactId>
  <version>3.6.3</version>
</dependency>
<dependency>
  <groupId>org.mongodb.spark</groupId>
  <artifactId>mongo-spark-connector_2.11</artifactId>
  <version>2.2.2</version>
</dependency>

Code

package com.huawei.mongo;

/*
 * @Author: Create by Achun
 * @Time: 2018/6/2 21:00
 */

import com.mongodb.spark.MongoSpark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class sparkreadmgtohive {
    public static void main(String[] args) {
        //spark 2.x
        String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("SparkReadMgToHive")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.mgtest")
                .enableHiveSupport()
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        //spark 1.x
//        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkReadMgToHive");
//        conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.mgtest");
//        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//        JavaSparkContext sc = new JavaSparkContext(conf);
//        sc.addJar("/Users/mac/zhangchun/jar/mongo-spark-connector_2.11-2.2.2.jar");
//        sc.addJar("/Users/mac/zhangchun/jar/mongo-java-driver-3.6.3.jar");
//        HiveContext sqlContext = new HiveContext(sc);
//        // Create a DataFrame from MongoDB
//        Dataset<Row> df = MongoSpark.read(sqlContext).load().toDF();
//        df.select("id", "name", "age").show();

        // Columns to pull from MongoDB; they must match the Hive table layout from Section 1
        String querysql = "select id,name,age,deptno from mgtable b";
        // "O" = load the regular table, "P" = load the dynamic partition table
        String opType = "P";

        SQLUtils sqlUtils = new SQLUtils();
        List<String> column = sqlUtils.getColumns(querysql);

        // Create an RDD of BSON Documents from MongoDB
        JavaRDD<Document> rdd = MongoSpark.load(sc);
        // Convert each Document into the list of its column values (as strings)
        JavaRDD<Object> Ordd = rdd.map(new Function<Document, Object>() {
            public Object call(Document document) {
                List<String> list = new ArrayList<String>();
                for (int i = 0; i < column.size(); i++) {
                    list.add(String.valueOf(document.get(column.get(i))));
                }
                return list;
//                return list.toString().replace("[","").replace("]","");
            }
        });
        System.out.println(Ordd.first());
        // Build the DataFrame schema programmatically from the column list
        List<String> ls = new ArrayList<String>();
        for (int i = 0; i < column.size(); i++) {
            ls.add(column.get(i));
        }
        String schemaString = ls.toString().replace("[", "").replace("]", "").replace(" ", "");
        System.out.println(schemaString);

        List<StructField> fields = new ArrayList<StructField>();
        for (String fieldName : schemaString.split(",")) {
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<Row> rowRDD = Ordd.map((Function<Object, Row>) record -> {
            List values = (List) record;
            return RowFactory.create(values.toArray());
        });

        Dataset<Row> df = spark.createDataFrame(rowRDD,schema);

        // Write the DataFrame to Hive
        // Switch to the target Hive database
        spark.sql("use datalake");
        // Register the DataFrame as a temporary view so it can be queried by name
        df.createOrReplaceTempView("mgtable");

        if ("O".equals(opType.trim())) {
            System.out.println("資料插入到Hive ordinary table");
            Long t1 = System.currentTimeMillis();
            spark.sql("insert into mgtohive_2 " + querysql + " " + "where b.id not in (select id from mgtohive_2)");
            Long t2 = System.currentTimeMillis();
            System.out.println("共耗時:" + (t2 - t1) / 60000 + "分鐘");
        }else if ("P".equals(opType.trim())) {

        System.out.println("資料插入到Hive  dynamic partition table");
        Long t3 = System.currentTimeMillis();
        //必須設定以下引數 否則報錯
        spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");
        //depton為分割槽欄位   select語句最後一個欄位必須是deptno
        spark.sql("insert into mg_hive_external partition(deptno) select id,name,age,deptno from mgtable b where b.id not in (select id from mg_hive_external)");
        Long t4 = System.currentTimeMillis();
        System.out.println("共耗時:"+(t4 -t3)/60000+ "分鐘");
        }
        spark.stop();
    }

}
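
After a run, a quick way to confirm the partitioned load is to query the table through the same SparkSession. A minimal sketch (to be placed before spark.stop(), reusing the spark variable and the datalake database from the code above):

        // Verification sketch: list the partitions and count rows per deptno
        spark.sql("show partitions mg_hive_external").show();
        spark.sql("select deptno, count(*) as cnt from mg_hive_external group by deptno").show();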

Utility class

package com.huawei.mongo;

/*
 * @Author: Create by Achun
 * @Time: 2018/6/3 23:20
 */

import java.util.ArrayList;
import java.util.List;

public class SQLUtils {

    /** Extract the column names between "select" and "from" in a simple query. */
    public List<String> getColumns(String querysql) {
        List<String> column = new ArrayList<String>();
        String tmp = querysql.substring(querysql.indexOf("select") + 6,
                querysql.indexOf("from")).trim();
        if (tmp.indexOf("*") == -1) {
            String[] cols = tmp.split(",");
            for (String c : cols) {
                column.add(c.trim());
            }
        }
        return column;
    }

    /** Extract the table name that follows "from" (ignoring any alias). */
    public String getTBname(String querysql) {
        String tmp = querysql.substring(querysql.indexOf("from") + 4).trim();
        int sx = tmp.indexOf(" ");
        if (sx == -1) {
            return tmp;
        } else {
            return tmp.substring(0, sx);
        }
    }

}
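
A quick usage sketch of the utility class against the query used above (the demo class name SQLUtilsDemo is hypothetical, not part of the original code):

package com.huawei.mongo;

public class SQLUtilsDemo {
    public static void main(String[] args) {
        SQLUtils sqlUtils = new SQLUtils();
        String querysql = "select id,name,age,deptno from mgtable b";
        // Prints: [id, name, age, deptno]
        System.out.println(sqlUtils.getColumns(querysql));
        // Prints: mgtable
        System.out.println(sqlUtils.getTBname(querysql));
    }
}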

 

3. Troubleshooting
1. If IDEA cannot pick up the Hive databases and tables, put hive-site.xml into the resources folder and mark resources as a resources root (the folder turns blue when it is set correctly, grey otherwise):
File -> Project Structure -> Modules -> Sources

2. If a JDO-related error appears after the fix above, check whether the MySQL driver JAR is present under HIVE_HOME/lib. If it is there, IDEA simply cannot see it; fix it in either of the following ways:

 

Copy the MySQL driver into jdk1.8.0_171.jdk/Contents/Home/jre/lib/ext (i.e. jdk/jre/lib/ext), or

Add the MySQL driver under the <1.8> entry in the project's External Libraries in IDEA.

 

 

4. Notes
Because the MongoDB collection is registered as a temporary table and joined against the Hive table (the not in subquery above), the id field in MongoDB should be indexed; otherwise the job runs very slowly.
To create the index in MongoDB:

db.getCollection('mgtest').ensureIndex({"id" : 1}, {"background" : true})

To view existing indexes:

db.getCollection('mgtest').getIndexes()
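
The same index can also be created from Java with the driver already on the classpath. A minimal sketch (the class name CreateIdIndex is hypothetical; it assumes the test.mgtest collection from earlier):

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

public class CreateIdIndex {
    public static void main(String[] args) {
        MongoClient client = new MongoClient("127.0.0.1", 27017);
        MongoCollection<Document> coll = client.getDatabase("test").getCollection("mgtest");
        // Ascending index on "id", built in the background (same effect as the shell command above)
        coll.createIndex(Indexes.ascending("id"), new IndexOptions().background(true));
        // Print the existing indexes, equivalent to getIndexes() in the shell
        for (Document idx : coll.listIndexes()) {
            System.out.println(idx.toJson());
        }
        client.close();
    }
}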

MongoDB Spark Connector (Java API) docs: https://docs.mongodb.com/spark-connector/current/java-api/

Reposted from 若澤大資料: https://mp.weixin.qq.com/s/7uQG-g8oilqJebynTS6Bkg