MongoDB Aggregation 業務場景實戰

對於技術人員來說,“管道” 相信大家都不會感到陌生,在很多技術領域都有管道的概念,例如Linux 管道,CI/CD 管道。同樣的,MongoDB 2.2 版本也新增了聚合管道功能,雖然功能釋出已久,但是社群的複雜場景的實踐並不多,給大家造成了 聚合管道 “不好用”的錯覺 。實際在業務場景中,適當的運用聚合往往會帶來事半功倍的效果。
定義
要想了解聚合管道在業務場景中的使用,首先需要了解 聚合管道 的定義:
聚合管道用於資料處理,每個文件通過一個或者是多個階段組成,可以對每個分組進行 分組 和 過濾 等功能,然後經過一系列處理,輸出相應的結果。

通過這張圖,可以清晰的瞭解到 聚合管道 的處理過程,我們常用的管道操作符一般有以下這些:
- $match 主要用於對文件集合的篩選
- $project 主要用於從子文件中提取欄位,可以重新命名欄位,也可以移除欄位
- $group 主要用於根據文件的特定欄位進行分組
- $unwind 主要用於分割陣列嵌入到自己的頂層檔案
- $lookup 主要用於兩個集合之間的左連線操作
- $skip 接受一個數字 n,丟棄結果集中的前 n 個文件
- $limit 接受一個數字 n,返回結果集中的前 n 個文件
- $sort 主要用於結果集的排序
應用
看完了各種各樣的管道操作符,或許有的人在想怎麼把它利用在實際業務場景中呢?下面我就通過客戶管理系統介紹一下聚合管道的最佳實踐,大家聽到客戶管理系統可能有些陌生,它還有一個“別名” CRM。CRM 系統中存在機會、客戶、聯絡人這三個大的物件,物件之間都是存在關聯關係的,機會可以關聯多個聯絡人,可以關聯一個客戶,而聯絡人和客戶是一一對應的。通過三個基本物件我們可以衍生出很多聚合業務場景,基礎架構圖如下所示:

圖一:基礎架構圖
如上圖所示,我們業務場景中經常會出現篩選資料的需求,如條件篩選和自定義欄位查詢等需求,根據特定的條件篩選出我們想要的資料。針對不同的業務需求,我們一般會涉及到以下場景:
- 基礎物件查詢
- 表 join 查詢
- 分類統計
- 巢狀物件排序
- ...
說到了常見的應用場景,下面也介紹一下我們的系統業務資料模型:
- 機會資料模型
db.deals { '_id': ObjectId, 'title': String,// 機會標題 'status': String,// 機會狀態 'peoples': Array,// 關聯聯絡人 '_organizationId': ObjectId,// 關聯客戶 'createdTime': Date,// 建立時間 'updatedTime': Date,// 建立時間 'owner': Object// 擁有者 } 複製程式碼
- 聯絡人資料模型
db.peoples { '_id': ObjectId, 'name': String,// 聯絡人名稱 'phone': Array,// 電話 'email': Array,// 郵箱 'isArchive': Boolean,// 歸檔狀態 'createdTime': Date,// 建立時間 'updatedTime': Date,// 建立時間 'owner': Object// 擁有者 } 複製程式碼
- 客戶資料模型
db.organizations { '_id': ObjectId, 'name': String,// 客戶名稱 'address': Array,// 地址 'isArchive': Boolean,// 歸檔狀態 'createdTime': Date,// 建立時間 'updatedTime': Date,// 建立時間 'owner': Object// 擁有者 } 複製程式碼
有了資料模型,我們就可以用它來做資料聚合了,下面會列舉出客戶管理系統中常用的資料聚合實踐。
- 在 CRM 系統中,作為銷售管理,他管轄的地區在指定時間生成的機會是他所關注的。如果他需要篩選出2018年5月1日之後建立的機會的跟進資訊,可以通過建立時間的篩選實現他的目標,聚合管道語句如下:
涉及到的組合: project
db.deals.aggregate([ { $match: { createdTime: { '$gte': ISODate('2018-05-01') } } }, { $project: { title: 1, owner: 1, status: 1, updatedTime: 1 } } ]) 複製程式碼
- 在 CRM 系統中,作為銷售管理,經常需要統計每個銷售業績情況,用於他們的業績考核指標。所以他需要統計每個銷售贏單的機會數目,為了達成這一目標他可以先通過匹配條件篩選出所有贏單的機會,並過濾所有擁有者為空,然後再通過人員分組,計算每個銷售贏單的機會數,聚合管道語句如下所示:
涉及到的組合: group
db.deals.aggregate([ { $match: { status: 'won', owner: { '$ne': null } } }, { $group: { _id: '$owner._id',num_deals: { $sum: 1 } } } ]) 複製程式碼
- 在 CRM 系統中,作為一名銷售,經常需要拜訪客戶,拜訪完成之後需要在機會中填寫相應的跟進記錄,所以對於他們來說需要完整的客戶地址資訊,用於日後的拜訪使用。這時候他就可以根據擁有者是自己,並且關聯的客戶地址資訊為空的篩選條件來找到那些機會資訊缺失。一般情況下,便於記憶我們按照建立時間倒序要顯示機會資訊,聚合管道語句如下所示:
涉及到的組合: lookup -> sort
db.deals.aggregate([ { $match: { 'owner.name': '張三' } }, { $lookup: { from: 'organizations', localField: '_organizationId', foreignField: '_id', as: 'organization' } }, { $match: { 'organization.address': {'$ne': ''} } }, { $sort: { 'createdTime': -1 } } ]) 複製程式碼
- 在 CRM 系統中,作為一名剛入職的銷售,可以通過檢視別人的贏單的跟單機會來學習如何跟單。這時候他就可以根據狀態是贏單且擁有者是張三的篩選條件,找出別人最近贏單的50條銷售機會來進行學習。聚合管道語句如下所示:
涉及到的組合: project -> skip -> $limit
db.deals.aggregate([ { $match: { 'status': 'won', 'owner.name': '張三' } }, { $project: { 'name': 1, 'status': 1, 'wonTime': 1, 'note': 1 }, { $sort: { 'wonTime': -1 }, { $skip: 0 }, { $limit: 50 } ]) 複製程式碼
- 在 CRM 系統中,我們習慣把資料的完整性和成單概率掛鉤,如果銷售機會關聯的聯絡人和客戶資訊缺失,往往代表這是一條成單率較低的機會。因為缺失聯絡人資訊無法及時與客戶聯絡會造成失單,所以我們可以通過聚合管道關聯操作,尋找存在一個已歸檔或者擁有者為空的聯絡人和客戶,找出對應的銷售機會,評估該機會的資訊缺失率,然後完善關聯資訊。聚合管道語句如下:
涉及到的組合: lookup -> match -> skip -> $limit
db.deals.aggregate([ { $match: { 'owner.name': '張三', '$or': [ { 'peoples': { '$ne': null } }, { '_organizationId':{ '$ne': null } } ] } }, { $lookup: { from: 'peoples', localField: 'peoples', foreignField: '_id', as: 'peoples' } }, { $lookup: { from: 'organizations', localField: '_organizationId', foreignField: '_id', as: 'organization' } }, { $match: { '$or': [ { 'peoples.isArchive': true }, { 'peoples.owner': { '$ne': null } }, { 'organization.isArchive': true }, { 'organization.owner': { '$ne': null } } ] }, { $sort: { 'createdTime': -1 } }, { $skip: 0 }, { $limit: 50 } ]) 複製程式碼
技巧
我們在使用聚合管道滿足我們的業務場景的同時,發現有很多小的技巧能夠幫助我們優化資料查詢,下面給大家列舉一下:
- 管道操作符之**$ifNull**
定義:如果表示式計算為非空值,則計算表示式並返回表示式的值。如果表示式計算為空值,包括未定義的值或缺少欄位的例項,則返回替換表示式的值。
如果需求是按照更新時間對未歸檔機會進行排序,普通的做法是:
db.deals.aggregate([ { $match: { 'isArchive': false } }, { $sort: { 'updatedTime': -1 } ]) 複製程式碼
這樣存在一個問題,由於存在更新時間欄位不存在或者值為空的髒資料,導致排序結果不準確,為了解決這個問題,當然我們也可以這樣去做,排序裡面指定多個欄位排序:
db.deals.aggregate([ { $match: { 'isArchive': false } }, { $sort: { 'updatedTime': -1,'_id': -1 } ]) 複製程式碼
如果我們運用管道操作符 $ifNull 去實現的話,可以更改更新時間結構,並填充預設值來達到我們期望的排序結果:
db.deals.aggregate([ { $match: { 'isArchive': false } }, { $project: { title: 1, updatedTime: { $ifNull: [ "$updatedTime", ISODate('9000-01-01')] } } } { $sort: { 'updatedTime': -1} ]) 複製程式碼
使用 $ifNull 資料填充來進行排序效率比空值比較排序效率要高,MongoDB 官方也給出了排序型別效率順序圖,如下所示:

圖二:不同型別排值圖
- 管道操作符之$cond
定義:評估布林表示式以返回兩個指定的返回表示式之一。 如果我們要實現按照更新時間對未歸檔機會進行排序,更新時間為空的填充預設值,我們可以這樣實現:
db.deals.aggregate([ { $match: { 'isArchive': false } }, { $project: { title: 1, updatedTime: { $cond: { if:{'$eq': ['$updatedTime', null]} , then: ISODate('9000-01-01') else '$updatedTime' } } } } { $sort: { 'updatedTime': -1} ]) 複製程式碼
優化
- 策略優化
- 將** sort **放到管道的前面,可以給集合建立索引,來提高處理資料的效率
- 可以用 limit、$skip 對文件進行提前過濾,以減少後續處理文件的數量
- MongoDB 自身優化器
- match 順序優化
- 如果 sort 之後,優化器會自動把 sort 前面
- limit 順序優化
- 如果 limit 之後,優化器會把 skip 的前面,移動後 skip 的值
- match 順序優化
注意事項
- 返回結果大小
- 聚合結果返回的是一個文件,不能超過 16M,從 MongoDB 2.6版本以後,返回的結果可以是一個遊標或者儲存到集合中,返回的結果不受 16M 的限制。
- 記憶體
- 在進行 Group 操作的時候,如果內容超過 100 M,將會拋錯 “ Exceeded memory limit for $group, but not allow enternal sort, put allowDiskUse true ”,如果需要處理大資料,可以使用 allowDiskUse 選項,儲存到磁碟上**。**
- 聚合操作符使用
- 在進行 $project 操作的時候,我們時常會把不需要的值過濾掉,以此來減少聚合操作對記憶體的消耗,但是不可以刪除預設的 _id, 如果刪除會拋錯 “ exception: The top-level _id field is the only field currently supported for exclusion ”, _id 作為官方欄位不可以刪除掉。
小結
今天我為你介紹了 MongoDB 聚合管道的應用實踐、技巧、優化以及注意事項,解釋了聚合管道操作符的基礎概念,希望能夠對你有所幫助。
這就是以上全部的內容,留幾道思考題給你吧。你們公司使用 MongoDB 聚合管道嗎?一般使用在什麼業務上面?你覺得好用嗎?