1. 程式人生 > >C#讀取Txt大資料並更新到資料庫

C#讀取Txt大資料並更新到資料庫

環境

   Sqlserver 2016

   .net 4.5.2

 

目前測試資料1300萬 大約3-4分鐘.(限制一次讀取條數 和 執行緒數是 要節省伺服器資源,如果調太大伺服器其它應用可能就跑不了了), SqlServerDBHelper為資料庫幫助類.沒有什麼特別的處理. 配置連線串時記錄把連線池開起來

另外.以下程式碼中每次寫都建立了連線 .之前試過一個連線反覆用. 130次大約有20多次 資料庫會出問題.並且需要的時間是7-8分鐘 左右. 

配置檔案: xxx.json

 [ {
    /*連線字串 */
    "ConnStr": "",
    "FilePath": "讀取的檔案地址",
    /*資料庫表名稱 */
    "TableName": "寫入的資料庫表名",
    /*匯入前執行的語句 */
    "ExecBeforeSql": "",
    /*匯入後執行的語句 */
    "ExecAfterSql": "",
    /*對映關係 */
    "Mapping": [
      {
        "DBName": "XXX",
        "TxtName": "DDD"
      }      
    ],
    /*過濾資料的正則 當前只實現了小資料一次性讀完的檢查*/
    "FilterRegex": [],
    /*檢查資料合法性(從資料庫獲取欄位屬性進行驗證) */
    "CheckData": false,
    /*列分隔符*/
    "Separator": "\t",
    /*表頭的行數*/
    "HeaderRowsNum": 1
  }
]

 

讀取程式碼 : 注意 ConfigurationManager.AppSettings["frpage"] 和 ConfigurationManager.AppSettings["fr"] 需要自己配置好

 

 //讀取配置檔案資訊
            List<dynamic> dt = JsonConvert.DeserializeObject<List<dynamic>>(File.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "config\\ImportTxt.json")));
            LogUtil.Info("開始讀取txt資料,讀取配置:" + dt.Count + "條");
            if (dt.Count == 0)
            {
                return;
            }


            List<Task> li = new List<Task>();
            foreach (dynamic row in dt)
            {
                LogUtil.Info("開始處理資料:" + JsonConvert.SerializeObject(row));
                li.Add(ProcessRow(row));

            }
            Task.WaitAll(li.ToArray());
            LogUtil.Info("資料讀取完畢");
        public async Task ProcessRow(dynamic row)
        {
            await Task.Run(() =>
             {
                 AutoResetEvent AE = new AutoResetEvent(false);
                 DataTable Data = null;
                 string error = "", ConnStr, TableName, ExecBeforeSql, ExecAfterSql;
                 Boolean IsCheck = Convert.ToBoolean(row["CheckData"]);
                 TableName = Convert.ToString(row.TableName);
                 ConnStr = Convert.ToString(row.ConnStr);
                 ExecBeforeSql = Convert.ToString(row.ExecBeforeSql);
                 ExecAfterSql = Convert.ToString(row.ExecAfterSql);
                 int HeaderRowsNum = Convert.ToInt32(row.HeaderRowsNum);
                 string Separator = Convert.ToString(row.Separator);

                 Dictionary<string, string> dic = new Dictionary<string, string>();

                 //檔案達到多大時就分行讀取
                 int fr = 0;
                 if (!int.TryParse(ConfigurationManager.AppSettings["fr"], out fr))
                 {
                     fr = 100;
                 }
                 fr = fr * 1024 * 1024;

                 //分行讀取一次讀取多少
                 int page = 0;
                 if (!int.TryParse(ConfigurationManager.AppSettings["frpage"], out page))
                 {
                     page = 50000;
                 }

                 foreach (var dyn in row.Mapping)
                 {
                     dic.Add(Convert.ToString(dyn.TxtName), Convert.ToString(dyn.DBName));
                 }


                 List<string> regex = new List<string>();
                 foreach (string item in row["FilterRegex"])
                 {
                     regex.Add(item);
                 }
                 string fpath = "", cpath = "";




                 cpath = Convert.ToString(row["FilePath"]);
                 string rootPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "tmp");
                 if (!Directory.Exists(rootPath))
                 {
                     Directory.CreateDirectory(rootPath);
                 }

                 fpath = Path.Combine(rootPath, Path.GetFileName(cpath));
                 File.Copy(cpath, fpath, true);
                 LogUtil.Info("拷檔案到本地已經完成.從本地讀取資料操作");
                 int threadCount = Environment.ProcessorCount * 3;

                 FileInfo fi = new FileInfo(fpath);
                 //如果檔案大於100M就需要分批讀取.一次50萬條
                 if (fi.Length > fr)
                 {

                     long sumCount = 0;
                     StreamReader sr = new StreamReader(fi.OpenRead());                    
                     int headRow = 0;
                     string rowstr = "";

                     List<Thread> li_th = new List<Thread>();
                     bool last = false;
                     int ij = 0;
                     LogUtil.Info("生成StreamReader成功  ");
                     #region 逐行讀取
                     
                     
                     while (sr.Peek() > -1)
                     {
                         rowstr = sr.ReadLine();
                         #region 將行資料寫入DataTable
                         if (headRow < HeaderRowsNum)
                         {
                             Data = new DataTable();
                             foreach (string scol in rowstr.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries))
                             {
                                 Data.Columns.Add(scol.Trim(), typeof(string));
                             }
                             headRow++;
                             continue;
                         }
                         else
                         {  //行資料
                             if (headRow > 1)
                             {
                                 for (int i = 1; i < headRow && sr.Peek() > -1; i++)
                                 {
                                     rowstr += " " + sr.ReadLine();
                                 }
                             }
                             Data.Rows.Add(rowstr.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries));
                             if (Data.Rows.Count < page && sr.Peek() > -1)
                             {
                                 continue;
                             }
                         }
                         last = (sr.Peek() == -1);
                         #endregion

                         sumCount += Data.Rows.Count;

                         ProcessPath(Data, page, sr, ref ij, TableName, ExecBeforeSql, ExecAfterSql, dic, IsCheck, li_th);
                                                

                         #region 檢查執行緒等待
                         if ((ij > 0 && (ij % threadCount) == 0) || last)
                         {
                             LogUtil.Info("完成一批次當前共寫資料: " + sumCount);
                             while (true)
                             {
                                 bool isok = true;
                                 foreach (var item in li_th)
                                 {
                                     if (item.IsAlive)
                                     {
                                         isok = false;
                                         Application.DoEvents();
                                         Thread.Sleep(1000);
                                     }
                                 }
                                 if (isok)
                                 {
                                     li_th.Clear();
                                     break;
                                 }
                             }

                             //最後一頁要等所有的執行完才能執行
                             if (sr.Peek() == -1)
                             {
                                 WriteTODB(TableName, Data, ExecBeforeSql, ExecAfterSql, dic, false, true);
                                 LogUtil.Info("最後一次寫入完成");
                             }
                             LogUtil.Info(" 執行緒退出開始新的迴圈...");
                         }
                         Data.Clear();
                         #endregion
                     }
                     sr.Dispose();
                     #endregion
                 }
                 else
                 {
                     using (SQLServerDBHelper sdb = new SQLServerDBHelper())
                     {
                         sdb.OpenConnection();
                         #region 一次性讀取處理
                         Data = LoadDataTableFromTxt(fpath, ref error, Separator, HeaderRowsNum, regex, IsCheck, dic, TableName);
                         if (IsCheck)
                         {
                             DataRow[] rows = Data.Select("ErrorMsg is not null");
                             if (rows.Length > 0)
                             {
                                 LogUtil.Info($"讀取{TableName} 資料出錯 : {JsonConvert.SerializeObject(rows)}");
                                 return;
                             }
                         }

                         LogUtil.Info($"讀取{TableName} 的txt資料完成.共讀取資料:{Data.Rows.Count}條");
                         if (Data.Rows.Count == 0 || !string.IsNullOrWhiteSpace(error))
                         {
                             if (!string.IsNullOrWhiteSpace(error))
                             {
                                 LogUtil.Info("讀取資料出錯,地址:" + Convert.ToString(row["FilePath"]) + "  \r\n 錯誤:" + error);
                             }
                             return;
                         }
                         sdb.BgeinTransaction();
                         try
                         {
                             WriteTODB(TableName, Data, ExecBeforeSql, ExecAfterSql, dic, sdb: sdb);
                             sdb.CommitTransaction();
                             LogUtil.Info(TableName + "資料更新完畢 !!");
                         }
                         catch (Exception ex)
                         {

                             LogUtil.Info(TableName + " 更新資料出錯,錯誤:" + ex.Message + "  \r\n 堆疊:" + ex.StackTrace);
                             sdb.RollbackTransaction();
                         }
                         #endregion

                     }



                 }

                 GC.Collect();
             });

        }

        private void ProcessPath(DataTable Data, int page, StreamReader sr, ref int ij, string TableName, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool IsCheck, List<Thread> li_th)
        {
            int threadCount = Environment.ProcessorCount * 4;

            string error = "";
            PoolModel p = new PoolModel { TableName = TableName, ExecBeforeSql = ExecBeforeSql, ExecAfterSql = ExecAfterSql, dic = dic };
            p.Data = Data.Copy();
            if (IsCheck)
            {
                using (SQLServerDBHelper sdb = new SQLServerDBHelper())
                {
                    error = CheckData(Data, TableName, dic, sdb);
                }
                DataRow[] rows = Data.Select("ErrorMsg is not null");
                if (rows.Length > 0 || !string.IsNullOrWhiteSpace(error))
                {
                    LogUtil.Info($"讀取{TableName} 資料出錯 : {JsonConvert.SerializeObject(rows)}\r\n錯誤: " + error);
                    return;
                }
            }

            ij++;
            if (ij == 1)
            {

                WriteTODB(p.TableName, p.Data, p.ExecBeforeSql, p.ExecAfterSql, p.dic, true, false);
                LogUtil.Info("首次寫入完成");
            }

            else if (sr.Peek() > -1)
            {

                Thread t = new Thread(d =>
                {

                    PoolModel c = d as PoolModel;
                    try
                    {
                        WriteTODB(c.TableName, c.Data, c.ExecBeforeSql, c.ExecAfterSql, c.dic, false, false);                      
                    }
                    catch (ThreadAbortException)
                    {
                        LogUtil.Error("執行緒退出.................");
                    }
                    catch (Exception ex)
                    {

                        LogUtil.Error(c.TableName + "寫入資料失敗:" + ex.Message + "\r\n堆疊:" + ex.StackTrace + "\r\n 資料:   " + JsonConvert.SerializeObject(c.Data));
                        ExitApp();
                        return;
                    }

                });
                t.IsBackground = true;
                t.Start(p);
                li_th.Add(t);
            }

        }

        public void ExitApp()
        {
            Application.Exit();
        }

        public void WriteTODB(string TableName, DataTable Data, string ExecBeforeSql, string ExecAfterSql, Dictionary<string, string> dic, bool first = true, bool last = true, SQLServerDBHelper sdb = null)
        {
            bool have = false;
            if (sdb == null)
            {
                sdb = new SQLServerDBHelper();
                have = true;
            }

            if (first && !string.IsNullOrWhiteSpace(ExecBeforeSql))
            {
                LogUtil.Info(TableName + "執行前Sql :" + ExecBeforeSql);
                sdb.ExecuteNonQuery(ExecBeforeSql);
            }
            sdb.BulkCopy(Data, TableName, dic);
            if (last && !string.IsNullOrWhiteSpace(ExecAfterSql))
            {
                LogUtil.Info(TableName + "執行後Sql :" + ExecAfterSql);
                sdb.ExecuteNonQuery(ExecAfterSql);
            }
            LogUtil.Info(TableName + "本次執行完成 ");
            if (have)
            {
                sdb.Dispose();
            }
        }


        public string CheckData(DataTable dt, string dbTableName, Dictionary<string, string> dic, SQLServerDBHelper sdb)
        {
            if (string.IsNullOrWhiteSpace(dbTableName))
            {
                return "表名不能為空!";
            }
            if (dic.Count == 0)
            {
                return "對映關係資料不存在!";

            }

            List<string> errorMsg = new List<string>();
            List<string> Cols = new List<string>();
            dic.Foreach(c =>
            {
                if (!dt.Columns.Contains(c.Key))
                {
                    errorMsg.Add(c.Key);
                }
                Cols.Add(c.Key);
            });

            if (errorMsg.Count > 0)
            {
                return "資料列不完整,請與對映表的資料列數量保持一致!列:" + string.Join(",", errorMsg);
            }


            //如果行資料有錯誤資訊則新增到這一列的值裡
            dt.Columns.Add(new DataColumn("ErrorMsg", typeof(string)) { DefaultValue = "" });
            string sql = @"--獲取SqlServer中表結構
                SELECT syscolumns.name as ColName,systypes.name as DBType,syscolumns.isnullable,
                syscolumns.length
                FROM syscolumns, systypes
                WHERE syscolumns.xusertype = systypes.xusertype
                AND syscolumns.id = object_id(@tb) ; ";
            DataSet ds = sdb.GetDataSet(sql, new SqlParameter[] { new SqlParameter("@tb", dbTableName) });
            EnumerableRowCollection<DataRow> TableDef = ds.Tables[0].AsEnumerable();

            // string colName="";
            Object obj_val;

            //將表結構資料重組成字典.
            var dic_Def = TableDef.ToDictionary(c => Convert.ToString(c["ColName"]), d =>
            {
                string DBType = "";
                string old = Convert.ToString(d["DBType"]).ToUpper();
                DBType = GetCSharpType(old);
                return new { ColName = Convert.ToString(d["ColName"]), DBType = DBType, SqlType = old, IsNullble = Convert.ToBoolean(d["isnullable"]), Length = Convert.ToInt32(d["length"]) };
            });

            DateTime now = DateTime.Now;
            foreach (DataRow row in dt.Rows)
            {
                errorMsg.Clear();
                foreach (string colName in Cols)
                {
                    if (dic.ContainsKey(colName))
                    {
                        if (!dic_Def.ContainsKey(dic[colName]))
                        {
                            return "Excel列名:" + colName + " 對映資料表字段:" + dic[colName] + "在當前資料表中不存在!";
                        }
                        //去掉資料兩邊的空格
                        row[colName] = obj_val = Convert.ToString(row[colName]).Trim();
                        var info = dic_Def[dic[colName]];
                        //是否是DBNULL
                        if (obj_val.Equals(DBNull.Value))
                        {
                            if (!info.IsNullble)
                            {
                                errorMsg.Add("列" + colName + "不能為空!");

                            }
                        }
                        else
                        {
                            if (info.DBType == "String")
                            {
                                //time型別不用驗證長度(日期的 時間部分如 17:12:30.0000)
                                if (info.SqlType == "TIME")
                                {
                                    if (!DateTime.TryParse(now.ToString("yyyy-MM-dd") + " " + obj_val.ToString(), out now))
                                    {
                                        errorMsg.Add("列" + colName + "填寫的資料無效應為日期的時間部分如:17:30:12");

                                    }
                                }
                                else if (Convert.ToString(obj_val).Length > info.Length)
                                {
                                    errorMsg.Add("列" + colName + "長度超過配置長度:" + info.Length);
                                }
                            }
                            else
                            {
                                Type t = Type.GetType("System." + info.DBType);
                                try
                                {   //如果數字中有千分位在這一步可以處理掉重新給這個列賦上正確的數值                        
                                    row[colName] = Convert.ChangeType(obj_val, t); ;
                                }
                                catch (Exception ex)
                                {
                                    errorMsg.Add("列" + colName + "填寫的資料" + obj_val + "無效應為" + info.SqlType + "型別.");
                                }

                            }

                        }
                    }

                }
                row["ErrorMsg"] = string.Join(" || ", errorMsg);
            }

            return "";
        }

        /// <summary>
        /// wm 2018年11月28日13:37
        ///  將資料庫常用型別轉為C# 中的類名(.Net的型別名)
        /// </summary>
        /// <param name="old"></param>
        /// <returns></returns>
        private string GetCSharpType(string old)
        {
            string DBType = "";
            switch (old)
            {
                case "INT":
                case "BIGINT":
                case "SMALLINT":
                    DBType = "Int32";
                    break;
                case "DECIMAL":
                case "FLOAT":
                case "NUMERIC":
                    DBType = "Decimal";
                    break;
                case "BIT":
                    DBType = "Boolean";
                    break;
                case "TEXT":
                case "CHAR":
                case "NCHAR":
                case "VARCHAR":
                case "NVARCHAR":
                case "TIME":
                    DBType = "String";
                    break;
                case "DATE":
                case "DATETIME":
                    DBType = "DateTime";
                    break;
                default:
                    throw new Exception("GetCSharpType資料型別" + DBType + "無法識別!");

            }

            return DBType;
        }




    public class PoolModel
    {
        public string TableName { get; set; }
        public DataTable Data { get; set; }
        public string ExecBeforeSql { get; set; }
        public string ExecAfterSql { get; set; }
        public Dictionary<string, string> dic { get; set; }

    }

 

    /// <summary>
        /// wm 2018年11月28日13:32
        /// 獲取Txt資料並對資料進行校驗返回一個帶有ErrorMsg列的DataTable,如果資料校驗失敗則該欄位存放失敗的原因
        /// 注意:在使用該方法前需要資料表應該已經存在
        /// </summary>
        /// <param name="isCheck">是否校驗資料合法性(資料需要校驗則會按傳入的dbTableName獲取資料庫表的結構出來驗證)</param>
        /// <param name="map">如果需要驗證資料則此處需要傳對映關係   key Excel列名,Value 資料庫列名</param>
        /// <param name="dbTableName">驗證資料合法性的表(即資料會插入到的表)</param>
        /// <param name="error">非資料驗證上的異常返回</param>
        /// <param name="Regexs">用來過濾資料的正則</param>
        /// <param name="path">讀取檔案的路徑</param>
        /// <param name="Separator">列分隔符</param>
        /// <param name="HeaderRowsNum">表頭的行數</param>
        /// <returns>如果需求驗證則返回一個帶有ErrorMsg列的DataTable,如果資料校驗失敗則該欄位存放失敗的原因, 不需要驗證則資料讀取後直接返回DataTable</returns>
        public DataTable LoadDataTableFromTxt(string path, ref string error, string Separator, int HeaderRowsNum, List<string> Regexs = null, bool isCheck = false, Dictionary<string, string> map = null, string dbTableName = "", SQLServerDBHelper sdb = null)
        {
            DataTable dt = new DataTable();
            error = "";
            if (isCheck && (map == null || map.Count == 0 || string.IsNullOrWhiteSpace(dbTableName)))
            {
                error = "引數標明需要對錶格資料進行校驗,但沒有指定對映表集合或資料表名.";
                return dt;
            }
            string txts = File.ReadAllText(path);
            #region 把讀出來的方便資料轉成DataTable

            Regexs?.ForEach(c =>
            {
                txts = new Regex(c).Replace(txts, "");
            });
            ////替換掉多表的正則
            //Regex mu_re = new Regex(@"\+[-+]{4,}\s+\+[-+\s|\w./]{4,}\+"); //FTP new Regex(@"\+[-+]{4,}\s+\+[-+\s|\w./]{4,}\+"); //原來以-分隔的 new Regex(@"-{5,}(\s)+-{5,}\s+\|.+(\s)?\|.+(\s)?\|-{5,}");
            ////去掉所有橫線
            //Regex mu_r = new Regex(@"[+-]{4,}"); //FTP new Regex(@"[+-]{4,}"); //原  new Regex(@"(\|-{5,})|(-{5,})"); 
            //string s1 = mu_re.Replace(txts, "");
            //string s2 = mu_r.Replace(s1, "");
            // string[] tts = s2.Split(new string[] { "\r\n" }, StringSplitOptions.None);
            string[] tts = txts.Split(new string[] { "\r\n" }, StringSplitOptions.None);
            string[] vals;
            string s1;
            //生成表頭預設第一行時表頭直到遇到第一個只有一個|的內容為止(有幾行表頭,下面的內容就會有幾行)
            int headerNum = -1;//記錄表頭有幾列

            DataRow dr;
            //處理col重複的問題,如果有重複按第幾個來命名 比如  A1 A2 
            Dictionary<string, int> col_Rep = new Dictionary<string, int>();
            string colName = "";
            bool isre = false;//記錄當前是否有重複列
            int empty_HeaderRow = 0;
            for (int i = 0; i < tts.Length; i++)
            {
                s1 = tts[i];

                //還未獲取出表頭
                if (headerNum < HeaderRowsNum)
                {
                    vals = s1.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries);
                    foreach (string col in vals)
                    {
                        colName = col.Trim();

                        if (col_Rep.Keys.Contains(colName))
                        {
                            col_Rep[colName]++;
                            isre = true;
                            //重複列處理
                            //colName += col_Rep[colName];
                            continue;
                        }
                        else
                        {
                            col_Rep.Add(colName, 1);
                        }
                        dt.Columns.Add(colName, typeof(string));
                    }
                    headerNum = (i == (HeaderRowsNum - 1)) ? HeaderRowsNum : 0;
                }
                else
                {
                    if (string.IsNullOrWhiteSpace(s1.Trim()) || string.IsNullOrWhiteSpace(s1.Replace(Separator, "")))
                    {
                        continue;
                    }
                    if (isre)
                    {
                        error = "列:" + string.Join(",", col_Rep.Where(c => c.Value > 1).Select(c => c.Key)) + "存在重複";
                        return dt;
                    }


                    //多行時把多行的資料加在一起處理
                    if (headerNum > 1)
                    {
                        for (int j = 1; j < headerNum && (i + j) < tts.Length; j++)
                        {
                            //資料第一行最後沒有| 如果沒資料則直接換行了所以這裡補一個空格防止資料被當空資料移除了
                            s1 += " " + tts[i + j];
                        }
                    }
                    vals = s1.Split(new string[] { Separator }, StringSplitOptions.RemoveEmptyEntries);
                    dr = dt.NewRow();
                    dr.ItemArray = vals;
                    dt.Rows.Add(dr);
                    //因為本次迴圈結束上面會去++ 所以這裡只加headerNum-1次
                    i += (headerNum - 1);
                }

            }
            #endregion

            if (isCheck)
            {
                //dt.Columns.Remove("Item");
                //dt.Columns["Item1"].ColumnName = "Item";
                //dt.Columns.RemoveAt(dt.Columns.Count - 2);
                error = CheckData(dt, dbTableName, map, sdb);
            }

            return dt;

        }

&n