背景

最近做了幾個規則邏輯。用到mongo查詢比較多,就是查詢交易資訊跑既定規則篩選出交易商戶,使用聚合管道進行統計和取出簡單處理後的資料,用SQL代替業務程式碼邏輯的判斷。

方法

MongoDB聚合使用aggregate,聚合管道採取自動向下子執行方式,基本語法格式:

db.COLLECTION_NAME.aggregate(AGGREGATE_OPERATION)

聚合框架中常用的操作:

  • $project:修改輸入文件的結構。可以用來重新命名、增加或刪除域,也可以用於建立計算結果以及巢狀文件。
  • \(match:用於過濾資料,只輸出符合條件的文件。\)match使用MongoDB的標準查詢操作。
  • $limit:用來限制MongoDB聚合管道返回的文件數。
  • $skip:在聚合管道中跳過指定數量的文件,並返回餘下的文件。
  • $unwind:將文件中的某一個數組型別欄位拆分成多條,每條包含陣列中的一個值。
  • $group:將集合中的文件分組,可用於統計結果。
  • $sort:將輸入文件排序後輸出。
  • $geoNear:輸出接近某一地理位置的有序文件。
表示式 描述 例項
$sum 計算總和。 db.mycol.aggregate([{\(group : {_id : "\)by_user", num_tutorial : {\(sum : "\)likes"}}}])
$avg 計算平均值 db.mycol.aggregate([{\(group : {_id : "\)by_user", num_tutorial : {\(avg : "\)likes"}}}])
$min 獲取集合中所有文件對應值得最小值。 db.mycol.aggregate([{\(group : {_id : "\)by_user", num_tutorial : {\(min : "\)likes"}}}])
$max 獲取集合中所有文件對應值得最大值。 db.mycol.aggregate([{\(group : {_id : "\)by_user", num_tutorial : {\(max : "\)likes"}}}])
$push 在結果文件中插入值到一個數組中。 db.mycol.aggregate([{\(group : {_id : "\)by_user", url : {\(push: "\)url"}}}])
$addToSet 在結果文件中插入值到一個數組中,但不建立副本。 db.mycol.aggregate([{\(group : {_id : "\)by_user", url : {\(addToSet : "\)url"}}}])
$first 根據資源文件的排序獲取第一個文件資料。 db.mycol.aggregate([{\(group : {_id : "\)by_user", first_url : {\(first : "\)url"}}}])
$last 根據資源文件的排序獲取最後一個文件資料 db.mycol.aggregate([{\(group : {_id : "\)by_user", last_url : {\(last : "\)url"}}}])

查詢示例

示例一

部分欄位說明:transAmt:交易金額,transType:交易型別,transTime:交易時間,mercNum:商戶編號

查詢交易資訊,交易商戶昨天交易筆數大於三百,交易金額累加大於三百萬,這裡現根據$match將交易資訊篩選出來,然後使用$group根據商戶編號分組,統計交易筆數和累加交易金額,將分組結果判斷匹配交易筆數大於三百,交易金額大於三百萬。

db.getCollection('box_order').aggregate([
{
$match: {
"transTime":{$gte:ISODate("2020-01-03T00:00:00.000Z"),$lt:ISODate("2020-01-10T00:00:00.000Z")},
"transType":"consume",
"transStatus":{$in:["tsProcessing","success"]}
}
},
{
$group: {
"_id": "$mercNum",
"count": {"$sum": 1},
"totalAmt": {"$sum": "$transAmt"}
}
},
{
$match: {
"count": {"$gte": 300},
"totalAmt": {"$gte": 3000000}
}
}
])

示例二

部分欄位說明:cardNo:交易卡號,transType:交易型別,transTime:交易時間,mercNum:商戶編號

查詢時間段內指定卡號下的交易商戶資訊。

根據卡號和交易時間將交易資料查出來,然後只顯示商戶號和卡號兩列欄位,根據商戶號和卡號分組去重,再根據卡號分組,將商戶號轉化成一個欄位變成陣列。

db.getCollection('order_202011').aggregate([
{
"$match": {
"detailInfo.cardNo": {
"$in": [
"YtCZ7KhCVG5xerKUg8bzJhVAjW/hWAWj",
"cQ7QQ0yCVW6LhHtJNVRq2A==",
"6KDpHmQ9s+0SQAGAUyLJ4A==",
"cQ7QQ0yCVW7iSegn8uqIfg==",
"ZEOcXdI4rfvswAz7dQ80hw==",
"6KDpHmQ9s+2Nz61PPuOamw=="
]
},
"baseInfo.transTime": {
"$gte": new Date(2020,10,01),
"$lt": new Date(2020,10,24)
}
}
},
{
"$project": {
"merchantInfo.mercNum": 1,
"detailInfo.cardNo": 1
}
},
{
"$group": {
"_id": {
"mercNum": "$merchantInfo.mercNum",
"cardNo": "$detailInfo.cardNo"
}
}
},
{
"$group": {
"_id": "$_id.cardNo",
"mercNums": {
"$push": "$_id.mercNum"
}
}
}
])

示例三

根據指定商戶和其他條件查詢交易資訊,根據卡號分組並組裝成一個欄位的集合,最後篩選掉id只保留cardNos陣列

db.getCollection('box_order_fxq_202104').aggregate([
{
"$match": {
"mercNum": "M15201812030753174730",
"transTime": {
"$gte": ISODate("2021-04-17T16:00:00.000Z"),
"$lt": ISODate("2021-04-18T16:00:00.000Z")
},
"mercLevel": {
"$in": [
"C",
"D",
"E"
]
},
"payType": "POSPAY",
"transType": "consume",
"cardType": "2"
}
},
{
"$group": {
"_id": null,
"cardNos": {
"$push": "$cardNo" //$addToSet
}
}
},
{
"$project":{
"cardNos":1,"_id":0
}
}
])

查詢結果:

{
"cardNos" : [
"n2IwHHhfEAJcm6RFsoNPcBVAjW/hWAWj",
"n2IwHHhfEAJcm6RFsoNPcBVAjW/hWAWj"
]
}

示例四

根據時間查詢交易資訊後,根據商戶號分組,並將第一個交易資訊存放入data欄位中。(如果是需要全部的商戶交易資訊那麼將$first修改為$push

db.getCollection('order').aggregate([
{
"$match": {
"startTrxTime": {
"$gte": ISODate("2021-07-20T16:00:00.000Z"),
"$lt": ISODate("2021-07-21T16:00:00.000Z")
}
}
},
{
"$group": {
"_id": "$subMerchantNo",
'data':{'$first': '$$ROOT'} //$push
}
},
{
"$sort": {
"_id": 1
}
}
])

尾言

最近那個到查詢的大差不差,要注意的都是一些小改動,一般情況正常查就可以。後續有什麼不一樣的會繼續補充。先到這裡