1. 程式人生 > >spark讀取mongodb資料

spark讀取mongodb資料

   spark2.x向mongodb中讀取寫入資料,讀取寫入相關引數參考https://docs.mongodb.com/spark-connector/current/configuration/#cache-configuration

從mongodb中讀取資料時指定資料分割槽欄位,分割槽大小提高讀取效率, 當需要過濾部分資料集的情況下使用Dataset/SQL的方式filter,Mongo Connector會建立aggregation pipeline在mongodb端進行過濾,然後再傳回給spark進行優化處理

val spark = SparkSession.builder
          .appName(this.getClass.getName().stripSuffix("$"))
          .getOrCreate()
val inputUri="mongodb://test:
[email protected]
:27017/test.articles" val df = spark.read.format("com.mongodb.spark.sql").options( Map("spark.mongodb.input.uri" -> inputUri, "spark.mongodb.input.partitioner" -> "MongoPaginateBySizePartitioner", "spark.mongodb.input.partitionerOptions.partitionKey" -> "_id", "spark.mongodb.input.partitionerOptions.partitionSizeMB"-> "32")) .load() val currentTimestamp = System.currentTimeMillis() val originDf = df.filter(df("updateTime") < currentTimestamp && df("updateTime") >= currentTimestamp - 1440 * 60 * 1000) .select("_id", "content", "imgTotalCount").toDF("id", "content", "imgnum")

向mongo裡面寫資料可以使用兩種不同的方式mode=overwrite,append
overwirite 以覆蓋的方式寫入
append    以追加的方式寫入
val outputUri="mongodb://test:[email protected]:27017/test.article_garbage" 
saveDF.write.options(Map("spark.mongodb.output.uri"-> outputUri))
          .mode("append")
          .format("com.mongodb.spark.sql")
          .save()

spark操作mongodb的scala-api文件:https://docs.mongodb.com/spark-connector/current/scala-api/

相關推薦

程式碼 | Spark讀取mongoDB資料寫入Hive普通表和分割槽表

版本: spark 2.2.0  hive 1.1.0  scala 2.11.8  hadoop-2.6.0-cdh5.7.0  jdk 1.8  MongoDB 3.6.4 一 原始資料及Hive表  MongoDB資

spark讀取mongodb資料

   spark2.x向mongodb中讀取寫入資料,讀取寫入相關引數參考https://docs.mongodb.com/spark-connector/current/configuration/#cache-configuration 從mongodb中讀取資料時指

spark 讀取mongodb失敗,報executor time out 和GC overhead limit exceeded 異常

資源 base for read 就是 conn context mon getc 代碼: import com.mongodb.spark.config.ReadConfig import com.mongodb.spark.sql._ val config = sql

spark讀取hive資料-java

需求:將hive中的資料讀取出來,寫入es中。 環境:spark 2.0.2 1. SparkSession裡設定enableHiveSupport() SparkConf conf = new SparkConf().setAppName("appName").setMast

spark讀取es資料

spark-2.0.2 scala-2.11.8 <!-- https://mvnrepository.com/artifact/org.webjars.npm/spark-md5 --> <dependency> <groupId>org.apa

0016-Avro序列化&反序列化和Spark讀取Avro資料

溫馨提示:要看高清無碼套圖,請使用手機開啟並單擊圖片放大檢視。 1.簡介 本篇文章主要講如何使用java生成Avro格式資料以及如何通過spark將Avro資料檔案轉換成DataSet和DataFrame進行操作。 1.1Apache Arvo是什麼? Apache Avro 是一個數據序列

spark 讀取 hdfs 資料分割槽規則

下文以讀取 parquet 檔案 / parquet hive table 為例: hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertMetastoreParquet 控制,預設為 true。 如果設定為 true ,會

Mongodb學習筆記 --- python讀取mongodb資料

wechat:812716131 ------------------------------------------------------ 技術交流群請聯絡上面wechat ----------------------------------------------

spark讀取mongodb並解決迴圈巢狀array的拆分,屬性不存在整個物件丟失問題。

1、建立SQLContext SQLContext sqlContext = new SQLContext(sc) 2、拼接mongodb連線字串 if(UserName!=null && !"".equals(UserName))

spark讀取kafka資料(兩種方式比較及flume配置檔案)

a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.channels.c1.type = memory a1.channels.c1.capacity

scala實戰之spark讀取mysql資料表並存放到mysql庫中程式設計例項

今天簡單講解一下應用spark1.5.2相關讀取mysql資料到DataFrame的介面以及將DF資料存放到mysql中介面實現例項 同樣我們的程式設計開發環境是不需要安裝spark的,但是需要一臺安裝了mysql的伺服器,我這裡直接在本機安裝了一個mysql,還有就是sc

spark讀取redis資料(互動式,scala單機版,java單機版)

互動式 第一步:向redis中新增資料 第二步:將jedis jar包放入~/lib目錄下,開啟spark服務 第三步:通過spark-shell讀取redis資料,並做相應處理

spark讀取kafka資料寫入hbase

package com.prince.demo.test import java.util.UUID import com.typesafe.config.{Config, ConfigFactory} import org.apache.hadoop.hbase.HBa

Spark學習-SparkSQL--06-spark讀取HBase資料報異常java.io.NotSerializableException

1.準備工作,安裝好HABSE之後,執行Hbase shell create ‘表名稱’, ‘列名稱1’,’列名稱2’,’列名稱N’ create ‘表名稱’,’列族名稱’ 在hbase中列是可以動態新增的,只需要有個列族就可以了 create

mongo-spark-讀取不同的庫資料和寫入不同的庫中

mongo-spark-讀取不同的庫資料和寫入不同的庫中 package com.example.app import com.mongodb.spark.config.{ReadConfig, WriteConfig} import com.mongodb.spark.sql._ object

從原始碼看Spark讀取Hive表資料小檔案和分塊的問題

原文連結:https://mp.csdn.net/postedit/82423831  使用Spark進行資料分析和計算早已成趨勢,你是否關注過讀取一張Hive表時Task數為什麼是那麼多呢?它跟什麼有關係呢? 最近剛好碰到這個問題,而之前對此有些模糊,所以做了些整理,希望大家拍磚探討

spark從hbase讀取寫入資料

將RDD寫入hbase 注意點: 依賴: 將lib目錄下的hadoop開頭jar包、hbase開頭jar包新增至classpath 此外還有lib目錄下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少會提示hbase Rpc

【原創】大資料基礎之Spark(7)spark讀取檔案split過程(即RDD分割槽數量)

spark 2.1.1 spark初始化rdd的時候,需要讀取檔案,通常是hdfs檔案,在讀檔案的時候可以指定最小partition數量,這裡只是建議的數量,實際可能比這個要大(比如檔案特別多或者特別大時),也可能比這個要小(比如檔案只有一個而且很小時),如果沒有指定最小partition數量,初始化完成的

spark讀取多個資料夾(巢狀)下的多個檔案

在正常呼叫過程中,難免需要對多個資料夾下的多個檔案進行讀取,然而之前只是明確了Spark具備讀取多個檔案的能力。 針對多個資料夾下的多個檔案,以前的做法是先進行資料夾的遍歷,然後再進行各個資料夾目錄的讀取。 今天在做測試的時候,居然發現spark原生就支援這樣的能力。

spark streaming讀取kafka資料,記錄offset

如下是pom.xml檔案<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocati