
[Sync Script] MySQL-Elasticsearch Synchronization


The search component of our company's project uses Elasticsearch, so keeping the data in MySQL and Elasticsearch in sync is a problem that has to be solved.

I found a few packages online, but each had its own drawbacks, so in the end I decided to write a script myself. The rough idea is as follows:

1. In an endless loop, keep SELECTing from the specified tables.

2. Read every row whose update time is later than a checkpoint (initialized to "1970-01-01 00:00:00").

3. Write the required fields to Elasticsearch (a skeleton of this loop follows).
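Before the full script, here is a minimal skeleton of those three steps. The helpers fetch_rows_updated_after and index_row are hypothetical stand-ins for the real queries, not part of the actual code:

import time

def fetch_rows_updated_after(last_time):
    # placeholder for step 2: SELECT rows with update_time > last_time
    return []

def index_row(row):
    # placeholder for step 3: write the needed fields to Elasticsearch
    pass

last_time = "1970-01-01 00:00:00"    # initial checkpoint
while True:                          # step 1: poll forever
    rows = fetch_rows_updated_after(last_time)
    for row in rows:
        index_row(row)
    if rows:
        last_time = rows[-1].update_time   # assumes ORDER BY update_time
    else:
        time.sleep(3)                # nothing new, back off briefly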

Notes: 1. The script may be interrupted or restarted at any point, so the latest update time is persisted to a fixed txt file (a toy round-trip of that file is sketched below).

2. To keep the script generic, so that monitoring one more table never forces a large rewrite, per-table variables are generated dynamically with locals() and globals() (also illustrated below).
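A toy version of note 1's checkpoint file (the file name here is illustrative): each pass appends the newest update time, and after a restart the script resumes from the last line.

# append the newest update time after a successful pass
with open('monitor_table1.txt', 'ab') as f:
    f.write("2018-06-01 12:00:00\n")

# on restart, resume from the last line (the real script uses tail())
last_time = open('monitor_table1.txt').read().splitlines()[-1]
print last_time   # -> 2018-06-01 12:00:00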
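And note 2's trick in isolation: globals() resolves the string 'monitor_' + table to the module-level function of that name, so supporting another table only means defining one more function.

def monitor_demo(last_time):
    print 'syncing demo since ' + last_time
    return last_time

table = 'demo'
fn = globals()['monitor_' + table]   # resolves to monitor_demo
fn("1970-01-01 00:00:00")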

The code is as follows:

#!/usr/bin/env python
# coding=utf-8
import sys
sys.path.append('/Users/cangyufu/work_jbkj/elabels-flask')
from modules.utils.commons import app, redispool, db_master, db_slave
from sqlalchemy import text
import os
import datetime
import time
from service.myelasticsearch.index import es
from modules.utils.mysqldb import db_obj_dict

CONST_SLEEP = 3
WORK_INDEX = 'test'

# https://stackoverflow.com/questions/136168/get-last-n-lines-of-a-file-with-python-similar-to-tail
def tail(f, lines=1):
    total_lines_wanted = lines
    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = []  # blocks of size BLOCK_SIZE, in reverse order,
                 # starting from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if block_end_byte - BLOCK_SIZE > 0:
            # read the last block we haven't yet read
            f.seek(block_number * BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from the beginning
            f.seek(0, 0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

def is_file_exists(filename):
    # create the checkpoint file with the epoch timestamp on first run
    if not os.path.isfile(filename):
        f = open(filename, 'wb')
        f.write("1970-01-01 00:00:00\n")
        f.close()

# Pass in the names of the tables to monitor
def sync_main(*args):
    for table in args:
        # every monitored table needs a matching monitor_<table> function
        if not callable(globals().get('monitor_' + table)):
            raise Exception('lack function monitor_{}'.format(table))
    for table in args:
        filename = ''.join(['monitor_', table, '.txt'])
        # NOTE: stashing per-table entries in locals() works under CPython,
        # but it relies on an implementation detail; a plain dict is safer.
        locals()[table + 'path'] = os.path.join(os.path.dirname(__file__), filename)
        is_file_exists(locals()[table + 'path'])
        locals()[table + 'file'] = open(locals()[table + 'path'], 'rb+')
    try:
        print "begin"
        while True:
            count = 0
            for table in args:
                print 'handling ' + table
                last_time = tail(locals()[table + 'file'], 1)
                update_time = globals()['monitor_' + table](last_time)
                print update_time
                if update_time == last_time:
                    count += 1
                    continue
                locals()[table + 'file'].write(update_time + '\n')
                locals()[table + 'file'].flush()
            if count == len(args):
                # no table had fresh rows; back off before polling again
                time.sleep(CONST_SLEEP)
    except Exception, e:
        print e
        raise e
    finally:
        for table in args:
            locals()[table + 'file'].close()

########################################################################
#
# For every table you want to monitor you must implement a function
# named monitor_<table_name>; e.g. monitoring table1 requires a
# monitor_table1 function. It is passed the time to start updating
# from (initially "1970-01-01 00:00:00") and returns the newest time
# it has synced up to.
#
########################################################################

def monitor_table1(last_time):
    pass
    return last_time

def monitor_table2(last_time):
    pass
    return last_time
def trans_date_time(dt):
    return datetime.datetime.strptime(dt, "%Y-%m-%d %H:%M:%S")


sync_main('table1', 'table2')
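For reference, here is a minimal sketch of what a filled-in monitor function might look like. Everything in it is an assumption rather than part of the original post: that table1 has id, name, and an auto-updated update_time column, that db_slave can execute SQLAlchemy text() queries, and that es is an elasticsearch-py client writing to WORK_INDEX.

def monitor_table1(last_time):
    # fetch rows changed since the checkpoint (schema is hypothetical)
    rows = db_slave.execute(
        text("SELECT id, name, update_time FROM table1"
             " WHERE update_time > :t ORDER BY update_time"),
        t=last_time).fetchall()
    if not rows:
        return last_time  # nothing new: keep the old checkpoint
    for row in rows:
        # index only the fields the search side needs
        es.index(index=WORK_INDEX, doc_type='table1', id=row.id,
                 body={'name': row.name})
    # the newest update_time becomes the next checkpoint line in the txt file
    return rows[-1].update_time.strftime("%Y-%m-%d %H:%M:%S")

Because es.index overwrites a document with the same id, re-syncing a boundary row is harmless; if the strict > comparison worries you (rows sharing the batch's final second could be skipped), switching to >= only costs some idempotent re-writes.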
