java及spark2.X連接mongodb3.X單機或集群的方法(帶認證及不帶認證)
首先,我們明確的是訪問Mongos和訪問單機Mongod並沒有什麽區別。接下來的方法都是既可以訪問mongod又可以訪問Mongos的。
另外,讀作java寫作scala,反正大家都看得懂......大概?
1、不帶認證集群的連接方法(JAVAscala):
首先是創建連接的方法,我們先聲明一個client,然後指定訪問的DB和collection:
private lazy val mongo = new MongoClient("192.168.2.51", 27017) private lazy val db = mongo.getDatabase("test")private lazy val dbColl = db.getCollection("origin2")
然後我們讀取數據:
import com.mongodb.client.model.Filters.{eq => eqq} val docs = dbColl.find(eqq("basiclabel.procedure", "second")).iterator()
額。。上面那段代碼是帶filter過濾的讀取數據。首先Import com.mongodb.client.model.Filters.eq並把eq重命名為eqq,然後通過dbColl.find(Bson)方法讀取指定數據。剩下的就是正常的叠代器的使用方法了,docs獲取出來的數據是Iterator[Document]。
然後我們更新數據:
dbColl.updateOne(eqq("_id", x.get("_id")), set("segdata", fenduan(str, name)))
上面這段代碼是說找到_id對應的數據,並將其中一個字段set為一個新的值,這個值可以為Document,String,Int,List等一系列數據結構。我這裏fenduan方法返回的是一個Document,做了一層嵌套。
至於插入數據更為簡單:
dbColl.insertOne(doc)
2、不帶認證的spark讀取方法(scala,理直氣壯)
兩種方式,其一是在創建sparksession的時候(SparkContext可以使用第二種方法,醒醒兄弟,2017年了
val spark = SparkSession.builder() .master("spark://192.168.2.51:7077") .config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar", "hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar", "hdfs://192.168.2.51:9000/segwithorigin2.jar"))) .config("spark.cores.max", 80) .config("spark.executor.cores", 16) .config("spark.executor.memory", "32g") .config("spark.mongodb.input.uri", "mongodb://192.168.2.51:27017/test.origin2") // .config("spark.mongodb.output.uri", "mongodb://192.168.12.161:27017/test.origin2") .getOrCreate()
val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()
第二種方式也較為簡單,創建一個ReadConfig,這個是connector提供的一個單例類,可以設置很多參數,例如(此時不必指定"spark.mongodb.input.uri"),如下所示是通過sparkcontext和通過sparksession兩種方式讀取數據的方法:
val readConfig = ReadConfig(Map( "uri" -> "mongodb://192.168.2.48:27017/", "database" -> "test", "collection" -> "test" )) val r2 = MongoSpark.load(spark, readConfig).rdd // val r2 = MongoSpark.load(spark.sparkContext, readConfig)
3、帶認證的Java讀取方法:
帶認證的需要先創建一個MongoURI,在URI裏把用戶名,密碼和認證庫都指定清楚。這種方法通用性比較強,因為spark也這麽用,如果使用其他方式認證要麽是必須使用庫等於認證庫,要麽是沒有通用性。這種方法可以在admin認證然後去讀test的數據,就很好。
//帶認證的需要先創建一個MongoURI,在URI裏把用戶名,密碼和認證庫都指定清楚,至於為什麽需要指定庫建議看上一篇博客
val mongoURI = new MongoClientURI("mongodb://gaoze:[email protected]:27017/?authSource=admin") //val mongoURI = new MongoClientURI("mongodb://192.168.2.48:27017/"); lazy val mongo = new MongoClient(mongoURI) private lazy val db = mongo.getDatabase("test") private lazy val dbColl = db.getCollection("test")
//然後和1一樣
4、帶認證的Spark讀取方法:
同3一樣,在URI裏加入用戶名密碼和庫就行了:
val spark = SparkSession.builder() .master("spark://192.168.2.51:7077") .config(new SparkConf().setJars(Array("hdfs://192.168.2.51:9000/mongolib/mongo-spark-connector_2.11-2.0.0.jar", "hdfs://192.168.2.51:9000/mongolib/bson-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/mongo-java-driver-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/mongodb-driver-core-3.4.2.jar", "hdfs://192.168.2.51:9000/mongolib/commons-io-2.5.jar", "hdfs://192.168.2.51:9000/segwithorigin2.jar"))) .config("spark.cores.max", 80) .config("spark.executor.cores", 16) .config("spark.executor.memory", "32g")
//這裏這個配置項指定了用戶名gaoze,密碼gaolaoban,認證庫admin .config("spark.mongodb.input.uri", "mongodb://gaoze:[email protected]:27017/test.origin2?authSource=admin") .getOrCreate() val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(eqq("basiclabel.procedure", "second")))).build.toRDD()
或者:
//這裏指定了用戶名rw,密碼1,認證庫test
val readConfig = ReadConfig(Map( "uri" -> "mongodb://rw:[email protected]:27017/?authSource=test", "database" -> "test", "collection" -> "test" ))
val rdd = MongoSpark.builder().sparkSession(spark).readConfig(readConfig).build().toRDD()
//val r2 = MongoSpark.load(spark.sparkContext, readConfig)
java及spark2.X連接mongodb3.X單機或集群的方法(帶認證及不帶認證)