C# 讀寫MongoDB
阿新 • • 發佈:2018-12-24
驅動:Official .NET driver for MongoDB
版本:2.5.0
mongodb內部是用Bson格式儲存的,與json大致類似但有區別,因此它也原生支援json串語法格式進行操作,
在C#版本驅動中就有BsonDucument類來處理json串,
所以大多驅動介面的範型引數TDocument型別都可以用BsonDucument
,即直接用BsonDucument來處理json和bson進行增刪改查。
但是我不喜歡在C#語言還去寫json,總覺得這樣做以後比較難維護,所以儘量用C#語法本身去操作。
得益於驅動提供了BsonSerializer類,能夠將C#中的大多數自定義class物件進行序列化與反序列化。
所以TDocument範型引數不受限於BsonDucument,可以是任何自定義的類物件,
它內部可以將自定義的類物件序列化成BsonDucument,也可以反序列化,於是就可以在自定義物件層面操作mongodb了。
先簡單做個封裝類:
using MongoDB.Bson; using MongoDB.Bson.IO; using MongoDB.Bson.Serialization; using MongoDB.Driver; using MongoDB.Driver.Linq; using System; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Threading.Tasks; using MongoDB.Bson.Serialization.Attributes;
public class MongoDbHelper { private readonly IMongoDatabase _database; private MongoDbHelper(string connectionString, string databaseName) { var client = new MongoClient(connectionString); _database = client.GetDatabase(databaseName); } public Task Close() { return Task.CompletedTask; } public async Task EnsureIndexes<TC,TD>(IEnumerable<string> keys) { var collection = _database.GetCollection<TD>(typeof(TC).Name); var indexManager = collection.Indexes; await indexManager.CreateManyAsync(keys.Select(key => new CreateIndexModel<TD>($"{{{key}:1}}"))); } public Task<long> CountAsync<T>(string grainType, Expression<Func<T, bool>> whereFilter) { var collection = _database.GetCollection<T>(grainType); return collection.CountAsync(whereFilter == null ? FilterDefinition<T>.Empty : Builders<T>.Filter.Where(whereFilter)); } public Task WriteData<T>(string collectionName, IEnumerable<T> data) { var collection = _database.GetCollection<BsonDocument>(collectionName); collection.InsertManyAsync(data.Select(d => d.ToBsonDocument())); //物件可以直接轉Bson return Task.CompletedTask; } //TC用來指明資料庫的Collection名,相當於sql表名; //TD型別用來指明文件序列化物件,相當於sql記錄所對應的物件 public Task WriteData<TC,TD>(IEnumerable<TD> data) { return WriteData<TD>(typeof(TC).Name, data); } //T型別用來指明文件序列化物件,相當於sql記錄所對應的物件 public async Task<List<T>> Query<T>(string collectionName, Expression<Func<T, bool>> whereFilter) { try { var collection = _database.GetCollection<T>(collectionName); var filterDefinition = whereFilter == null ? FilterDefinition<T>.Empty : Builders<T>.Filter.Where(whereFilter); var projection = Builders<T>.Projection.Exclude("_id"); var findoptions = new FindOptions<T>() { Projection = projection }; var cursor = await collection.FindAsync(filterDefinition, findoptions); return cursor.ToEnumerable().ToList(); } catch (Exception e) { Console.WriteLine(e); } return null; } public Task<List<TD>> Query<TC, TD>(Expression<Func<TD, bool>> whereFilter) { return Query(typeof(TC).Name, whereFilter); } private async Task<List<TO>> AggregateQuery<TI, TO>(string collectionName, IEnumerable<IPipelineStageDefinition> pipelineStage) { try { var pipeline = new PipelineStagePipelineDefinition<TI, TO>(pipelineStage); var collection = _database.GetCollection<TI>(collectionName); var result = await collection.AggregateAsync(pipeline); return result.ToEnumerable().ToList(); } catch (Exception e) { Console.WriteLine(e); } return null; } public Task<List<TO>> AggregateQuery<TC, TI, TO>(IEnumerable<IPipelineStageDefinition> pipelineStage) { return AggregateQuery<TI,TO>(typeof(TC).Name, pipelineStage); } /// <summary> /// /// </summary> /// <typeparam name="TC">collection名稱類</typeparam> /// <typeparam name="TI">輸入文件類</typeparam> /// <typeparam name="TG">group by欄位型別</typeparam> /// <typeparam name="TO">輸出型別</typeparam> /// <param name="match"></param> /// <param name="groupby"></param> /// <param name="groupValue"></param> /// <returns></returns> public async Task<List<TO>> AggregateQuery<TC,TI,TG,TO>(Expression<Func<TI, bool>> match, Expression<Func<TI, TG>> groupby, Expression<Func<IGrouping<TG, TI>, TO>> groupValue) { var pipelineStageDefinitions = new IPipelineStageDefinition[] { PipelineStageDefinitionBuilder.Match(match), PipelineStageDefinitionBuilder.Group( groupby, groupValue) }; return await AggregateQuery<TC, TI, TO>(pipelineStageDefinitions); } private async Task<List<T>> AggregateQuery<T>(string collectionName, IEnumerable<IPipelineStageDefinition> pipelineStage) { try { var pipeline = new PipelineStagePipelineDefinition<BsonDocument, BsonDocument>(pipelineStage); var collection = _database.GetCollection<BsonDocument>(collectionName); var result = await collection.AggregateAsync(pipeline); return result.ToEnumerable().Select(b => BsonSerializer.Deserialize<T>(b)).ToList(); } catch (Exception e) { Console.WriteLine(e); } return null; } public Task<List<TO>> AggregateQuery<TC,TO>(IEnumerable<string> pipelineStageJson) { return AggregateQuery<TO> (typeof(TC).Name,pipelineStageJson.Select( json => new JsonPipelineStageDefinition<BsonDocument,BsonDocument>(json) as IPipelineStageDefinition).ToList()); } }
自定義一個類:
public class Recharge
{
public long UserId { get; set; }
public ulong Amount { get; set; }
[BsonDateTimeOptions(Kind = DateTimeKind.Local)]
public DateTime Time { get; set; } = DateTime.Now;
}
先打開個資料庫連線:
//連線串和資料庫名引數按自己的實際情況填
var helper = new MongoDbHelper("XXXX","YYYY");
寫資料:
//寫入多條資料
await helper.WriteData<Recharge, Recharge>(new[]
{
new Recharge() {UserId = 1, Amount = 100},
new Recharge() {UserId = 1, Amount = 200},
new Recharge() {UserId = 2, Amount = 500},
new Recharge() {UserId = 2, Amount = 1000},
new Recharge() {UserId = 3, Amount = 800},
new Recharge() {UserId = 4, Amount = 100},
});
查詢資料: //按id篩選
var list1 = await helper.Query<Recharge, Recharge>(recharge=>recharge.UserId == 1);
//按時間篩選
var list2 = await helper.Query<Recharge, Recharge>(
recharge => recharge.Time >= DateTime.MinValue &&
recharge.Time <= DateTime.Now);
//按金額篩選
var list3 = await helper.Query<Recharge, Recharge>(
recharge => recharge.Amount >= 500);
注:開始新手的我以為查詢過程是:先將bson文件反序列化成C#物件,再呼叫C#的lambda進行篩選。如果直接這樣,效率肯定很低,相當於總是要全盤掃描。而實際上是驅動內部把lambda函式解析成了Expression表示式,再解析翻譯為了原生json串查詢語法,相較於原本的json語法操作,只是犧牲了點翻譯時間。
統計查詢:
//先定義一個統計類物件
public class RechargeStat
{
public long UserId { get; set; }
public ulong Amount { get; set; }
}
//統計所有人的總充值
var stats = await helper.AggregateQuery<Recharge, Recharge, long, RechargeStat>(
recharge=>true, //直接返回true,即不篩選
recharge=>recharge.UserId,//按UserId統計,相當於sql的group by UserId
group =>new RechargeStat()
{
UserId = group.Key,
Amount = (ulong)group.Sum(v=>(double)v.Amount)
}
);