1. 程式人生 > >SQL Server 批量插入資料方案 SqlBulkCopy 的簡單封裝,讓批量插入更方便

SQL Server 批量插入資料方案 SqlBulkCopy 的簡單封裝,讓批量插入更方便

# 一、Sql Server插入方案介紹 * 關於 `SqlServer` 批量插入的方式,有三種比較常用的插入方式,`Insert`、`BatchInsert`、`SqlBulkCopy`,下面我們對比以下三種方案的速度 ## 1.普通的`Insert`插入方法 ```csharp public static void Insert(IEnumerable persons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); foreach (var person in persons) { using (var com = new SqlCommand( "INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)", con)) { com.Parameters.AddRange(new[] { new SqlParameter("@Id", SqlDbType.BigInt) {Value = person.Id}, new SqlParameter("@Name", SqlDbType.VarChar, 64) {Value = person.Name}, new SqlParameter("@Age", SqlDbType.Int) {Value = person.Age}, new SqlParameter("@CreateTime", SqlDbType.DateTime) {Value = person.CreateTime ?? (object) DBNull.Value}, new SqlParameter("@Sex", SqlDbType.Int) {Value = (int)person.Sex}, }); com.ExecuteNonQuery(); } } } } ``` ## 2.拼接`BatchInsert`插入語句 ```csharp public static void BatchInsert(Person[] persons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); var pageCount = (persons.Length - 1) / 1000 + 1; for (int i = 0; i < pageCount; i++) { var personList = persons.Skip(i * 1000).Take(1000).ToArray(); var values = personList.Select(p => $"({p.Id},'{p.Name}',{p.Age},{(p.CreateTime.HasValue ? $"'{p.CreateTime:yyyy-MM-dd HH:mm:ss}'" : "NULL")},{(int) p.Sex})"); var insertSql = $"INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(",", values)}"; using (var com = new SqlCommand(insertSql, con)) { com.ExecuteNonQuery(); } } } } ``` ## 3.`SqlBulkCopy`插入方案 ```csharp public static void BulkCopy(IEnumerable persons) { using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { con.Open(); var table = new DataTable(); table.Columns.AddRange(new [] { new DataColumn("Id", typeof(long)), new DataColumn("Name", typeof(string)), new DataColumn("Age", typeof(int)), new DataColumn("CreateTime", typeof(DateTime)), new DataColumn("Sex", typeof(int)), }); foreach (var p in persons) { table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex}); } using (var copy = new SqlBulkCopy(con)) { copy.DestinationTableName = "Person"; copy.WriteToServer(table); } } } ``` ## 3.三種方案速度對比 | 方案 | 數量 | 時間| |--|--|--| | Insert | 1千條 | 145.4351ms | | BatchInsert| 1千條 | 103.9061ms | | SqlBulkCopy | 1千條 | 7.021ms | ||| | Insert | 1萬條 | 1501.326ms | | BatchInsert| 1萬條 | 850.6274ms | |SqlBulkCopy | 1萬條 | 30.5129ms| ||| | Insert |10萬條 | 13875.4934ms | | BatchInsert| 10萬條 | 8278.9056ms | |SqlBulkCopy | 10萬條 | 314.8402ms | * 兩者插入效率對比,`Insert`明顯比`SqlBulkCopy`要慢太多,大概20~40倍效能差距,下面我們將`SqlBulkCopy`封裝一下,讓批量插入更加方便 # 二、SqlBulkCopy封裝程式碼 ## 1.方法介紹 **批量插入擴充套件方法簽名** | 方法 | 方法引數| 介紹 | |--|--|--| | BulkCopy | | 同步的批量插入方法 | | | SqlConnection connection | sql server 連線物件 | | | IEnumerable\ source | 需要批量插入的資料來源 | | | string tableName = null | 插入表名稱【為NULL預設為實體名稱】 | | |int bulkCopyTimeout = 30 | 批量插入超時時間 | | | int batchSize = 0 | 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】 | | | SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量複製引數 | | | SqlTransaction externalTransaction = null | 執行的事務物件 | | BulkCopyAsync | | 非同步的批量插入方法 | | | SqlConnection connection | sql server 連線物件 | | | IEnumerable\ source | 需要批量插入的資料來源 | | | string tableName = null | 插入表名稱【為NULL預設為實體名稱】 | | |int bulkCopyTimeout = 30 | 批量插入超時時間 | | | int batchSize = 0 | 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】 | | | SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量複製引數 | | | SqlTransaction externalTransaction = null | 執行的事務物件 | * 這個方法主要解決了兩個問題: * 免去了手動構建`DataTable`或者`IDataReader`介面實現類,手動構建的轉換比較難以維護,如果修改欄位就得把這些地方都進行修改,特別是還需要將列舉型別特殊處理,轉換成他的基礎型別(預設`int`) * 不用親自建立`SqlBulkCopy`物件,和配置資料庫列的對映,和一些屬性的配置 * 此方案也是在我公司中使用,以滿足公司的批量插入資料的需求,例如第三方的對賬資料 * 此方法使用的是`Expression`動態生成資料轉換函式,其效率和手寫的原生程式碼差不多,和原生手寫程式碼相比,多餘的轉換損失很小【最大的效能損失都是在`值型別`拆裝箱上】 * 此方案和其他網上的方案有些不同的是:不是將`List`先轉換成`DataTable`,然後寫入`SqlBulkCopy`的,而是使用一個實現`IDataReader`的讀取器包裝`List`,每往`SqlBulkCopy`插入一行資料才會轉換一行資料 * **`IDataReader`方案和`DataTable`方案相比優點** * 效率高:`DataTable`方案需要先完全轉換後,才能交由`SqlBulkCopy`寫入資料庫,而`IDataReader`方案可以邊轉換邊交給`SqlBulkCopy`寫入資料庫(**例如:10萬資料插入速度可提升30%**) * 佔用記憶體少:`DataTable`方案需要先完全轉換後,才能交由`SqlBulkCopy`寫入資料庫,需要佔用大量記憶體,而`IDataReader`方案可以邊轉換邊交給`SqlBulkCopy`寫入資料庫,無須佔用過多記憶體 * 強大:因為是邊寫入邊轉換,而且`EnumerableReader`傳入的是一個迭代器,可以實現持續插入資料的效果 ## 2.實現原理 ### ① 實體Model與表對映 * 資料庫表程式碼 ```sql CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY] ``` * 實體類程式碼 ```csharp public class Person { public long Id { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime? CreateTime { get; set; } public Gender Sex { get; set; } } public enum Gender { Man = 0, Woman = 1 } ``` * 建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】 * 建立對映使用的`SqlBulkCopy`型別的`ColumnMappings`屬性來完成,資料列與資料庫中列的對映 ```csharp //建立批量插入物件 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { foreach (var column in ModelToDataTable.Columns) { //建立欄位對映 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } } ``` ### ② 實體轉換成資料行 * 將資料轉換成資料行採用的是:`反射`+`Expression`來完成 * 其中`反射`是用於獲取編寫`Expression`所需程式類,屬性等資訊 * 其中`Expression`是用於生成高效轉換函式 * 其中`ModelToDataTable`型別利用了靜態泛型類特性,實現泛型引數的快取效果 * 在`ModelToDataTable`的靜態建構函式中,生成轉換函式,獲取需要轉換的屬性資訊,並存入靜態只讀欄位中,完成快取 ### ③ 使用IDataReader插入資料的過載 * `EnumerableReader`是實現了`IDataReader`介面的讀取類,用於將模型物件,在迭代器中讀取出來,並轉換成資料行,可供`SqlBulkCopy`讀取 * `SqlBulkCopy`只會呼叫三個方法:`GetOrdinal`、`Read`、`GetValue` * 其中`GetOrdinal`只會在首行讀取每個列所代表序號【需要填寫:`SqlBulkCopy`型別的`ColumnMappings`屬性】 * 其中`Read`方法是迭代到下一行,並呼叫`ModelToDataTable.ToRowData.Invoke()`來將模型物件轉換成資料行`object[]` * 其中`GetValue`方法是獲取當前行指定下標位置的值 ## 3.完整程式碼 **擴充套件方法類** ```csharp public static class SqlConnectionExtension { /// /// 批量複製 ///
/// 插入的模型物件 /// 需要批量插入的資料來源 /// 資料庫連線物件 /// 插入表名稱【為NULL預設為實體名稱】 /// 插入超時時間 /// 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】 /// 批量複製引數 /// 執行的事務物件
/// 插入數量 public static int BulkCopy(this SqlConnection connection, IEnumerable source, string tableName = null, int bulkCopyTimeout = 30, int batchSize = 0, SqlBulkCopyOptions options = SqlBulkCopyOptions.Default, SqlTransaction externalTransaction = null) { //建立讀取器 using (var reader = new EnumerableReader(source)) { //建立批量插入物件 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { //插入的表 copy.DestinationTableName = tableName ?? typeof(TModel).Name; //寫入資料庫一批數量 copy.BatchSize = batchSize; //超時時間 copy.BulkCopyTimeout = bulkCopyTimeout; //建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】 foreach (var column in ModelToDataTable.Columns) { //建立欄位對映 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } //將資料批量寫入資料庫 copy.WriteToServer(reader); //返回插入資料數量 return reader.Depth; } } } /// /// 批量複製-非同步 ///
/// 插入的模型物件 /// 需要批量插入的資料來源 /// 資料庫連線物件 /// 插入表名稱【為NULL預設為實體名稱】 /// 插入超時時間 /// 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】 /// 批量複製引數 /// 執行的事務物件 /// 插入數量 public static async Task BulkCopyAsync(this SqlConnection connection, IEnumerable source, string tableName = null, int bulkCopyTimeout = 30, int batchSize = 0, SqlBulkCopyOptions options = SqlBulkCopyOptions.Default, SqlTransaction externalTransaction = null) { //建立讀取器 using (var reader = new EnumerableReader(source)) { //建立批量插入物件 using (var copy = new SqlBulkCopy(connection, options, externalTransaction)) { //插入的表 copy.DestinationTableName = tableName ?? typeof(TModel).Name; //寫入資料庫一批數量 copy.BatchSize = batchSize; //超時時間 copy.BulkCopyTimeout = bulkCopyTimeout; //建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】 foreach (var column in ModelToDataTable.Columns) { //建立欄位對映 copy.ColumnMappings.Add(column.ColumnName, column.ColumnName); } //將資料批量寫入資料庫 await copy.WriteToServerAsync(reader); //返回插入資料數量 return reader.Depth; } } } } ``` **封裝的迭代器資料讀取器** ```csharp /// /// 迭代器資料讀取器 /// /// 模型型別 public class EnumerableReader : IDataReader { /// /// 例項化迭代器讀取物件 /// /// 模型源 public EnumerableReader(IEnumerable source) { _source = source ?? throw new ArgumentNullException(nameof(source)); _enumerable = source.GetEnumerator(); } private readonly IEnumerable _source; private readonly IEnumerator _enumerable; private object[] _currentDataRow = Array.Empty(); private int _depth; private bool _release; public void Dispose() { _release = true; _enumerable.Dispose(); } public int GetValues(object[] values) { if (values == null) throw new ArgumentNullException(nameof(values)); var length = Math.Min(_currentDataRow.Length, values.Length); Array.Copy(_currentDataRow, values, length); return length; } public int GetOrdinal(string name) { for (int i = 0; i < ModelToDataTable.Columns.Count; i++) { if (ModelToDataTable.Columns[i].ColumnName == name) return i; } return -1; } public long GetBytes(int ordinal, long dataIndex, byte[] buffer, int bufferIndex, int length) { if (dataIndex < 0) throw new Exception($"起始下標不能小於0!"); if (bufferIndex < 0) throw new Exception("目標緩衝區起始下標不能小於0!"); if (length < 0) throw new Exception("讀取長度不能小於0!"); var numArray = (byte[])GetValue(ordinal); if (buffer == null) return numArray.Length; if (buffer.Length <= bufferIndex) throw new Exception("目標緩衝區起始下標不能大於目標緩衝區範圍!"); var freeLength = Math.Min(numArray.Length - bufferIndex, length); if (freeLength <= 0) return 0; Array.Copy(numArray, dataIndex, buffer, bufferIndex, length); return freeLength; } public long GetChars(int ordinal, long dataIndex, char[] buffer, int bufferIndex, int length) { if (dataIndex < 0) throw new Exception($"起始下標不能小於0!"); if (bufferIndex < 0) throw new Exception("目標緩衝區起始下標不能小於0!"); if (length < 0) throw new Exception("讀取長度不能小於0!"); var numArray = (char[])GetValue(ordinal); if (buffer == null) return numArray.Length; if (buffer.Length <= bufferIndex) throw new Exception("目標緩衝區起始下標不能大於目標緩衝區範圍!"); var freeLength = Math.Min(numArray.Length - bufferIndex, length); if (freeLength <= 0) return 0; Array.Copy(numArray, dataIndex, buffer, bufferIndex, length); return freeLength; } public bool IsDBNull(int i) { var value = GetValue(i); return value == null || value is DBNull; } public bool NextResult() { //移動到下一個元素 if (!_enumerable.MoveNext()) return false; //行層+1 Interlocked.Increment(ref _depth); //得到資料行 _currentDataRow = ModelToDataTable.ToRowData.Invoke(_enumerable.Current); return true; } public byte GetByte(int i) => (byte)GetValue(i); public string GetName(int i) => ModelToDataTable.Columns[i].ColumnName; public string GetDataTypeName(int i) => ModelToDataTable.Columns[i].DataType.Name; public Type GetFieldType(int i) => ModelToDataTable.Columns[i].DataType; public object GetValue(int i) => _currentDataRow[i]; public bool GetBoolean(int i) => (bool)GetValue(i); public char GetChar(int i) => (char)GetValue(i); public Guid GetGuid(int i) => (Guid)GetValue(i); public short GetInt16(int i) => (short)GetValue(i); public int GetInt32(int i) => (int)GetValue(i); public long GetInt64(int i) => (long)GetValue(i); public float GetFloat(int i) => (float)GetValue(i); public double GetDouble(int i) => (double)GetValue(i); public string GetString(int i) => (string)GetValue(i); public decimal GetDecimal(int i) => (decimal)GetValue(i); public DateTime GetDateTime(int i) => (DateTime)GetValue(i); public IDataReader GetData(int i) => throw new NotSupportedException(); public int FieldCount => ModelToDataTable.Columns.Count; public object this[int i] => GetValue(i); public object this[string name] => GetValue(GetOrdinal(name)); public void Close() => Dispose(); public DataTable GetSchemaTable() => ModelToDataTable.ToDataTable(_source); public bool Read() => NextResult(); public int Depth => _depth; public bool IsClosed => _release; public int RecordsAffected => 0; } ``` **模型物件轉資料行工具類** ```csharp /// /// 物件轉換成DataTable轉換類 /// /// 泛型型別 public static class ModelToDataTable { static ModelToDataTable() { //如果需要剔除某些列可以修改這段程式碼 var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray(); Columns = new ReadOnlyCollection(propertyList .Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray()); //生成物件轉資料行委託 ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList); } /// /// 構建轉換成資料行委託 /// /// 傳入型別 /// 轉換的屬性 /// 轉換資料行委託 private static Func BuildToRowDataDelegation(Type type, PropertyInfo[] propertyList) { var source = Expression.Parameter(type); var items = propertyList.Select(property => ConvertBindPropertyToData(source, property)); var array = Expression.NewArrayInit(typeof(object), items); var lambda = Expression.Lambda>(array, source); return lambda.Compile(); } /// /// 將屬性轉換成資料 /// /// 源變數 /// 屬性資訊 /// 獲取屬性資料表示式 private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property) { var propertyType = property.PropertyType; var expression = (Expression)Expression.Property(source, property); if (propertyType.IsEnum) expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType()); return Expression.Convert(expression, typeof(object)); } /// /// 獲取資料型別 /// /// 屬性型別 /// 資料型別 private static Type GetDataType(Type type) { //列舉預設轉換成對應的值型別 if (type.IsEnum) return type.GetEnumUnderlyingType(); //可空型別 if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>)) return GetDataType(type.GetGenericArguments().First()); return type; } /// /// 列集合 /// public static IReadOnlyList Columns { get; } /// /// 物件轉資料行委託 /// public static Func ToRowData { get; } /// /// 集合轉換成DataTable /// /// 集合 /// 表名稱 /// 轉換完成的DataTable public static DataTable ToDataTable(IEnumerable source, string tableName = "TempTable") { //建立表物件 var table = new DataTable(tableName); //設定列 foreach (var dataColumn in Columns) { table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType)); } //迴圈轉換每一行資料 foreach (var item in source) { table.Rows.Add(ToRowData.Invoke(item)); } //返回表物件 return table; } } ``` # 三、測試封裝程式碼 ## 1.測試程式碼 **創表程式碼** ```sql CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY] ``` **實體類程式碼** * 定義的實體的屬性名稱需要和`SqlServer`列名稱型別對應 ```csharp public class Person { public long Id { get; set; } public string Name { get; set; } public int Age { get; set; } public DateTime? CreateTime { get; set; } public Gender Sex { get; set; } } public enum Gender { Man = 0, Woman = 1 } ``` **測試方法** ```csharp //生成10萬條資料 var persons = new Person[100000]; var random = new Random(); for (int i = 0; i < persons.Length; i++) { persons[i] = new Person { Id = i + 1, Name = "張三" + i, Age = random.Next(1, 128), Sex = (Gender)random.Next(2), CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i) }; } //建立資料庫連線 using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;")) { conn.Open(); var sw = Stopwatch.StartNew(); //批量插入資料 var qty = conn.BulkCopy(persons); sw.Stop(); Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms"); } ``` **執行批量插入結果** ```bash 226.4767ms 請按任意鍵繼續. . . ``` ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20201128002635787.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzI1MTU0Nw==,size_16,color_FFFFFF,t_70) # 四、程式碼下載 GitHub程式碼地址:[https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents](https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents)