SqlBulkCopy 很簡單也很好用, 但關鍵是當資料來源表非常靈活而多變時(事先不知道有多少列,也不知道每1列的列名), 如何來做這件事呢?

1. 寫入表設定為101列, 第1列為批次,每寫入一批資料作為一批;

2. 寫入表中的除batch之外的列均為 nvarchar(max);

3. 寫入資料時要將來源表中的表頭存放到另一張表中, 便於以後的匯出。

A. 測試資料準備

--1. 資料來源表
IF OBJECT_ID('source_table') IS NOT NULL
	DROP TABLE source_table
GO
CREATE TABLE source_table
(
	school NVARCHAR(100),
	account VARCHAR(200),
	cnt_ct  INT,
	createTime DATETIME
)
GO
INSERT INTO source_table
SELECT '春蘭小學','liuming',25,'2005-10-23' union
SELECT '春蘭大學','leaf',25,'2005-10-20'
GO
--2. 目標表
IF OBJECT_ID('target_table') IS NOT NULL
	DROP TABLE target_table
GO
CREATE TABLE target_table
(
	batch BIGINT,
	C1 NVARCHAR(MAX),
	C2 NVARCHAR(MAX),
	C3 NVARCHAR(MAX),
	C4 NVARCHAR(MAX),
	C5 NVARCHAR(MAX),
	C6 NVARCHAR(MAX),
	C7 NVARCHAR(MAX),
	C8 NVARCHAR(MAX),
	C9 NVARCHAR(MAX),
	C10 NVARCHAR(MAX)
)
GO
--3. 表頭表
IF OBJECT_ID('header_table') IS NOT NULL
	DROP TABLE header_table
GO
CREATE TABLE header_table
(
	id BIGINT IDENTITY(1,1),
	batch BIGINT,
	columnName NVARCHAR(MAX),
	columnIndex int
)
GO

2. DBHelper: 見: 點選開啟連結

3. 測試控制檯程式:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data.SqlClient;
using System.Collections;
using System.Data;
using Common;

namespace ConsoleApplication1
{
    class Program_6
    {
        static void Main(string[] args)
        {
            SqlConnection ConnectionNew = new SqlConnection("Data Source=leaf-home\\sqlserver2005;Initial Catalog=Test;Persist Security Info=True;User ID=??;Password=??");
            SqlConnection ConnectionOld = new SqlConnection("Data Source=leaf-home\\sqlserver2005;Initial Catalog=Test;Persist Security Info=True;User ID=??;Password=??");
            try
            {
                ConnectionNew.Open();
                ConnectionOld.Open();

                //1.在舊錶中,用SqlDataAdapter讀取出資訊  
                string SQL = "select school 學校,account 賬號,cnt_ct 簡訊量, createTime 建立日期 from source_table";

                DataTable dt = Common.DBHelper.GetDataTableBySql(SQL);      //資料來源表
                DataTable dt_clone = dt.Clone();                            //複製表, 用於加多一列, 並將其它列改為String型別

                long batch = 3;
                DataColumn dcBatch = new DataColumn("batch");
                dcBatch.DataType = typeof(long);// Type.GetType("System.Int64");
                dt_clone.Columns.Add(dcBatch);
                dcBatch.SetOrdinal(0);  //設定為第1列 非常重要

                for (int i = 1; i < dt.Columns.Count; i++)
                {
                    dt_clone.Columns[i].DataType = Type.GetType("System.String");
                }

                foreach (DataRow dr in dt.Rows)
                {
                    DataRow dr_clone = dt_clone.NewRow();
                    foreach (DataColumn dc in dt.Columns)
                    {

                        dr_clone[dc.ColumnName] = dr[dc.ColumnName];
                    }
                    dr_clone[0] = batch;
                    dt_clone.Rows.Add(dr_clone);
                }
                //將表頭存入表頭表中, 方便以後匯出時使用
                int dcIdx = 1;
                foreach (DataColumn dc in dt.Columns)
                {
                    SqlParameter[] spArr = new SqlParameter[]{
                        new SqlParameter("columnName", dc.ColumnName),
                        new SqlParameter("batch",Convert.ToInt64(batch)).SetDbType(DbType.Int64),
                        new SqlParameter("columnIndex",dcIdx++)};

                    string sql = @"
if not exists(select 1 from header_table where [email protected] and [email protected])
begin
    insert into header_table (columnName,batch,columnIndex) select @columnName,@batch,@columnIndex
end
";
                    Common.DBHelper.ExecuteNonQuery(sql, spArr);
                }

                //2.初始化SqlBulkCopy物件,用新的連線作為引數。  
                SqlBulkCopy bulkCopy = new SqlBulkCopy(ConnectionNew);

                //3.寫對應關係。如舊錶的People列的資料,對應新表Human列,那麼就寫bulkCopy.ColumnMappings.Add("People","Human")  
                //如果兩張表的結構一樣,那麼對應關係就不用寫了。  
                //我是用雜湊表儲存對應關係的,雜湊表作為引數到傳入方法中,key的值用來儲存舊錶的欄位名,VALUE的值用來儲存新表的值  
                Hashtable ht = new Hashtable();
                ht.Add("batch", "batch");
                int j = 1;
                foreach (DataColumn dc in dt.Columns)
                {
                    ht.Add(dc.ColumnName, "C" + (j++).ToString());

                }

                foreach (string str in ht.Keys)
                {
                    bulkCopy.ColumnMappings.Add(str, ht[str].ToString());
                }

                //4.設定目標表名  
                bulkCopy.DestinationTableName = "target_table";

                //額外,可不寫:設定一次性處理的行數。這個行數處理完後,會激發SqlRowsCopied()方法。預設為1  
                bulkCopy.NotifyAfter = 1;

                //額外,可不寫:設定激發的SqlRowsCopied()方法,這裡為bulkCopy_SqlRowsCopied  
                bulkCopy.SqlRowsCopied += new SqlRowsCopiedEventHandler(bulkCopy_SqlRowsCopied);

                //OK,開始傳資料!  
                bulkCopy.WriteToServer(dt_clone);
                Console.Write("傳輸完畢!");
                Console.Read();
            }
            catch (Exception ex)
            {
                Console.Write(ex.Message);
                Console.Read();
            }
            finally
            {
                ConnectionNew.Close();
                ConnectionOld.Close();
            }
        }

        //激發的方法寫在外頭  
        private static void bulkCopy_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            //執行的內容。  
            //這裡有2個元素值得拿來用  
            //e.RowsCopied,  //返回數值型別,表示當前已經複製的行數  
            //e.Abort,       //用於賦值true or false,用於停止賦值的操作  
            Console.WriteLine("當前已複製的行數:" + e.RowsCopied);
        }
    }
}


將上面的程式碼 batch 改為3(4個欄位), 4 (3個欄位), 執行兩次後結果如下:

很明顯, 在任意變化的情況下, 為我們實現了資料的轉移。

當然, 實際的資料採集過程, 可能還會複雜一些:比如同一批資料會有多次傳入。不過大體的思路可以定下來的了。



.