SQL Server 批量插入資料方案 SqlBulkCopy 的簡單封裝,讓批量插入更方便
阿新 • • 發佈:2020-12-08
# 一、Sql Server插入方案介紹
* 關於 `SqlServer` 批量插入的方式,有三種比較常用的插入方式,`Insert`、`BatchInsert`、`SqlBulkCopy`,下面我們對比以下三種方案的速度
## 1.普通的`Insert`插入方法
```csharp
public static void Insert(IEnumerable persons)
{
using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
{
con.Open();
foreach (var person in persons)
{
using (var com = new SqlCommand(
"INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)",
con))
{
com.Parameters.AddRange(new[]
{
new SqlParameter("@Id", SqlDbType.BigInt) {Value = person.Id},
new SqlParameter("@Name", SqlDbType.VarChar, 64) {Value = person.Name},
new SqlParameter("@Age", SqlDbType.Int) {Value = person.Age},
new SqlParameter("@CreateTime", SqlDbType.DateTime)
{Value = person.CreateTime ?? (object) DBNull.Value},
new SqlParameter("@Sex", SqlDbType.Int) {Value = (int)person.Sex},
});
com.ExecuteNonQuery();
}
}
}
}
```
## 2.拼接`BatchInsert`插入語句
```csharp
public static void BatchInsert(Person[] persons)
{
using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
{
con.Open();
var pageCount = (persons.Length - 1) / 1000 + 1;
for (int i = 0; i < pageCount; i++)
{
var personList = persons.Skip(i * 1000).Take(1000).ToArray();
var values = personList.Select(p =>
$"({p.Id},'{p.Name}',{p.Age},{(p.CreateTime.HasValue ? $"'{p.CreateTime:yyyy-MM-dd HH:mm:ss}'" : "NULL")},{(int) p.Sex})");
var insertSql =
$"INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(",", values)}";
using (var com = new SqlCommand(insertSql, con))
{
com.ExecuteNonQuery();
}
}
}
}
```
## 3.`SqlBulkCopy`插入方案
```csharp
public static void BulkCopy(IEnumerable persons)
{
using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
{
con.Open();
var table = new DataTable();
table.Columns.AddRange(new []
{
new DataColumn("Id", typeof(long)),
new DataColumn("Name", typeof(string)),
new DataColumn("Age", typeof(int)),
new DataColumn("CreateTime", typeof(DateTime)),
new DataColumn("Sex", typeof(int)),
});
foreach (var p in persons)
{
table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex});
}
using (var copy = new SqlBulkCopy(con))
{
copy.DestinationTableName = "Person";
copy.WriteToServer(table);
}
}
}
```
## 3.三種方案速度對比
| 方案 | 數量 | 時間|
|--|--|--|
| Insert | 1千條 | 145.4351ms |
| BatchInsert| 1千條 | 103.9061ms |
| SqlBulkCopy | 1千條 | 7.021ms |
|||
| Insert | 1萬條 | 1501.326ms |
| BatchInsert| 1萬條 | 850.6274ms |
|SqlBulkCopy | 1萬條 | 30.5129ms|
|||
| Insert |10萬條 | 13875.4934ms |
| BatchInsert| 10萬條 | 8278.9056ms |
|SqlBulkCopy | 10萬條 | 314.8402ms |
* 兩者插入效率對比,`Insert`明顯比`SqlBulkCopy`要慢太多,大概20~40倍效能差距,下面我們將`SqlBulkCopy`封裝一下,讓批量插入更加方便
# 二、SqlBulkCopy封裝程式碼
## 1.方法介紹
**批量插入擴充套件方法簽名**
| 方法 | 方法引數| 介紹 |
|--|--|--|
| BulkCopy | | 同步的批量插入方法 |
| | SqlConnection connection | sql server 連線物件 |
| | IEnumerable\ source | 需要批量插入的資料來源 |
| | string tableName = null | 插入表名稱【為NULL預設為實體名稱】 |
| |int bulkCopyTimeout = 30 | 批量插入超時時間 |
| | int batchSize = 0 | 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】 |
| | SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量複製引數 |
| | SqlTransaction externalTransaction = null | 執行的事務物件 |
| BulkCopyAsync | | 非同步的批量插入方法 |
| | SqlConnection connection | sql server 連線物件 |
| | IEnumerable\ source | 需要批量插入的資料來源 |
| | string tableName = null | 插入表名稱【為NULL預設為實體名稱】 |
| |int bulkCopyTimeout = 30 | 批量插入超時時間 |
| | int batchSize = 0 | 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】 |
| | SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量複製引數 |
| | SqlTransaction externalTransaction = null | 執行的事務物件 |
* 這個方法主要解決了兩個問題:
* 免去了手動構建`DataTable`或者`IDataReader`介面實現類,手動構建的轉換比較難以維護,如果修改欄位就得把這些地方都進行修改,特別是還需要將列舉型別特殊處理,轉換成他的基礎型別(預設`int`)
* 不用親自建立`SqlBulkCopy`物件,和配置資料庫列的對映,和一些屬性的配置
* 此方案也是在我公司中使用,以滿足公司的批量插入資料的需求,例如第三方的對賬資料
* 此方法使用的是`Expression`動態生成資料轉換函式,其效率和手寫的原生程式碼差不多,和原生手寫程式碼相比,多餘的轉換損失很小【最大的效能損失都是在`值型別`拆裝箱上】
* 此方案和其他網上的方案有些不同的是:不是將`List`先轉換成`DataTable`,然後寫入`SqlBulkCopy`的,而是使用一個實現`IDataReader`的讀取器包裝`List`,每往`SqlBulkCopy`插入一行資料才會轉換一行資料
* **`IDataReader`方案和`DataTable`方案相比優點**
* 效率高:`DataTable`方案需要先完全轉換後,才能交由`SqlBulkCopy`寫入資料庫,而`IDataReader`方案可以邊轉換邊交給`SqlBulkCopy`寫入資料庫(**例如:10萬資料插入速度可提升30%**)
* 佔用記憶體少:`DataTable`方案需要先完全轉換後,才能交由`SqlBulkCopy`寫入資料庫,需要佔用大量記憶體,而`IDataReader`方案可以邊轉換邊交給`SqlBulkCopy`寫入資料庫,無須佔用過多記憶體
* 強大:因為是邊寫入邊轉換,而且`EnumerableReader`傳入的是一個迭代器,可以實現持續插入資料的效果
## 2.實現原理
### ① 實體Model與表對映
* 資料庫表程式碼
```sql
CREATE TABLE [dbo].[Person](
[Id] [BIGINT] NOT NULL,
[Name] [VARCHAR](64) NOT NULL,
[Age] [INT] NOT NULL,
[CreateTime] [DATETIME] NULL,
[Sex] [INT] NOT NULL,
PRIMARY KEY CLUSTERED
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
```
* 實體類程式碼
```csharp
public class Person
{
public long Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public DateTime? CreateTime { get; set; }
public Gender Sex { get; set; }
}
public enum Gender
{
Man = 0,
Woman = 1
}
```
* 建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】
* 建立對映使用的`SqlBulkCopy`型別的`ColumnMappings`屬性來完成,資料列與資料庫中列的對映
```csharp
//建立批量插入物件
using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
{
foreach (var column in ModelToDataTable.Columns)
{
//建立欄位對映
copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
}
```
### ② 實體轉換成資料行
* 將資料轉換成資料行採用的是:`反射`+`Expression`來完成
* 其中`反射`是用於獲取編寫`Expression`所需程式類,屬性等資訊
* 其中`Expression`是用於生成高效轉換函式
* 其中`ModelToDataTable`型別利用了靜態泛型類特性,實現泛型引數的快取效果
* 在`ModelToDataTable`的靜態建構函式中,生成轉換函式,獲取需要轉換的屬性資訊,並存入靜態只讀欄位中,完成快取
### ③ 使用IDataReader插入資料的過載
* `EnumerableReader`是實現了`IDataReader`介面的讀取類,用於將模型物件,在迭代器中讀取出來,並轉換成資料行,可供`SqlBulkCopy`讀取
* `SqlBulkCopy`只會呼叫三個方法:`GetOrdinal`、`Read`、`GetValue`
* 其中`GetOrdinal`只會在首行讀取每個列所代表序號【需要填寫:`SqlBulkCopy`型別的`ColumnMappings`屬性】
* 其中`Read`方法是迭代到下一行,並呼叫`ModelToDataTable.ToRowData.Invoke()`來將模型物件轉換成資料行`object[]`
* 其中`GetValue`方法是獲取當前行指定下標位置的值
## 3.完整程式碼
**擴充套件方法類**
```csharp
public static class SqlConnectionExtension
{
///
/// 批量複製
///
/// 插入的模型物件
/// 需要批量插入的資料來源
/// 資料庫連線物件
/// 插入表名稱【為NULL預設為實體名稱】
/// 插入超時時間
/// 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】
/// 批量複製引數
/// 執行的事務物件
/// 插入數量
public static int BulkCopy(this SqlConnection connection,
IEnumerable source,
string tableName = null,
int bulkCopyTimeout = 30,
int batchSize = 0,
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
SqlTransaction externalTransaction = null)
{
//建立讀取器
using (var reader = new EnumerableReader(source))
{
//建立批量插入物件
using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
{
//插入的表
copy.DestinationTableName = tableName ?? typeof(TModel).Name;
//寫入資料庫一批數量
copy.BatchSize = batchSize;
//超時時間
copy.BulkCopyTimeout = bulkCopyTimeout;
//建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】
foreach (var column in ModelToDataTable.Columns)
{
//建立欄位對映
copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
//將資料批量寫入資料庫
copy.WriteToServer(reader);
//返回插入資料數量
return reader.Depth;
}
}
}
///
/// 批量複製-非同步
///
/// 插入的模型物件
/// 需要批量插入的資料來源
/// 資料庫連線物件
/// 插入表名稱【為NULL預設為實體名稱】
/// 插入超時時間
/// 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】
/// 批量複製引數
/// 執行的事務物件
/// 插入數量
public static async Task BulkCopyAsync(this SqlConnection connection,
IEnumerable source,
string tableName = null,
int bulkCopyTimeout = 30,
int batchSize = 0,
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
SqlTransaction externalTransaction = null)
{
//建立讀取器
using (var reader = new EnumerableReader(source))
{
//建立批量插入物件
using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
{
//插入的表
copy.DestinationTableName = tableName ?? typeof(TModel).Name;
//寫入資料庫一批數量
copy.BatchSize = batchSize;
//超時時間
copy.BulkCopyTimeout = bulkCopyTimeout;
//建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】
foreach (var column in ModelToDataTable.Columns)
{
//建立欄位對映
copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
//將資料批量寫入資料庫
await copy.WriteToServerAsync(reader);
//返回插入資料數量
return reader.Depth;
}
}
}
}
```
**封裝的迭代器資料讀取器**
```csharp
///
/// 迭代器資料讀取器
///
/// 模型型別
public class EnumerableReader : IDataReader
{
///
/// 例項化迭代器讀取物件
///
/// 模型源
public EnumerableReader(IEnumerable source)
{
_source = source ?? throw new ArgumentNullException(nameof(source));
_enumerable = source.GetEnumerator();
}
private readonly IEnumerable _source;
private readonly IEnumerator _enumerable;
private object[] _currentDataRow = Array.Empty