1. 程式人生 > >使用mongoDB pipeline進行聚合操作

使用mongoDB pipeline進行聚合操作

mongoDB中的聚合操作將多個文件中的值組合在一起,並可對分組資料執行各種操作,以返回單個結果。 在SQL中的 count(*)與group by組合相當於mongodb 中的聚合功能。
mongoDB為我們提供了三種方法來實現聚合操作。分別是aggregation pipeline,Map-Reduce和Single Purpose Aggregation Operations。今天我們主要來討論一下關於aggregation pipeline的內容。關於Map-Reduce和Single Purpose Aggregation Operations後面有時間了再討論。

aggregation pipeline主要是用aggregate()方法來實現聚合操作。
pipeline通俗的理解就是一個管道,其中每一個操作就是管道的一個階段,每次當前操作接受上一個階段的輸出作為輸入,並把輸出結果作為輸入結果給下一個階段。如果你還不是特別明白,不用擔心,慢慢往下看,我們會用更多的例子來說明。語法如下:
db.collection.aggregate( [ { }, … ] )。其中stage可以是如下的內容。

1$project

$project用來重構返回值。下面舉個例子來說明。

db.books.insert(
{
  "_id" : 1,
  title: "abc123",
  isbn: "0001122223334",
  author: { last: "zzz", first: "aaa" },
  copies: 5
}
)

此時我們使用如下命令來查詢資料:

 db.books.aggregate([{$project:{title:1,author:1}}])

輸出如下結果:

{ "_id" : 1, "title" : "abc123", "author" : { "last
" : "zzz", "first" : "aaa" }
}

可以看出,輸出的結果中包含了_id,title,author三個欄位。預設情況下_id欄位是會輸出的,然後在$project中可以通過指定欄位的值為0或者1,來決定輸出結果中包含還是不包含這個欄位。為1時代表返回值包含這個欄位,為0時代表返回值不包含這個欄位。不想返回_id欄位可以這樣:

db.books.aggregate( [ { $project : { _id: 0, title : 1 , author : 1 } } ] )

$project除了可以控制是否返回某個欄位,還可以給返回結果增加欄位。

db.books.aggregate(
   [
      {
         $project
: { title: 1, isbn: { prefix: { $substr: [ "$isbn", 0, 3 ] }, group: { $substr: [ "$isbn", 3, 2 ] }, publisher: { $substr: [ "$isbn", 5, 4 ] }, title: { $substr: [ "$isbn", 9, 3 ] }, checkDigit: { $substr: [ "$isbn", 12, 1] } }, lastName: "$author.last", copiesSold: "$copies" } } ] )

返回結果如下:

{ "_id" : 1, "title" : "abc123", "isbn" : { "prefix" : "000", "group" : "11", "publisher" : "2222", "title" : "333", "checkDigit" : "4" }, "lastName" : "zzz", "copiesSold" : 5 }

可以看到,我們給返回結果的isbsn欄位中添加了prefix,group,publisher,title和checkDigit欄位,這些欄位是通過對原來isbn的字串進行拆分得到的,另一個增加的欄位copiesSold是引用了原來的copies欄位。

2$match

$match通過跟查詢語句相比對,來過濾集合,只返回跟查詢語句相匹配的文件。

db.articles.insert([
{ "_id" : ObjectId("512bc95fe835e68f199c8686"), "author" : "dave", "score" : 80, "views" : 100 }
{ "_id" : ObjectId("512bc962e835e68f199c8687"), "author" : "dave", "score" : 85, "views" : 521 }
{ "_id" : ObjectId("55f5a192d4bede9ac365b257"), "author" : "ahn", "score" : 60, "views" : 1000 }
{ "_id" : ObjectId("55f5a192d4bede9ac365b258"), "author" : "li", "score" : 55, "views" : 5000 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b259"), "author" : "annT", "score" : 60, "views" : 50 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b25a"), "author" : "li", "score" : 94, "views" : 999 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b25b"), "author" : "ty", "score" : 95, "views" : 1000 }
])

執行命令:

db.articles.aggregate([{$match:{author:"dave"}}])

返回結果如下:

{ "_id" : ObjectId("512bc95fe835e68f199c8686"), "author" : "dave", "score" : 80, "views" : 100 }
{ "_id" : ObjectId("512bc962e835e68f199c8687"), "author" : "dave", "score" : 85, "views" : 521 }

3$redact

感覺不好說清楚,直接上例子:

db.forecasts.insert(
{
  _id: 1,
  title: "123 Department Report",
  tags: [ "G", "STLW" ],
  year: 2014,
  subsections: [
    {
      subtitle: "Section 1: Overview",
      tags: [ "SI", "G" ],
      content:  "Section 1: This is the content of section 1."
    },
    {
      subtitle: "Section 2: Analysis",
      tags: [ "STLW" ],
      content: "Section 2: This is the content of section 2."
    },
    {
      subtitle: "Section 3: Budgeting",
      tags: [ "TK" ],
      content: {
        text: "Section 3: This is the content of section3.",
        tags: [ "HCS" ]
      }
    }
  ]
}
)

執行如下操作:

var userAccess = [ "STLW", "G" ];
db.forecasts.aggregate(
   [
     { $match: { year: 2014 } },
     { $redact: {
        $cond: {
           if: { $gt: [ { $size: { $setIntersection: [ "$tags", userAccess ] } }, 0 ] },
           then: "$$DESCEND",
           else: "$$PRUNE"
         }
       }
     }
   ]
);

返回結果如下:

{
  "_id" : 1,
  "title" : "123 Department Report",
  "tags" : [ "G", "STLW" ],
  "year" : 2014,
  "subsections" : [
    {
      "subtitle" : "Section 1: Overview",
      "tags" : [ "SI", "G" ],
      "content" : "Section 1: This is the content of section 1."
    },
    {
      "subtitle" : "Section 2: Analysis",
      "tags" : [ "STLW" ],
      "content" : "Section 2: This is the content of section 2."
    }
  ]
}

解釋一下上面的例子,$redact主要有三個關鍵字:
1)$$DESCEND:包含當前document級別的所有fields。當前級別欄位的內嵌文件將會被繼續檢測。
2)$$PRUNE:不包含當前文件或者內嵌文件級別的所有欄位,不會繼續檢測此級別的其他欄位,即使這些欄位的內嵌文件持有相同的訪問級別。
3)$$KEEP:包含當前文件或內嵌文件級別的所有欄位,不再繼續檢測此級別的其他欄位,即使這些欄位的內嵌文件中持有不同的訪問級別。
在上面的aggregate操作中,首先使用$match過濾掉了一部分資料,只拿到year是2014的文件。此時$matche就是第一個階段,在這一個階段中,我們只拿出跟查詢條件相匹配的結果,並將這個結果作為輸入傳遞個下一個階段。在這裡下一個階段就是$redact。在$redact階段中,我們拿到了上一個階段的輸出結果。$redact常常和$cond配合使用。$cond是英文condition的所寫,代表條件。在$cond中通常會使用if–else–then的表示式。在if中傳入一個判斷語句,如果符合判斷語句,執行then的部分,否則執行else的部分。在我們上面的例子中,$setIntersection表示對兩個集合取交集。$size表示對集合取大小。所以if判斷語句中的意思就是:對於當前階段輸入進來的每一個文件,都判斷tags和userAccess的交集的長度是否大於0,如果大於0,就執行then,否則執行else。
在then這一部分使用了$$DESCEND關鍵字。這個關鍵字的意思我們在上面已經做了說明。在這個例子中的意思就是說,如果我滿足了if條件,那麼會繼續遞迴的向下判斷我的子文件。在我們的例子中,將繼續去判斷subsections欄位裡面的內容,因為它裡面也含有tags欄位。
在else這一部分使用了$$PRUNE關鍵字。這個關鍵字的意思在上面也已經說清楚了。在這個例子中的意思就是說,如果我不滿足if條件,那麼我直接就捨棄當前文件。

4$limit

用來控制當前階段輸出資料的資料。如下:

db.article.aggregate(
    { $limit : 5 }
);

返回結果只有5條,如果還有下一個階段的操作,那麼也只會向下一階段傳遞5條結果。
mongoDB對limit做了優化,如果$sort(排序)剛好在limit前面,那麼$sort只會在記憶體中維護n條記錄,這個n就是limit中的數字。

5$skip

跳過一些資料。
如下:

db.article.aggregate(
    { $skip : 5 }
);

在這個例子中將會跳過5條資料,然後將結果返回。如果還有下一步操作,將結果傳遞給下一階段。

6$unwind

還是直接上例子。
inventory集合中有這樣一個文件。

{ "_id" : 1, "item" : "ABC1", sizes: [ "S", "M", "L"] }

使用:

db.inventory.aggregate( [ { $unwind : "$sizes" } ] )

返回結果:

{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }

可以看出,在這個例子中,$unwind命令將sizes欄位的陣列拆分成了一個個的元素。這就是$unwind的主要功能。
$unwind的完整寫法如下:

{
  $unwind:
    {
      path: <field path>,
      includeArrayIndex: <string>,
      preserveNullAndEmptyArrays: <boolean>
    }
}

path:必須有,指明需要查分的欄位,一般為一個數組
includeArrayIndex: 可選,如果需要拆分開的資料包含資料下標,可以指定陣列下標的欄位名
preserveNullAndEmptyArrays: 可選,在陣列為null,缺失 ,或者空的情況下,如果為true, $unwind 同樣輸出當前文件. 如果為false, \$unwind不輸出文件。預設是false.
下面的寫法效果是一樣的:

db.inventory.aggregate( [ { $unwind: "$sizes" } ] )
db.inventory.aggregate( [ { $unwind: { path: "$sizes" } } ] )

假設現在inventory中有如下文件資料:

{ "_id" : 1, "item" : "ABC", "sizes": [ "S", "M", "L"] }
{ "_id" : 2, "item" : "EFG", "sizes" : [ ] }
{ "_id" : 3, "item" : "IJK", "sizes": "M" }
{ "_id" : 4, "item" : "LMN" }
{ "_id" : 5, "item" : "XYZ", "sizes" : null }

使用如下命令:

db.inventory.aggregate( [ { $unwind: "$sizes" } ] )

返回結果:

{ "_id" : 1, "item" : "ABC", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC", "sizes" : "L" }
{ "_id" : 3, "item" : "IJK", "sizes" : "M" }

可以看到,在sizes是空,null或者缺失的情況下,文件結果不會輸出來。(preserveNullAndEmptyArrays預設是false)
我們來將preserveNullAndEmptyArrays指定為true,看一下結果。
執行:

db.inventory.aggregate( [
   { $unwind: { path: "$sizes", preserveNullAndEmptyArrays: true } }
] )

返回結果:

{ "_id" : 1, "item" : "ABC", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC", "sizes" : "L" }
{ "_id" : 2, "item" : "EFG" }
{ "_id" : 3, "item" : "IJK", "sizes" : "M" }
{ "_id" : 4, "item" : "LMN" }
{ "_id" : 5, "item" : "XYZ", "sizes" : null }

區別已經一目瞭然。
我們還可以通過來顯示被查分的資料的陣列下標。

db.inventory.aggregate( [ { $unwind: { path: "$sizes", includeArrayIndex: "arrayIndex" } } ] )

在這裡,返回結果中會多出一個欄位叫做:arrayIndex,這個欄位包含了當前文件原來的資料在陣列中的下標。上面的返回結果:

{ "_id" : 1, "item" : "ABC", "sizes" : "S", "arrayIndex" : NumberLong(0) }
{ "_id" : 1, "item" : "ABC", "sizes" : "M", "arrayIndex" : NumberLong(1) }
{ "_id" : 1, "item" : "ABC", "sizes" : "L", "arrayIndex" : NumberLong(2) }
{ "_id" : 3, "item" : "IJK", "sizes" : "M", "arrayIndex" : null }

7$sample

$sample隨機返回指定大小的資料集合。標準的語法模板如下:

{ $sample: { size: <positive integer> } }

通過下面的命令從users集合中隨機選出三條記錄。

db.users.aggregate(
   [ { $sample: { size: 3 } } ]
)

結果將返回users中隨機的三條記錄。
值得注意的是,$sample的返回結果中,某一條記錄可能會出現不止一次。

8$group

$group幾乎是用的最為頻繁的聚合函式。先來舉個例子看一下效果:

 db.products.insert([
   {proId:"001",saleNum:10},
   {proId:"002",saleNum:5},
   {proId:"001",saleNum:15}
   ])

現在products集合中有三條資料。現在我們想要統計出proId為001的產品總的saleNum。

 db.products.aggregate([
    {
       $group:{_id:"$proId",
              totalNum:{$sum:"$saleNum"}
              }
    }
    ])

$group就類似與mysql中的group by,指定資料根據某個欄位分組。在$group中我們是通過_id來指定分組依據。上面的例子中,我們指定資料根據proId來盡心分組,然後在每個分組中通過$sum函式來計算saleNum的總數,並將計算結果給了totalNum這個欄位。
返回結果如下:

{ "_id" : "002", "totalNum" : 5 }
{ "_id" : "001", "totalNum" : 25 }

通過上面這個例子,相信大家對$group的大致使用已經有所瞭解。現在我們來看看$group的完整的使用模板:

{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }

其中_id指定分組依據,後面的accumulator可以是下面的函式:

  • $sum:返回分組中某個欄位的和。如果某個字串無法轉換成數字,mongoDB將忽略它。從3.2版本之後,在$group和$project階段都有效。
  • $avg:返回分組內某個欄位的平均值。如果某個字串無法轉換成數字,mongoDB將忽略它。從3.2版本之後,在$group和$project階段都有效。
  • $first:返回分組內的第一條文件資料。僅僅在$group階段有效。
  • $last:返回分組內的最後一條文件資料。僅僅在$group階段有效。
  • $max:返回分組內某個欄位最大的資料。從3.2版本之後,在$group和$project階段都有效。
  • $min:返回分組內某個欄位最小的資料。從3.2版本之後,在$group和$project階段都有效。
  • $push:將分組內某個欄位的值加入到一個數組中。僅僅在$group階段有效。
  • $addToSet:將分組內的某個欄位加入一個集合中,無重複值。僅僅在$group階段有效。
  • $stdDevPop:計算分組內某個欄位的標準差。從3.2版本之後,在$group和$project階段都有效。
  • $stdDevSamp:計算樣本中某個欄位的標準差。從3.2版本之後,在$group和$project階段都有效。
    下面我們一一舉例來說明。
    $sum
    表示式模板:
{ $sum: <expression> } 

集合sales中有如下資料:

{ "_id" : 1, "item" : "abc", "price" : 10, "quantity" : 2, "date" : ISODate("2014-01-01T08:00:00Z")}
{ "_id" : 2, "item" : "jkl", "price" : 20, "quantity" : 1, "date" : ISODate("2014-02-03T09:00:00Z")}
{ "_id" : 3, "item" : "xyz", "price" : 5, "quantity" : 5, "date" : ISODate("2014-02-03T09:05:00Z")}
{ "_id" : 4, "item" : "abc", "price" : 10, "quantity" : 10, "date" : ISODate("2014-02-15T08:00:00Z")}
{ "_id" : 5, "item" : "xyz", "price" : 5, "quantity" : 10, "date" : ISODate("2014-02-15T09:05:00Z")}

執行命令:

db.sales.aggregate(
   [
     {
       $group:
         {
           _id: { day: { $dayOfYear: "$date"}, year: { $year: "$date" } },
           totalAmount: { $sum: { $multiply: [ "$price", "$quantity" ] } },
           count: { $sum: 1 }
         }
     }
   ]
)

解釋一下這條命令,$dayOfYear是個函式取到某天是一年中的第幾天,$yearz這個函式取到年份,所以在上面的例子中,我們是用一年中的第幾天和年份來作為分組依據。假設某個分組內有n條資料,對於每一條資料,$multiply函式用來做乘法計算,將每一個分組內每一條資料中的price和quantity相乘。一共計算n次,得到n個乘積,最後把得到的n個乘積求和給totalAmount這個欄位。
count: { $sum: 1 }跟totalAmount類似,只是在這裡把price和quantity相乘直接換成了1。這樣寫的意思就是統計每個分組內有多少條資料。
輸出的結果如下:

{ "_id" : { "day" : 46, "year" : 2014 }, "totalAmount" : 150, "count" : 2 }
{ "_id" : { "day" : 34, "year" : 2014 }, "totalAmount" : 45, "count" : 2 }
{ "_id" : { "day" : 1, "year" : 2014 }, "totalAmount" : 20, "count" : 1 }

補充:

$group階段,如果表示式中的資料是陣列,那麼$sum把該陣列看做非數字型別。

example Field Values Results
{ $sum : <field> } 數字 計算values的和
{ $sum : <field> } 數字和非數字混合 忽略非數字,計算數字的和
{ $sum : <field> } 非數字,或者field不存在 0

$avg
計算每一個分組內某個指定欄位的平均值。
表示式:

{ $avg: <expression> }

這個跟上面的sum使用方法差不多,就不贅述了,舉個例子相信大家都看得懂。
在sales集合中有如下資料:

{ "_id" : 1, "item" : "abc", "price" : 10, "quantity" : 2, "date" : ISODate("2014-01-01T08:00:00Z")}
{ "_id" : 2, "item" : "jkl", "price" : 20, "quantity" : 1, "date" : ISODate("2014-02-03T09:00:00Z")}
{ "_id" : 3, "item" : "xyz", "price" : 5, "quantity" : 5, "date" : ISODate("2014-02-03T09:05:00Z")}
{ "_id" : 4, "item" : "abc", "price" : 10, "quantity" : 10, "date" : ISODate("2014-02-15T08:00:00Z")}
{ "_id" : 5, "item" : "xyz", "price" : 5, "quantity" : 10, "date" : ISODate("2014-02-15T09:12:00Z")}

執行命令:

db.sales.aggregate(
   [
     {
       $group:
         {
           _id: "$item",
           avgAmount: { $avg: { $multiply: [ "$price", "$quantity" ] } },
           avgQuantity: { $avg: "$quantity" }
         }
     }
   ]
)

返回結果如下:

{ "_id" : "xyz", "avgAmount" : 37.5, "avgQuantity" : 7.5 }
{ "_id" : "jkl", "avgAmount" : 20, "avgQuantity" : 1 }
{ "_id" : "abc", "avgAmount" : 60, "avgQuantity" : 6 }

補充:
$group階段,如果表示式中的資料是陣列,那麼$avg把該陣列看做非數字型別。
$first
返回每個分組中第一條資料,注意只有在組內資料有一定的順序的時候,這個操作才具有意義。
語法如下:

{ $first: <expression> }

假定sales集合中有如下資料:

{ "_id" : 1, "item" : "abc", "price" : 10, "quantity" : 2, "date" : ISODate("2014-01-01T08:00:00Z")}
{ "_id" : 2, "item" : "jkl", "price" : 20, "quantity" : 1, "date" : ISODate("2014-02-03T09:00:00Z")}
{ "_id" : 3, "item" : "xyz", "price" : 5, "quantity" : 5, "date" : ISODate("2014-02-03T09:05:00Z")}
{ "_id" : 4,"item" : "abc", "price" : 10, "quantity" : 10, "date" : ISODate("2014-02-15T08:00:00Z")}
{ "_id" : 5,"item" : "xyz", "price" : 5, "quantity" : 10, "date" : ISODate("2014-02-15T09:05:00Z")}
{ "_id" : 6, "item" : "xyz", "price" : 5, "quantity" : 5, "date" : ISODate("2014-02-15T12:05:10Z") }
{ "_id" : 7, "item" : "xyz", "price" : 5, "quantity" : 10, "date" : ISODate("2014-02-15T14:12:12Z")}

執行如下操作:

db.sales.aggregate(
   [
     { $sort: { item: 1, date: 1 } },
     {
       $group:
         {
           _id: "$item",
           firstSalesDate: { $first: "$date" }
         }
     }
   ]
)

首先根據item和date做正向排序,然後根據item欄位分組,並將每一組內第一條資料給firstSalesDate欄位,這樣就拿出了每種item最早的銷售日期。
返回結果:

{ "_id" : "xyz", "firstSalesDate" : ISODate("2014-02-03T09:05:00Z") }
{ "_id" : "jkl", "firstSalesDate" : ISODate("2014-02-03T09:00:00Z") }
{ "_id" : "abc", "firstSalesDate" : ISODate("2014-01-01T08:00:00Z") }

$last
$first用法一樣,不再贅述。
$max
返回最大值。例子一看大家都明白。

db.sales.aggregate(
   [
     {
       $group:
         {
           _id: "$item",
           maxTotalAmount: { $max: { $multiply: [ "$price", "$quantity" ] } },
           maxQuantity: { $max: "$quantity" }
         }
     }
   ]
)

返回結果:

{ "_id" : "xyz", "maxTotalAmount" : 50, "maxQuantity" : 10 }
{ "_id" : "jkl", "maxTotalAmount" : 20, "maxQuantity" : 1 }
{ "_id" : "abc", "maxTotalAmount" : 100, "maxQuantity" : 10 }

$min
$max類似,不在贅述。
$push
將指定的表示式的值新增到一個數組中。
還是以上面的sales集合為例。

db.sales.aggregate(
   [
     {
       $group:
         {
           _id: { day: { $dayOfYear: "$date"}, year: { $year: "$date" } },
           itemsSold: { $push:  { item: "$item", quantity: "$quantity" } }
         }
     }
   ]
)

在這個例子中我們將item: “$item”, quantity: “$quantity”這樣形式的資料每次都放入一個數組中,陣列名字我們取名為itemSold.
返回結果:

{
   "_id" : { "day" : 46, "year" : 2014 },
   "itemsSold" : [
      { "item" : "abc", "quantity" : 10 },
      { "item" : "xyz", "quantity" : 10 },
      { "item" : "xyz", "quantity" : 5 },
      { "item" : "xyz", "quantity" : 10 }
   ]
}
{
   "_id" : { "day" : 34, "year" : 2014 },
   "itemsSold" : [
      { "item" : "jkl", "quantity" : 1 },
      { "item" : "xyz", "quantity" : 5 }
   ]
}
{
   "_id" : { "day" : 1, "year" : 2014 },
   "itemsSold" : [ { "item" : "abc", "quantity" : 2 } ]
}

$addToSet
用法同$push,只是不能有重複資料。
依然用sales集合做例子:

db.sales.aggregate(
   [
     {
       $group:
         {
           _id: { day: { $dayOfYear: "$date"}, year: { $year: "$date" } },
           itemsSold: { $addToSet: "$item" }
         }
     }
   ]
)

返回結果:

{ "_id" : { "day" : 46, "year" : 2014 }, "itemsSold" : [ "xyz", "abc" ] }
{ "_id" : { "day" : 34, "year" : 2014 }, "itemsSold" : [ "xyz", "jkl" ] }
{ "_id" : { "day" : 1, "year" : 2014 }, "itemsSold" : [ "abc" ] }

$stdDevPop
返回標準差。
user集合中有如下資料:

{ "_id" : 1, "name" : "dave123", "quiz" : 1, "score" : 85 }
{ "_id" : 2, "name" : "dave2", "quiz" : 1, "score" : 90 }
{ "_id" : 3, "name" : "ahn", "