MongoDB Spark Connector 實戰指南
阿新 • • 發佈:2019-09-30
Why Spark with MongoDB?
- 高效能,官方號稱 100x faster,因為可以全記憶體執行,效能提升肯定是很明顯的
- 簡單易用,支援 Java、Python、Scala、SQL 等多種語言,使得構建分析應用非常簡單
- 統一構建 ,支援多種資料來源,通過 Spark RDD 遮蔽底層資料差異,同一個分析應用可運行於不同的資料來源;
- 應用場景廣泛,能同時支援批處理以及流式處理
MongoDB Spark Connector 為官方推出,用於適配 Spark 操作 MongoDB 資料;本文以 Python 為例,介紹 MongoDB Spark Connector 的使用,幫助你基於 MongoDB 構建第一個分析應用。
準備 MongoDB 環境
安裝 MongoDB 參考 Install MongoDB Community Edition on Linux
mkdir mongodata
mongod --dbpath mongodata --port 9555
準備 Spark python 環境
下載 Spark
cd /home/mongo-spark wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz tar zxvf spark-2.4.4-bin-hadoop2.7.tgz
設定 Spark 環境變數
export SPARK_HOME=/home/mongo-spark/spark-2.4.4-bin-hadoop2.7
export PATH=$PATH:/home/mongo-spark/spark-2.4.4-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH
執行 Spark RDD 示例
# count.py from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) counts = words.count() $SPARK_HOME/bin/spark-submit count.py Number of elements in RDD → 8
如果上述程式執行成功,說明 Spark python 環境準備成功,還可以測試 Spark 的其他 RDD 操作,比如 collector、filter、map、reduce、join 等,更多示例參考 PySpark - Quick Guide
Spark 操作 MongoDB 資料
參考 Spark Connector Python Guide
準備測試資料 test.coll01 插入3條測試資料,test.coll02 未空
mongo --port 9555
> db.coll01.find()
{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }
> db.coll02.find()
準備操作指令碼,將輸入集合的資料按條件進行過濾,寫到輸出集合
# mongo-spark-test.py
from pyspark.sql import SparkSession
# Create Spark Session
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1:9555/test.coll01") \
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1:9555/test.coll") \
.getOrCreate()
# Read from MongoDB
df = spark.read.format("mongo").load()
df.show()
# Filter and Write
df.filter(df['qty'] >= 10).write.format("mongo").mode("append").save()
# Use SQL
# df.createOrReplaceTempView("temp")
# some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
# some_fruit.show()
執行指令碼
$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 mongo-spark-test.py
mongo --port 9555
> db.coll02.find()
{ "_id" : 2, "qty" : 10, "type" : "orange" }
{ "_id" : 3, "qty" : 15, "type" : "banana" }
本文為雲棲社群原創內容,未經