1. 程式人生 > >mysql基於binlog回滾工具_flashback(python版本)

mysql基於binlog回滾工具_flashback(python版本)

 update、delete的條件寫錯甚至沒有寫,導致資料操作錯誤,需要恢復被誤操作的行記錄。這種情形,其實時有發生,可以選擇用備份檔案+binlog來恢復到測試環境,然後再做資料修復,但是這樣其實需要耗費一定的時間跟資源。

    其實,如果binlog format為row,binlog檔案中是會詳細記錄每一個事務涉及到操作,並把每一個事務影響到行記錄均儲存起來,能否給予binlog 檔案來反解析資料庫的行記錄變動情況呢?     業界已有不少相關的指令碼及工具,但是隨著MySQL版本的更新、binlog記錄內容的變化以及需求不一致,大多數指令碼不太適合個人目前的使用需求,所以開始著手編寫 mysql的 flash back指令碼。      僅在MySQL 5.6/5.7版本測試,python執行環境需要安裝pymysql模組。

1 實現內容

    根據binlog檔案,對某個\些事務、某段時間某些表、某段時間全庫 做回滾操作,實現閃回功能。工具處理過程中,會把binlog中的事務修改的行記錄儲存到表格中去,通過 dml_sql 列,可以檢視每一個事務內部的所有行記錄變更情況,通過 undo_sql 查看回滾的SQL內容。如下圖,然後再根據表格內容做回滾操作。             那麼這個指令碼有哪些優點呢?
  • 回滾分為2個命令:第一個命令 分析binglog並存儲進入資料庫;第二個命令 執行回滾操作;
  • 回滾的時候,可以把執行指令碼跟回滾指令碼統一存放到資料庫中,可以檢視 更新內容以及回滾內容;
  • 根據儲存的分析表格,方便指定事務或者指定表格來來恢復;
  • 詳細的日誌輸出,說明分析進度跟執行進度。
    分析binlog的輸出截圖(分析1G的binlog檔案)        回滾資料庫的輸出截圖:    

2 原理

    前提:例項啟動了binlog並且格式為ROW。     使用python對mysqlbinlog後的log檔案進行文字分析處理,在整個處理過程中,有以下6個疑難點需要處理:
  1. 判斷一個事務的開始跟結束
  2. 同一個事務的執行順序需要反序執行
  3. 解析回滾SQL
  4. 同一個事務操作不同表格處理
  5. 轉義字元處理,比如換行符、tab符等等
  6. timestamp資料型別引數值轉換
  7. 負數處理
  8. 單個事務涉及到行修改SQL操作了 max_allow
  9. 針對某個表格做回滾,而不是全庫回滾

2.1 事務的開始與結束

    按照Xid出現的位置來判斷,從binlog檔案的最開始開始讀取,遇到SQL語句則提取出來,直到遇到Xid,統一把之前提取出來的SQL彙總為一個事務,然後繼續提取SQL語句,直到遇到下一個Xid,再把這個事務的SQL彙總成一個事務,一直這樣迴圈,直至檔案順序遍歷結束。    

2.2 事務內部反序處理

    同一個事務中,如果有多個表格多行記錄發生變更,在回滾的時候,應該反序回滾SQL,那麼,如何將提取出來的SQL反序儲存呢?思路如下:
  • 每行記錄的修改SQL獨立出來
  • 將獨立出來的SQL反序儲存
   假設正序的事務SQL語句儲存在變數 dml_sql 中,反序後的可以回滾的SQL儲存在變數 undo_sql中。按順序把行記錄修改的SQL抽取出來 儲存到變數 record_sql 中去,然後 賦值 undo_sql =record_sql + undo_sql ,再置空 record_sql 變數,如此,便可實現反序事務內部的執行SQL。

2.3 解析回滾SQL

    首先,檢視binlog的日誌內容,發現行修改的SQL情形如下,提取過程中需要注意這幾個問題:
  • 行記錄的列名配對,binlog file儲存的列序號,不能直接使用
  • WHERE部分 跟 SET部分 之間並無關鍵字或者符號,需要新增 AND 或者 逗號
  • DELETE SQL 需要反轉為 INSERT
  • UPDATE SQL 需要把WHERE 跟 SET的部分進行替換
  • INSERT SQL需要反轉為 DELETE

2.4 同事務不同表格處理

    同一個事務中,允許對不同表格進行資料修改,這點在列名替換列序號的時候,需要留意處理。     每一個的行記錄前有一行記錄,含有 'Table_map' 標識,會說明這一行當行記錄是修改哪個表格,可以根據這個提示,來替換binlog裡邊的列序號為列名。

2.5 轉義字元處理

    binlog檔案在對非空格的空白字元處理,採用轉義字元字串儲存,比如,在表格insert一列記錄含換行符,而實際上在binlog檔案中,是使用了 \x0a 替換了 換行操作,所以在回滾資料的過程中,需要對轉義字元做處理。    

   

    這裡注意一個地方,039的轉義字元是沒有在函式 esc_code 中統一處理,而是單獨做另外處理。         轉移字元表相見下圖:    

2.6 timestamp資料型別處理

    timestamp實際在資料庫中的儲存值是 INT型別,需要使用 函式 from_unixtime轉換。     建立測試表格tbtest,只有一列timestamp的列,儲存值後檢視binlog的內容,具體截圖如下:         在處理行記錄的時候,要對timestamp的value做處理,新增from_unixtime函式轉換。

2.7 負數值處理

    這個一開始寫程式碼的時候,並沒有考慮到。大量測試的過程中發現,所有整型的資料型別,在儲存負數的時候,都會存入一個最大範圍值。binlog在處理這塊的機制有些不是很瞭解。測試如下:        所以當遇到INT的各種資料型別並且VALUE為負數的時候,需要把 這個範圍值去除,才能執行執行undo_sql。

2.8 單個事務行記錄總SQL超過max_allowed_package處理

    分析binlog後儲存兩種sql型別,一種是行記錄的修改SQL,即 dml_sql;一種是 行記錄的回滾sql,即 undo_sql。從程式碼可知,儲存這兩個sql的列是longtext,最大可儲存4G的內容。但是 MySQL中單個會話的包大小是有限制的,限制的引數為 max_allowed_packet,預設大小為 4Mb,最大為1G,所以這個指令碼使用前,請手動設定 儲存binlog file的資料庫例項以及線上的資料庫例項這個引數: set global max_allowed_packet = 1073741824; #記得後續修改回來     萬一操作了呢?那麼回滾只能分段來回滾,先回滾到 這個大事務,然後單獨執行這個大事務,緊接著繼續回滾,這部分不能使用pymysql嗲用source 檔案執行,所以只能手動做這個操作。 求高能人士修改這個邏輯程式碼!!!

2.9 針對性回滾

    假設誤操作的沒有明確的時間點,只有一個區間,而這個區間還有其他的表格操作,那麼這個時候,需要在分析binlog檔案的時候,新增--database選項,先帥選到同一個資料庫中binlog檔案中。     這裡的處理是將這段區間的dml_sql跟undo_sql都儲存到資料庫表格中,然後再刪除不需要回滾的事務,剩餘需要回滾的事務。再執行回滾操作。

3 使用說明

3.1 引數說明

    這個指令碼的引數稍微多些,可以 --help 檢視具體說明。         本人喜歡用各種顏色來分類引數(blingbling五顏六色,看著多有趣多精神),所以,按顏色來說明這些引數。
  • 黃色區域:這6個引數,提供的是 分析並存儲binlog file的相關值,說明儲存分析結果的資料庫的連結方式、binlog檔案的位置以及儲存結果的表格名字;
  • 藍色區域:這4個引數,提供 與線上資料庫表結構一致的DB例項連線方式,僅需跟線上一模一樣的表結構,不一定需要是主從庫;
  • 綠色區域:最最重要的選項 -a,0代表僅分析binlog檔案,1代表僅執行回滾操作,必須先執行0才可以執行1;
  • 紫色區域:舉例說明。

3.2 應用場景說明

  • 全庫回滾某段時間
    • 需要回滾某個時間段的所有SQL操作,回滾到某一個時間點
    • 這種情況下呢,大多數是使用備份檔案+binlog解決
    • 但是這個指令碼也可以滿足,但請勿直接在線上操作,先 -a=0,看下分析結果,是否符合,符合的話,停掉某個從庫,再在從庫上執行,最後開發業務接入檢查是否恢復到指定時間點,資料是否正常。
  • 某段時間某些表格回滾某些操作
    • 比如,開發提交了一個批量更新指令碼,各個測試層面驗證沒有問題,提交線上執行,但是執行後,發現有個業務漏測試,導致某些欄位更新後影響到其他業務,現在需要緊急 把被批量更新的表格回滾到原先的行記錄
    • 這個並不能單純從技術角度來處理,要綜合考慮
      • 這種情況下,如何回顧tab A表格的修改操作呢?
      • 個人覺得,這種方式比較行得通,dump tabA表格的資料到測試環境,然後再分析 binlog file 從11點-12點的undo sql,接著在測試環境回滾該表格到11點這個時刻,緊接著,由開發跟業務對比測試環境11點的資料跟線上現有的資料中,看下是哪些行哪些列需要在線上進行回滾,哪些是不需要的,然後開發再提交SQL指令碼,再在線上執行。其實,這裡邊,DBA僅提供一個角色,就是把 表格 tab A 在一個新的環境上,回滾到某個時間點,但是不提供直接線上回滾SQL的處理。
  • 回滾某個/些SQL
    • 這種情況比較常見,某個update某個delete缺少where條件或者where條件執行錯誤
    • 這種情況下,找到對應的事務,執行回滾即可,回滾流程請參考上面一說,對的,我就是這麼膽小怕事 

3.3 測試案例

3.3.1 全庫回滾某段時間

假設需要回滾9點10分到9點15分間資料庫的所有操作:
  • 準備測試環境例項儲存分析後的資料 
  • 測試環境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog檔案
  • python指令碼分析檔案,action=0
  • 線上測試環境修改set global max_allowed_packet = 1073741824
  • 回滾資料,action=1
  • 線上測試環境修改set global max_allowed_packet = 4194304
 1 --測試環境(請安裝pymysql):IP: 192.168.9.242,PORT:3310 ,資料庫:flashback,表格:tbevent
 2 --具有線上表結構的db:IP:192.168.9.243 PORT:3310
 3 
 4 
 5 mysql> show global variables like 'max_allowed_packet';
 6 +--------------------+----------+
 7 | Variable_name      | Value    |
 8 +--------------------+----------+
 9 | max_allowed_packet | 16777216 |
10 +--------------------+----------+
11 1 row in set (0.00 sec)
12 
13 mysql> set global max_allowed_packet = 1073741824;
14 Query OK, 0 rows affected (0.00 sec)
15 
16 [[email protected] ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log
17 
18 [[email protected] pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=0
19 2017-06-19 10:59:39,041 INFO begin to assign values to parameters
20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent
21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok
22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=3310
23 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok
24 2017-06-19 10:59:39,054 INFO MySQL connection is ok
25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event
26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!
28 2017-06-19 10:59:39,061 INFO analyzing...
29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!
30 2017-06-19 11:49:53,782 INFO release all db connections
31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 
32 
33 
34 [[email protected] pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=1
35 2017-06-19 16:30:20,633 INFO begin to assign values to parameters
36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent
37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok
38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=3310
39 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok
40 2017-06-19 16:30:20,871 INFO MySQL connection is ok
41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 
42 2017-06-19 16:30:21,243 INFO doing batch : 1 
43 2017-06-19 16:31:01,182 INFO doing batch : 2 
44 2017-06-19 16:31:16,909 INFO doing batch : 3 
45 -------省空間忽略不截圖--------------
46 2017-06-19 16:41:11,287 INFO doing batch : 34 
47 2017-06-19 16:41:25,577 INFO doing batch : 35 
48 2017-06-19 16:41:44,629 INFO release all db connections
49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2 某段時間某些表格回滾某些操作

  • 準備測試環境例項儲存分析後的資料 
  • 測試環境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog檔案
  • python指令碼分析檔案,action=0
  • 分析帥選需要的事務,rename表格
  • dump 對應的表格到測試環境
  • 回滾資料,action=1
  • 提交給開發業務對比資料

3.3.3 回滾某個/些SQL

  • 準備測試環境例項儲存分析後的資料 
  • 測試環境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog檔案
  • python指令碼分析檔案,action=0
  • 分析帥選需要的事務,rename表格
  • dump 對應的表格到測試環境
  • 回滾資料,action=1
  • 提交給開發業務對比資料

4 python指令碼

     指令碼會不定期修復bug,若是感興趣,可以往github下載:https://github.com/xinysu/mysql.git 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-
  2 __author__ = 'xinysu'
  3 __date__ = '2017/6/15 10:30'
  4 
  5 
  6 
  7 import re
  8 import os
  9 import sys
 10 import datetime
 11 import time
 12 import logging
 13 import importlib
 14 importlib.reload(logging)
 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')
 16 
 17 import pymysql
 18 from pymysql.cursors import DictCursor
 19 
 20 usage='''\nusage: python [script's path] [option]
 21 ALL options need to assign:
 22 \033[1;33;40m
 23 -h    : host, the database host,which database will store the results after analysis
 24 -u    : user, the db user
 25 -p    : password, the db user's password
 26 -P    : port, the db port
 27 
 28 -f    : file path, the binlog file
 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname},
 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
 31         is test.tbevent
 32 \033[1;34;40m
 33 -oh   : online host, the database host,which database have the online table schema
 34 -ou   : online user, the db user
 35 -op   : online password, the db user's password
 36 -oP   : online port, the db port
 37 \033[1;32;40m
 38 -a    : action, 
 39         0 just analyse the binlog file ,and store sql in table; 
 40         1 after execute self.dotype=0, execute the undo_sql in the table
 41 \033[0m  
 42 --help: help document
 43 \033[1;35;40m
 44 Example:
 45 analysize binlog:
 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 48                        -a=0
 49 
 50 flash back:
 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 53                        -a=1
 54 \033[0m                        
 55 '''
 56 
 57 class flashback:
 58     def __init__(self):
 59         self.host=''
 60         self.user=''
 61         self.password=''
 62         self.port='3306'
 63         self.fpath=''
 64         self.tbevent=''
 65 
 66         self.on_host=''
 67         self.on_user=''
 68         self.on_password=''
 69         self.on_port='3306'
 70 
 71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table
 72 
 73         self._get_db() # 從輸入引數獲取連線資料庫的相關引數值
 74 
 75         # 連線資料庫,該資料庫是用來儲存binlog檔案分析後的內容
 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent))
 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor)
 79         logging.info('MySQL which userd to store binlog event connection is ok')
 80 
 81         # 連線資料庫,該資料庫的表結構必須跟binlogfile基於對資料庫表結構一致
 82         # 該資料庫用於提供 binlog file 檔案中涉及到表結構分析
 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port))
 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8')
 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor)
 86         logging.info('MySQL which userd to analyse online table schema connection is ok')
 87 
 88         logging.info('\033[33mMySQL connection is ok\033[0m')
 89 
 90         self.dml_sql=''
 91         self.undo_sql=''
 92 
 93         self.tbfield_where = []
 94         self.tbfield_set = []
 95 
 96         self.begin_time=''
 97         self.db_name=''
 98         self.tb_name=''
 99         self.end_time=''
100         self.end_pos=''
101         self.sqltype=0
102 
103     #_get_db用於獲取執行命令的輸入引數
104     def _get_db(self):
105         logging.info('begin to assign values to parameters')
106         if len(sys.argv) == 1:
107             print(usage)
108             sys.exit(1)
109         elif sys.argv[1] == '--help':
110             print(usage)
111             sys.exit()
112         elif len(sys.argv) > 2:
113             for i in sys.argv[1:]:
114                 _argv = i.split('=')
115                 if _argv[0] == '-h':
116                     self.host = _argv[1]
117                 elif _argv[0] == '-u':
118                     self.user = _argv[1]
119                 elif _argv[0] == '-P':
120                     self.port = int(_argv[1])
121                 elif _argv[0] == '-f':
122                     self.fpath = _argv[1]
123                 elif _argv[0] == '-t':
124                     self.tbevent = _argv[1]
125                 elif _argv[0] == '-p':
126                     self.password = _argv[1]
127 
128                 elif _argv[0] == '-oh':
129                     self.on_host = _argv[1]
130                 elif _argv[0] == '-ou':
131                     self.on_user = _argv[1]
132                 elif _argv[0] == '-oP':
133                     self.on_port = int(_argv[1])
134                 elif _argv[0] == '-op':
135                     self.on_password = _argv[1]
136 
137                 elif _argv[0] == '-a':
138                     self.action = _argv[1]
139 
140                 else:
141                     print(usage)
142 
143     #建立表格,用於儲存分析後的BINLOG內容
144     def create_tab(self):
145         logging.info('creating table {} to store binlog event'.format(self.tbevent))
146         create_tb_sql ='''
147         CREATE TABLE IF NOT EXISTS {}(
148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
149             binlog_nam