python分散式事務方案(一)tcc
阿新 • • 發佈:2018-11-27
python分散式事務方案(一)tcc
隨著單體應用的拆分以及服務化的流行,現在分散式事務已經比較常見,分散式事務理論ACID、CAP、BASE等我就不說了,現在就直接說一下一種常見的解決方案-tcc TCC 其實就是採用的補償機制,其核心思想是:針對每個操作,都要註冊一個與其對應的確認和補償(撤銷)操作。它分為三個階段:
- Try 階段主要是對業務系統做檢測及資源預留
- Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,預設 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
- Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。
優點: 跟和兩階段提交比起來,實現以及流程相對簡單了一些,但資料的一致性比2PC也要差一些
缺點: 缺點還是比較明顯的,在2,3步中都有可能失敗。TCC屬於應用層的一種補償方式,所以需要程式設計師在實現的時候多寫很多補償的程式碼,在一些場景中,一些業務流程可能用TCC不太好定義及處理。
下面介紹下我們應用的一種場景,有一個運維繫統需要運用到zabbix,而運維繫統拆分出了一個配置中心,下面是子系統依賴圖
在配置告警策略時需要呼叫zabbix介面
這時就涉及到一個分散式事務。由於我們這裡只涉及到兩個事務,所以我這裡就寫了一個zabbix代理client,來作為事務協調器
class ZabbixClientProxy(object): ''' zabbix client simple proxy ''' client = models.get_zbx_client() def __init__(self): self.create_triggers = list() self.update_triggers = list() self.delete_triggers = list() self.update_macros = list() def trigger_create(self, name, expression,uuid): try: trigger = self.client.hosts.trigger_create(name, expression, 1) trigger["uuid"]=uuid self.create_triggers.append(trigger) logger.debug("trigger_create " + name) return trigger except Exception, e: logger.error("trigger_create fail,cause by " + e.message) raise def trigger_update(self, triggerid, name, expression,uuid): try: logger.debug("trigger_update " + name) old_trigger = self.client.hosts.trigger_get(triggerid) update_result = self.client.hosts.trigger_update( triggerid, name=name, expression=expression, priority=1, enable=True) old_trigger["uuid"]=uuid logger.debug(old_trigger) self.update_triggers.append(old_trigger) return update_result except Exception, e: logger.error("trigger_update fail,cause by " + e.message) def trigger_delete(self, triggerid,uuid): try: logger.debug("trigger_delete " + triggerid) old_trigger = self.client.hosts.trigger_get(triggerid) delete_result = self.client.hosts.trigger_delete(triggerid) old_trigger["uuid"]=uuid self.delete_triggers.append(old_trigger) return delete_result except Exception, e: logger.error("trigger_delete fail,cause by " + e.message) def update_trigger_macro(self, uuid, item_threshold, alert_duration): all_hmacros = self.get_macro_by_name(uuid) if all_hmacros and len(all_hmacros) > 2: self.update_macro(all_hmacros, "DISK_USER_MAX", item_threshold) self.update_macro(all_hmacros, "DISK_USER_TIMES", str(alert_duration) + "m") self.update_macro(all_hmacros, "DISK_USER_ENABLE", 1) else: self.create_macro("DISK_USER_MAX", item_threshold, uuid) self.create_macro("DISK_USER_TIMES", str(alert_duration) + "m", uuid) self.create_macro("DISK_USER_ENABLE", 1, uuid) def stop_trigger(self, assets): if assets: for asset in assets: if asset.host is None: continue all_hmacros = self.get_macro_by_name(asset.host.uuid) if all_hmacros and len(all_hmacros) > 2: self.update_macro(all_hmacros, "DISK_USER_ENABLE", 0) else: self.create_macro("DISK_USER_MAX", 80, asset.host.uuid) self.create_macro("DISK_USER_TIMES", "5m", asset.host.uuid) self.create_macro("DISK_USER_ENABLE", 0, asset.host.uuid) def get_macro_by_name(self, uuid): return self.client.macros.list(uuid) def update_macro(self, all_hmacros, macro_name, value): for macro in all_hmacros: if macro['macro'] == ('{$' + macro_name + '}'): try: self.client.macros.update(macro['hostmacroid'], macro=macro_name, value=value) macro['name'] = macro_name self.update_macros.append(macro) logger.debug('update_macro ' + macro_name + ' to ' + str(value)) except Exception, e: logger.error('update_macro ' + macro_name + ' fail,case by ' + e.message) def create_macro(self, macro_name, value, uuid): try: hostid = self.client.macros._get_hostid(uuid) hmacro = self.client.macros.create(macro_name, value, hostid) logger.debug("create_macro success,macro_name:" + macro_name + ",value:" + str(value)) except Exception, e: logger.error("create_macro fail,cause by " + e.message) def trigger_get(self, triggerid): return self.client.hosts.trigger_get(triggerid) def trigger_list(self, hostid): return self.client.hosts.trigger_list(hostid) def item_list(self, uuid): return self.client.hosts.item_list(uuid) def rollback(self): logger.debug("start rollback") # rollback create for trigger in self.create_triggers: try: self.client.hosts.trigger_delete(trigger["triggerid"]) logger.debug('rollback_create_trigger ' + trigger["name"]) except Exception, e: logger.error('rollback_create_trigger ' + trigger["triggerid"] + ' fail,case by ' + str(e.message)) self.create_triggers = [] for trigger in self.update_triggers: try: expression=trigger["expression"].replace(trigger['uuid']+']','{HOST.HOST}]') self.client.hosts.trigger_update(trigger["triggerid"], name=trigger["name"], expression=expression, priority=1, enable=True) logger.debug('rollback_update_trigger ' + trigger["name"]) except Exception, e: logger.error('rollback_update_trigger ' + trigger["triggerid"] + ' fail,case by ' + str(e.message)) self.update_triggers = [] for trigger in self.delete_triggers: try: expression=trigger["expression"].replace(trigger['uuid']+']','{HOST.HOST}]') new_trigger = self.client.hosts.trigger_create(trigger["name"], expression, 1) logger.debug(new_trigger) logger.debug('rollback_delete_trigger ' + trigger["name"]) # 更新資料中的zabbix trigger id alert_models.ConditionTrigger.objects.filter(zabbix_trigger_id=trigger["triggerid"]).update( zabbix_trigger_id=new_trigger["triggerid"]) except Exception, e: logger.error('rollback_delete_trigger ' + trigger["triggerid"] + ' fail,case by ' + str(e.message)) self.delete_triggers = [] for macro in self.update_macros: try: self.client.macros.update(macro['hostmacroid'], macro=macro['name'], value=macro['value']) except Exception, e: logger.error('rollback_update_macro ' + macro['name'] + ' fail,case by ' + str(e.message)) logger.debug("end rollback")
事務成功,則提交本地事務,如果失敗則呼叫rollback
def create(self, request, *args, **kwargs):
'''
policy add
'''
assets = request.data["data"]
client = ZabbixClientProxy()
try:
with transaction.atomic():
#save policy
#將client作為引數,對主機、監控項、觸發器進行增刪改
except rest_framework_serializers.ValidationError, e:
logger.exception(e)
client.rollback()
raise
這樣做還有一個問題就是,在回滾中如果網路突然斷了這時會回滾失敗,這裡我們記錄了日誌,後面我們會通過掃描日誌來做到最終一致性,這裡我們後面坐了補償,下一次修改時會自動修正回滾失敗問題。