1. 程式人生 > >MongoDB 資料實時匯入 Elasticsearch

MongoDB 資料實時匯入 Elasticsearch

專案中需要實現對資料的全文檢索功能,資料主要儲存在了 MongoDB 中。MongoDB 本身是自帶文字檢索功能的,但是不支援中文,而且當資料量增大時,MongoDB 的檢索效率會大大降低。

由於最近在學習 Elasticsearch,而 Elasticsearch 的特性又十分適合全文檢索,於是就選擇了它。

那麼如何在對 MongoDB 進行增刪改查時,實時地將 MongoDB 的資料匯入 Elasticsearch 中呢?

經過幾番調研,發現 mongo-connecter 是實現這個功能的利器。這個是 MongoDB 官方的開發人員用 Python 寫的一個工具,目前支援將 MongoDB 的資料同步到 Solr、ElasticSearch 中,並且支援使用者自己擴充套件。

下面是具體的部署過程:

搭建 MongoDB 副本集

MongoDB 副本集的架構

副本集是 mongodb 提供的一種高可用解決方案。相對於原來的主從複製,副本集能自動感知 Primary 節點的下線,並提升其中一個 Secondary 節點作為 Primary 。整個過程對業務透明,同時也大大降低了運維的成本。

架構圖如下: Alt text

MongoDB 副本集的角色

Primary

預設情況下,讀寫都是在 Primary 上操作的。

Secondary
  • 通過 oplog 來重放 Primary 上的所有操作,擁有 Primary 節點資料的完整拷貝。
  • 預設情況下,不可寫,也不可讀。
  • 根據不同的需求,Secondary 又可配置為如下形式:
    • Priority 0 Replica Set Members - 優先順序為0的節點 優先順序為0的成員永遠不會被選舉為 primary。在mongoDB副本集中,允許給不同的節點設定不同的優先順序。優先順序的取值範圍為0-1000,可設定為浮點數,預設為1。擁有最高優先順序的成員會優先選舉為primary。譬如,在副本集中添加了一個優先順序為2的成員node3:27020,而其它成員的優先順序為1,只要node3:27020擁有最新的資料,那麼當前的primary就會自動降級,node3:27020將會被選舉為新的primary節點,但如果node3:27020中的資料不夠新,則當前primary節點保持不變,直到node3:27020的資料更新到最新。
    • Hidden Replica Set Members - 隱藏節點 隱藏節點的優先順序同樣為0,同時對客戶端不可見使用rs.status()和rs.config()可以看到隱藏節點,但是對於db.isMaster()不可見。客戶端連線到副本集時,會呼叫db.isMaster()命令來檢視可用成員資訊。所以,隱藏節點不會受到客戶端的讀請求。隱藏節點常用於執行特定的任務,譬如報表,備份。
    • Delayed Replica Set Members - 延遲節點 延遲節點會比primary節點延遲指定的時間(通過slaveDelay引數來指定)。延遲節點必須是隱藏節點。
Arbiter

仲裁節點,只是用來投票,且投票的權重只能為1,不復制資料,也不能提升為primary。仲裁節點常用於節點數量是偶數的副本集中。建議:通常將Arbiter部署在業務伺服器上,切忌將其部署在Primary節點或Secondary節點伺服器上。

注: 一個副本集最多有50個成員節點,7個投票節點。

MongoDB 副本集的搭建

建立資料目錄和日誌目錄
mkdir -p data/27017/ data/27018/ data/27019/
mkdir -p log/
啟動 mongod 例項
mongod --replSet myrs --dbpath data/27017/ --port 27017 --logpath log/27017.log --fork
mongod --replSet myrs --dbpath data/27018/ --port 27018 --logpath log/27018.log --fork
mongod --replSet myrs --dbpath data/27019/ --port 27019 --logpath log/27019.log --fork
搭建副本集

連線副本集任意成員,如 27017 埠:

mongo --port 27017

初始化副本集:

> rs.initiate()
{
	"info2" : "no configuration specified. Using a default configuration for the set",
	"me" : "ubuntu:27017",
	"ok" : 1
}

新增節點:

myrs:SECONDARY> rs.add("ubuntu:27018")
{ "ok" : 1 }

新增仲裁節點:

myrs:PRIMARY> rs.addArb("ubuntu:27019")
{ "ok" : 1 }

通過 rs.conf() 檢視副本集配置資訊:

myrs:PRIMARY> rs.conf()
{
	"_id" : "myrs",
	"version" : 3,
	"protocolVersion" : NumberLong(1),
	"members" : [
		{
			"_id" : 0,
			"host" : "ubuntu:27017",
			"arbiterOnly" : false,
			"buildIndexes" : true,
			"hidden" : false,
			"priority" : 1,
			"tags" : {

			},
			"slaveDelay" : NumberLong(0),
			"votes" : 1
		},
		{
			"_id" : 1,
			"host" : "ubuntu:27018",
			"arbiterOnly" : false,
			"buildIndexes" : true,
			"hidden" : false,
			"priority" : 1,
			"tags" : {

			},
			"slaveDelay" : NumberLong(0),
			"votes" : 1
		},
		{
			"_id" : 2,
			"host" : "ubuntu:27019",
			"arbiterOnly" : true,
			"buildIndexes" : true,
			"hidden" : false,
			"priority" : 1,
			"tags" : {

			},
			"slaveDelay" : NumberLong(0),
			"votes" : 1
		}
	],
	"settings" : {
		"chainingAllowed" : true,
		"heartbeatIntervalMillis" : 2000,
		"heartbeatTimeoutSecs" : 10,
		"electionTimeoutMillis" : 10000,
		"catchUpTimeoutMillis" : 60000,
		"getLastErrorModes" : {

		},
		"getLastErrorDefaults" : {
			"w" : 1,
			"wtimeout" : 0
		},
		"replicaSetId" : ObjectId("5a20bb163d61965112afa99c")
	}
}
驗證副本集的可用性

在 primary 中建立一個數據庫,並插入一個文件:

myrs:PRIMARY> use test
switched to db test
myrs:PRIMARY> db.blog.insert({"title": "hello world"});
WriteResult({ "nInserted" : 1 })

在 secondary 中進行驗證:

myrs:SECONDARY> show dbs
2017-12-01T10:17:05.138+0800 E QUERY    [thread1] Error: listDatabases failed:{
	"ok" : 0,
	"errmsg" : "not master and slaveOk=false",
	"code" : 13435,
	"codeName" : "NotMasterNoSlaveOk"
} :
[email protected]/mongo/shell/utils.js:25:13
[email protected]/mongo/shell/mongo.js:62:1
[email protected]/mongo/shell/utils.js:769:19
[email protected]/mongo/shell/utils.js:659:15
@(shellhelp2):1:1
myrs:SECONDARY> rs.slaveOk()
myrs:SECONDARY> show dbs
admin  0.000GB
local  0.000GB
test   0.000GB
myrs:SECONDARY> use test
switched to db test
myrs:SECONDARY> db.blog.find()
{ "_id" : ObjectId("5a20bc0b39b7d2896cd3765a"), "title" : "hello world" }

因仲裁節點實際上並不儲存任何資料,所以無法通過連線仲裁節點檢視剛剛插入的文件

myrs:ARBITER> show dbs
2017-12-01T12:36:57.429+0800 E QUERY    [thread1] Error: listDatabases failed:{
	"ok" : 0,
	"errmsg" : "not master and slaveOk=false",
	"code" : 13435,
	"codeName" : "NotMasterNoSlaveOk"
} :
[email protected]/mongo/shell/utils.js:25:13
[email protected]/mongo/shell/mongo.js:62:1
[email protected]/mongo/shell/utils.js:769:19
[email protected]/mongo/shell/utils.js:659:15
@(shellhelp2):1:1
myrs:ARBITER> rs.slaveOk()
myrs:ARBITER> show dbs
local  0.000GB

啟動 elasticsearch

安裝並執行 mongo-connector

安裝 pip

sudo apt-get install python-pip

安裝 mongo-connector

pip install mongo-connector

執行 mongo-connector

mongo-connector -m 127.0.0.1:27017 -t 127.0.0.1:9200 -d elastic_doc_manager

即可在 elasticsearch 中看到資料,同時可以動態(有一點延遲)看到 MongoDB 的增刪改查。