1. 程式人生 > >內網 大資料同步至外網(雲伺服器) 一

內網 大資料同步至外網(雲伺服器) 一

描述:內網資料庫 多表大量資料  需要同步至外網。

思路方案:1.window定時任務,每天凌晨1點 (多執行緒或者單執行緒) 同步前一天的資料。

            2.因可能會出現同步失敗的 情況,以及會設定到修改資料,內網資料庫各表要有 建立時間和修改時間,外網結構 也要有建立時間和修改時間,方便資料核對,同時 批次插入或修改資料庫時,若失敗 因郵件或日誌檔案通知開發人員。

          3.內網資料庫 多張表,需要封裝一個公共的獲取 資料集合的方法(此方法 分頁獲取表 資料,使用類反射 呼叫類裡面的方法)

          4.內網資料 同步到外網,用http協議傳送 post 資料 請求,外網(雲伺服器)上 要做安全驗證 (僅處理指定ip地址的 請求)

          5.外網(雲伺服器)上 資料批量儲存,使用 SqlBulkCopy,封裝 一個公共的批量儲存方法。(若是修改資料 則先刪除原來的資料再新增)。

核心程式碼:

a.實體物件,繼承自basePush

    public class basePush
    {
        /// <summary>
        /// 資料庫目標表資料列名 對應關係
        /// </summary>
        /// <returns></returns>
        public static Dictionary<string, string> DictionaryColumn()
        {
            Dictionary<string, string> ls = new Dictionary<string, string>();
            return ls;
        }
    }

    [Serializable]
    public class DiseasebuweiInfo : basePush
    {
        public string disname { get; set; }

        public string bw { get; set; }

        public int cnt { get; set; }

        public Int64 rn { get; set; }

        public DateTime? CreateTime { get; set; }

        public DateTime? UpdateTime { get; set; }
    }

b.任務方法

	/// <summary>
        /// 
        /// 每天 凌晨1點後執行
        /// </summary>
        private static void TaskRunDiseasebuwei()
        {
            CHC.DAL.Log.Loger.Log("開始->同步Diseasebuwei:TaskRunDiseasebuwei", "系統任務");
            GetPostList<DiseasebuweiInfo>("DiseasebuweiInfo", "update"); //修改
        }
		private static void TaskRunSysdisease()
        {
            CHC.DAL.Log.Loger.Log("開始->同步SysdiseaseInfo:TaskRunSysdisease", "系統任務");
            GetPostList<SysdiseaseInfo>("SysdiseaseInfo", "create");//新增
        }

c.獲取資料集合的統一方法

        //List物件 post資料
        private static void GetPostList<T>(string name, string typeName)
        {
            int page = 1;
            int rows = 5000;
            int total = 0;
            int pageCount = 1; //資料庫 取資料來源 ,每次同步1000個
            string strName = typeof(T).Name;
            try
            {
                //DateTime datetime = DateTime.Now;
                //前一天的
                string DayOfDay = LastRunDate.ToString("yyyy-MM-dd HH:00:00");
                while (page <= pageCount)
                {
                    Type typ = typeof(TaskPushDAL);
                    MethodInfo methstr = typ.GetMethod(string.Format("Get{0}List", name));
                    object[] objPara = new object[] { DayOfDay, (page - 1) * rows + 1, page * rows, total, typeName };
                    List<T> infoList = methstr.Invoke(null, objPara) as List<T>;
                    total = (int)objPara[3];
                    //List<DiseasebuweiInfo> infoList = TaskPushDAL.GetDiseasebuweiInfoList(DayOfDay, (page - 1) * rows + 1, page * rows, out total);
                    if (page == 1)
                    {
                        pageCount = (total / rows) + (total % rows != 0 ? 1 : 0);
                    }
                    if (infoList != null && infoList.Count > 0)
                    {
                        //post 資料
                        string result = string.Empty;
                        result = ResponseTask(infoList, name, typeName);
                        //通知成功
                        if (result.Contains("ok"))
                        {
                            CHC.DAL.Log.Loger.Log(string.Format("{0}通知成功!===result:{1}", strName, result));
                        }
                        else
                        {
                            CHC.DAL.Log.Loger.Log(string.Format("{0}通知失敗!===result:{1},當前頁數:{2},操作型別:{3}",
								strName, result, page, typeName));
                        }
                    }
                    page = page + 1;
                }
            }
            catch (Exception ex)
            {
                CHC.DAL.Log.Loger.Error(ex, string.Format("同步{0}異常", strName));
            }
        }

       //訪問資料庫 分頁獲取資料 
	public static List<DiseasebuweiInfo> GetDiseasebuweiInfoList(string DayOfDay, int pageStart, int pageEnd, out int total,string typename)
        {
            StringBuilder sql = new StringBuilder();
            sql.AppendFormat(@"select COUNT(1) from [tb_diseasebuwei] a  where {1}>'{0}' ", DayOfDay, 
				typename == "update" ? "a.UpdateTime" : "a.CreateTime");
            total = SqlUtility.ExecuteScalar<int>(_connectionBig, sql.ToString(), null);
            string s = string.Format(@"select * from (select a.disname,a.bw,a.cnt,a.rn,dn=ROW_NUMBER()OVER(ORDER BY a.disname) from  
                                    [tb_diseasebuwei] a with(nolock) where {3}>'{0}' )tmp where dn between {1} and {2} "
                , DayOfDay, pageStart, pageEnd, typename == "update" ? "a.UpdateTime" : "a.CreateTime");
            return SqlUtility.ExecuteObjectList<DiseasebuweiInfo>(_connectionBig, s, null);
        }
//修改時,先刪除舊資料
        public static string DeleteDiseasebuweiInfo(List<DiseasebuweiInfo> list)
        {
            string strs = string.Join("','", list.Select(v => v.disname));
            string sql = string.Format("delete from tb_diseasebuwei where disname in('{0}')", strs);
            return sql;
        }

d.post資料

        //傳送 post 資料 請求
        private static string ResponseTask<T>(List<T> list, string Name, string typeName)
        {
            string result = string.Empty;
            try
            {
                string serverURL = string.Empty;
                if (ConfigurationManager.AppSettings["serverURL"] != null)
                {
                    serverURL = ConfigurationManager.AppSettings["serverURL"];
                    serverURL = serverURL + NotifyUrl;
                }
                if (list != null && list.Count > 0 && !string.IsNullOrEmpty(serverURL) && !string.IsNullOrEmpty(Name))
                {
                    string ModelList = Newtonsoft.Json.JsonConvert.SerializeObject(list,
                      new Newtonsoft.Json.Converters.IsoDateTimeConverter() { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" });
                    TaskData dataInfo = new TaskData() { MothodName = Name, ModelList = ModelList, ModeType = typeName };
                    string param = Newtonsoft.Json.JsonConvert.SerializeObject(dataInfo);
                    result = GetHTMLByPost(serverURL, param).ToLower();
                }
                else
                {
                    result = "引數不能為空";
                }
            }
            catch (Exception ex)
            {
                LogUtil.LogError("傳送 post 資料 請求異常", ex);
                result = ex.Message;
            }
            return result;
        }

	public static string GetHTMLByPost(string Url, string Params)
        {
            string result = "";
            string encoding = "utf-8";
            byte[] bytes = Encoding.GetEncoding(encoding).GetBytes(Params);
            HttpWebRequest httpWebRequest = (HttpWebRequest)WebRequest.Create(Url);
            httpWebRequest.Method = "POST";
            httpWebRequest.ContentType = "application/x-www-form-urlencoded";
            httpWebRequest.ContentLength = (long)bytes.Length;
            using (Stream requestStream = httpWebRequest.GetRequestStream())
            {
                requestStream.Write(bytes, 0, bytes.Length);
            }
            using (WebResponse response = httpWebRequest.GetResponse())
            {
                StreamReader streamReader = new StreamReader(response.GetResponseStream(), Encoding.UTF8);
                result = streamReader.ReadToEnd();
                response.Close();
            }
            return result;
        }

e.雲伺服器 接收資料

    public class PageTask : PageAjaxHandler
    {
        private string checkIp = "127.0.0.1";//指定的ip

        public PageTask(HttpContext context) : base(context) {
            if (ConfigurationManager.AppSettings["checkIp"] != null)
            {
                checkIp = ConfigurationManager.AppSettings["checkIp"];
            } 
        }
        public void SetData()
        {
            ResponseData responseData = new ResponseData();
            CHC.DAL.Log.Loger.Log("大資料儲存-----");
            try
            {
                //獲取 請求的ip地址
                string requestIp = Lin.ToolKit.Common.RequestHelper.GetIPAddress();
                if (!string.IsNullOrEmpty(requestIp) && checkIp.Contains(requestIp))
                {
                    string jasonstrs = new System.IO.StreamReader(Request.InputStream).ReadToEnd();
                    TaskData info = Newtonsoft.Json.JsonConvert.DeserializeObject<TaskData>(jasonstrs);
                    //UrlParam url = new UrlParam(Request);
                    //string MothodName = url.Querys["MothodName"];//方法
                    string MothodName = info.MothodName;
                    string ModelList = info.ModelList;//list 物件 json格式
                    string ModeType = info.ModeType;
                    string result = string.Empty;
                    //list物件儲存到資料庫
                    switch (MothodName)
                    {
                        case "DiseasebuweiInfo":
                            List<DiseasebuweiInfo> list = Newtonsoft.Json.JsonConvert.DeserializeObject<List<DiseasebuweiInfo>>(ModelList);
                            result = TaskPushBLL.SaveDataPush(MothodName, ModeType, list, "tb_diseasebuwei");
                            break;
                        case "SysdiseaseInfo":
                            List<SysdiseaseInfo> SysdiseaseInfoList = Newtonsoft.Json.JsonConvert.DeserializeObject<List<SysdiseaseInfo>>(ModelList);
                            result = TaskPushBLL.SaveDataPush(MothodName, ModeType, SysdiseaseInfoList, "tb_sysdisease");
                            break;
                    }
                    if (string.IsNullOrEmpty(result))
                    {
                        responseData.ret = "ok";
                    }
                    else
                    {
                        responseData.msg = result;
                    }
                }
                else
                {
                    responseData.ret = "error";
                    responseData.msg = "請求的ip地址異常";
                }
            }
            catch (Exception ex)
            {
                responseData.msg = ex.Message;
            }
            WriteObject(responseData);
        } 
    }

f.大資料儲存

        #region 大資料儲存
        /// <summary>
        /// 儲存資料
        /// </summary>
        /// <typeparam name="T">實體物件</typeparam>
        /// <param name="name">需要反射的方法名</param>
        /// <param name="typename">新增或修改</param>
        /// <param name="list">實體物件集合</param>
        /// <param name="tableName">目標表名</param>
        /// <returns></returns>
        public static string SaveDataPush<T>(string name, string typename, List<T> list, string tableName)
        {
            string str = string.Empty;
            string strName = typeof(T).Name;
            if (list != null && list.Count > 0)
            {
                try
                {
                    Dictionary<string, string> DictionaryColumn = new Dictionary<string, string>();
                    MethodInfo methstr = typeof(T).GetMethod("DictionaryColumn");
                    if (methstr != null)
                    {
                        DictionaryColumn = methstr.Invoke(null, null) as Dictionary<string, string>;
                    }
                    //TaskPushDAL.Sqlbulkcopy<T>(list, tableName, DictionaryColumn);
                    str = TaskPushDAL.Sqlbulkcopy(name, typename, list, tableName, DictionaryColumn);
                    CHC.DAL.Log.Loger.Log(string.Format("儲存({0}){1}資料共{2}條!===result:{3}", typename, strName, list.Count, str)); 
                }
                catch (Exception ex)
                {
                    str = ex.Message;
                    CHC.DAL.Log.Loger.Error(ex);
                }
            }
            return str;
        }
        #endregion

g.使用 SqlBulkCopy 批量處理資料

        /// <summary>
        /// typeName 為update :先刪除舊資料 再儲存新資料 ,為create:直接插入新資料
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="name"></param>
        /// <param name="typeName"></param>
        /// <param name="data">資料來源list集合</param>
        /// <param name="tableName">需要插入的資料庫目標表名稱</param>
        /// <param name="DictionaryColumn"></param>
        /// <returns></returns>
        public static string Sqlbulkcopy<T>(string name, string typeName, List<T> data, string tableName, Dictionary<string, string> DictionaryColumn)
        {
            string str = string.Empty;
            List<PropertyInfo> pList = new List<PropertyInfo>();//建立屬性的集合
            DataTable dt = new DataTable();
			 //把所有的public屬性加入到集合 並新增DataTable的列 
            Array.ForEach<PropertyInfo>(typeof(T).GetProperties(), p =>
            {
                pList.Add(p);
                Type colType = p.PropertyType;
                if ((colType.IsGenericType) && (colType.GetGenericTypeDefinition() == typeof(Nullable<>)))
                {
                    colType = colType.GetGenericArguments()[0];
                }
                dt.Columns.Add(new DataColumn(p.Name, colType));
            });
            foreach (var item in data)
            {
                DataRow row = dt.NewRow();
                //pList.ForEach(p => row[p.Name] = (item == null ? null : p.GetValue(item, null)));
                foreach (var p in pList)
                {
                    object tvalue = p.GetValue(item, null);
                    row[p.Name] = tvalue == null ? DBNull.Value : tvalue;
                }
                dt.Rows.Add(row);
            }
            string strName = typeof(T).Name;
            using (SqlConnection conn = new SqlConnection(_connection))
            {
                conn.Open();
                SqlTransaction sqlbulkTransaction = conn.BeginTransaction();
                #region 處理批量插入物件
                SqlBulkCopy bulk = new SqlBulkCopy(conn, SqlBulkCopyOptions.CheckConstraints, sqlbulkTransaction)
                {
                    DestinationTableName = tableName, /*設定資料庫目標表名稱*/
                    BatchSize = dt.Rows.Count, /*每一批次中的行數*/
                };
                bulk.BulkCopyTimeout = 5000;   //指定操作完成的Timeout時間
                if (DictionaryColumn != null && DictionaryColumn.Count > 0)
                {
                    foreach (var item in DictionaryColumn)
                    {
                        bulk.ColumnMappings.Add(item.Key, item.Value);//不一致 先指定對應關係 ;源列:目標列
                    }
                }
                else
                {
                    pList.ForEach(p =>
                    {
                        bulk.ColumnMappings.Add(p.Name, p.Name);//列名和 物件屬性一致
                    });
                }
                #endregion
                try
                {
                    if (typeName == "update") //先刪除 再新增
                    {
                        Type typ = typeof(TaskPushDAL);
                        MethodInfo methstr = typ.GetMethod(string.Format("Delete{0}", name));
                        object[] objPara = new object[] { data };
                        string sql = methstr.Invoke(null, objPara).ToString();
                        SqlCommand sqlComm = new SqlCommand(sql, conn, sqlbulkTransaction);
                        sqlComm.CommandTimeout = 300;
                        int count=sqlComm.ExecuteNonQuery();
                        CHC.DAL.Log.Loger.Log(string.Format("Sqlbulkcopy,{0},Delete的數量:{1}", strName, count));
                    }
                    bulk.WriteToServer(dt);//寫入表
                    CHC.DAL.Log.Loger.Log(string.Format("Sqlbulkcopy,{0},批量執行數量:{1}", strName, dt.Rows.Count));
                    sqlbulkTransaction.Commit();
                }
                catch (Exception ex)
                {
                    str = ex.Message;
                    sqlbulkTransaction.Rollback();
                    CHC.DAL.Log.Loger.Error(ex);
                }
                finally
                {
                    bulk.Close();
                    conn.Close();
                }
            }
            return str;
        }