1. 程式人生 > >java及spark2.X連接mongodb3.X單機或集群的方法(帶認證及不帶認證)

java及spark2.X連接mongodb3.X單機或集群的方法(帶認證及不帶認證)

連接 通過 ava 更新數據 ati out client data 插入數據

首先,我們明確的是訪問Mongos和訪問單機Mongod並沒有什麽區別。接下來的方法都是既可以訪問mongod又可以訪問Mongos的。

另外,讀作java寫作scala,反正大家都看得懂......大概?

1、不帶認證集群的連接方法(JAVAscala):

  首先是創建連接的方法,我們先聲明一個client,然後指定訪問的DB和collection:

    private lazy val mongo = new MongoClient("192.168.2.51", 27017)
    private lazy val db = mongo.getDatabase("test")
    
private lazy val dbColl = db.getCollection("origin2")

  然後我們讀取數據:

import com.mongodb.client.model.Filters.{eq => eqq}
val docs = dbColl.find(eqq("basiclabel.procedure", "second")).iterator()

  額。。上面那段代碼是帶filter過濾的讀取數據。首先Import com.mongodb.client.model.Filters.eq並把eq重命名為eqq,然後通過dbColl.find(Bson)方法讀取指定數據。剩下的就是正常的叠代器的使用方法了,docs獲取出來的數據是Iterator[Document]。

  然後我們更新數據:

dbColl.updateOne(eqq("_id", x.get("_id")), set("segdata", fenduan(str, name)))

  上面這段代碼是說找到_id對應的數據,並將其中一個字段set為一個新的值,這個值可以為Document,String,Int,List等一系列數據結構。我這裏fenduan方法返回的是一個Document,做了一層嵌套。

  至於插入數據更為簡單: 

dbColl.insertOne(doc)

2、不帶認證的spark讀取方法(scala,理直氣壯)

  兩種方式,其一是在創建sparksession的時候(SparkContext可以使用第二種方法,醒醒兄弟,2017年了

),直接指定"spark.mongodb.input.uri"。然後使用正常的MongoSpark來讀取數據。(pipeline裏面是過濾條件,願意嘗試的各位可以自己試試filter下的其他方法)。使用rdd是因為rdd更適合進行map和flatmap等一系列精細的轉換操作,如果只需要讀數據,可以使用MongoSpark.read(spark)方法,直接獲取DataFrameReader。

val spark = SparkSession.builder()
        .master("spark://192.168.2.51:7077")
        .config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar",
                "hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar",
                "hdfs://192.168.2.51:9000/segwithorigin2.jar")))
        .config("spark.cores.max", 80)        
        .config("spark.executor.cores", 16)
        .config("spark.executor.memory", "32g")
        .config("spark.mongodb.input.uri", "mongodb://192.168.2.51:27017/test.origin2")
 //       .config("spark.mongodb.output.uri", "mongodb://192.168.12.161:27017/test.origin2")
        .getOrCreate()
val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()

  第二種方式也較為簡單,創建一個ReadConfig,這個是connector提供的一個單例類,可以設置很多參數,例如(此時不必指定"spark.mongodb.input.uri"),如下所示是通過sparkcontext和通過sparksession兩種方式讀取數據的方法:

      val readConfig = ReadConfig(Map(
              "uri" -> "mongodb://192.168.2.48:27017/",
              "database" -> "test",
              "collection" -> "test"
              ))
    val r2 = MongoSpark.load(spark, readConfig).rdd
//    val r2 = MongoSpark.load(spark.sparkContext, readConfig)

3、帶認證的Java讀取方法:

  帶認證的需要先創建一個MongoURI,在URI裏把用戶名,密碼和認證庫都指定清楚。這種方法通用性比較強,因為spark也這麽用,如果使用其他方式認證要麽是必須使用庫等於認證庫,要麽是沒有通用性。這種方法可以在admin認證然後去讀test的數據,就很好。
//帶認證的需要先創建一個MongoURI,在URI裏把用戶名,密碼和認證庫都指定清楚,至於為什麽需要指定庫建議看上一篇博客
val mongoURI = new MongoClientURI("mongodb://gaoze:[email protected]:27017/?authSource=admin") //val mongoURI = new MongoClientURI("mongodb://192.168.2.48:27017/"); lazy val mongo = new MongoClient(mongoURI) private lazy val db = mongo.getDatabase("test") private lazy val dbColl = db.getCollection("test")
//然後和1一樣

4、帶認證的Spark讀取方法:

  同3一樣,在URI裏加入用戶名密碼和庫就行了:

val spark = SparkSession.builder()
        .master("spark://192.168.2.51:7077")
        .config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar",
                "hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar",
                "hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar",
                "hdfs://192.168.2.51:9000/segwithorigin2.jar")))
        .config("spark.cores.max", 80)        
        .config("spark.executor.cores", 16)
        .config("spark.executor.memory", "32g")
//這裏這個配置項指定了用戶名gaoze,密碼gaolaoban,認證庫admin .config(
"spark.mongodb.input.uri", "mongodb://gaoze:[email protected]:27017/test.origin2?authSource=admin") .getOrCreate() val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()

或者:

//這裏指定了用戶名rw,密碼1,認證庫test
val readConfig = ReadConfig(Map( "uri" -> "mongodb://rw:[email protected]:27017/?authSource=test", "database" -> "test", "collection" -> "test" ))

val rdd = MongoSpark.builder().sparkSession(spark).readConfig(readConfig).build().toRDD()
//val r2 = MongoSpark.load(spark.sparkContext, readConfig)

 

java及spark2.X連接mongodb3.X單機或集群的方法(帶認證及不帶認證)