Python Distributed Transaction Solutions (Part 1): TCC

With the break-up of monolithic applications and the rise of service-oriented architectures, distributed transactions have become fairly common. I will not go over the underlying theory (ACID, CAP, BASE) here; instead, let's look at one common solution: TCC. TCC is essentially a compensation mechanism. Its core idea is that for every operation you also register a corresponding confirm operation and a compensating (cancel) operation. It is divided into three phases (a minimal code sketch follows the list):

  • Try phase: check business constraints and reserve the resources the operation needs.
  • Confirm phase: actually commit the business operation. Confirm uses only the resources reserved in Try, and by convention it is assumed not to fail: as long as Try succeeds, Confirm must succeed.
  • Cancel phase: executed when the business operation fails and has to be rolled back; it cancels the operation and releases the reserved resources.
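
To make the three phases concrete, here is a minimal sketch of the TCC contract and a naive coordinator. The names (TccParticipant, run_tcc) are illustrative only and are not part of the project described below.

class TccParticipant(object):
    '''
    illustrative TCC participant interface
    '''
    def try_reserve(self):
        # Try: check business constraints and reserve the resources needed
        raise NotImplementedError

    def confirm(self):
        # Confirm: commit using only the resources reserved in Try
        raise NotImplementedError

    def cancel(self):
        # Cancel: release the reservation made in Try
        raise NotImplementedError


def run_tcc(participants):
    tried = []
    try:
        for p in participants:      # phase 1: Try on every participant
            p.try_reserve()
            tried.append(p)
        for p in participants:      # phase 2: Confirm; assumed not to fail once every Try succeeded
            p.confirm()
    except Exception:
        for p in reversed(tried):   # phase 3: Cancel whatever was already tried
            p.cancel()
        raise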

Advantages: compared with two-phase commit (2PC), the implementation and flow are relatively simple, although data consistency is somewhat weaker than with 2PC.

Disadvantages: the drawbacks are fairly obvious. Both the Confirm and Cancel phases can still fail. TCC is an application-level compensation scheme, so developers have to write a lot of extra compensation code, and in some scenarios certain business flows are hard to define and handle with TCC.

Here is a scenario from our own application: an operations system needs to use zabbix, and a configuration center has been split out of that system. The subsystem dependency diagram is shown below. [figure: subsystem dependency diagram]

When configuring an alert policy, the zabbix API has to be called. [figure: alert policy configuration calling zabbix]

This is where a distributed transaction comes in. Since only two transactions are involved here, I wrote a zabbix proxy client to act as the transaction coordinator:

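# Note: models, logger and alert_models below are modules from the surrounding
# project and are not shown in this post; models.get_zbx_client() returns the
# zabbix API wrapper used throughout.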
class ZabbixClientProxy(object):
    '''
    zabbix client simple proxy
    '''
    client = models.get_zbx_client()

    def __init__(self):
        self.create_triggers = list()
        self.update_triggers = list()
        self.delete_triggers = list()
        self.update_macros = list()

    def trigger_create(self, name, expression,uuid):
        try:
            trigger = self.client.hosts.trigger_create(name, expression, 1)
            trigger["uuid"]=uuid
            self.create_triggers.append(trigger)
            logger.debug("trigger_create " + name)
            return trigger
        except Exception as e:
            logger.error("trigger_create fail, caused by " + str(e))
            raise

    def trigger_update(self, triggerid, name, expression,uuid):
        try:
            logger.debug("trigger_update " + name)
            old_trigger = self.client.hosts.trigger_get(triggerid)
            update_result = self.client.hosts.trigger_update(
                    triggerid, name=name, expression=expression, priority=1, enable=True)
            old_trigger["uuid"]=uuid
            logger.debug(old_trigger)
            self.update_triggers.append(old_trigger)
            return update_result
        except Exception as e:
            logger.error("trigger_update fail, caused by " + str(e))

    def trigger_delete(self, triggerid,uuid):
        try:
            logger.debug("trigger_delete " + triggerid)
            old_trigger = self.client.hosts.trigger_get(triggerid)
            delete_result = self.client.hosts.trigger_delete(triggerid)
            old_trigger["uuid"]=uuid
            self.delete_triggers.append(old_trigger)
            return delete_result
        except Exception as e:
            logger.error("trigger_delete fail, caused by " + str(e))

    def update_trigger_macro(self, uuid, item_threshold, alert_duration):
        all_hmacros = self.get_macro_by_name(uuid)
        if all_hmacros and len(all_hmacros) > 2:
            self.update_macro(all_hmacros, "DISK_USER_MAX", item_threshold)
            self.update_macro(all_hmacros, "DISK_USER_TIMES", str(alert_duration) + "m")
            self.update_macro(all_hmacros, "DISK_USER_ENABLE", 1)
        else:
            self.create_macro("DISK_USER_MAX", item_threshold, uuid)
            self.create_macro("DISK_USER_TIMES", str(alert_duration) + "m", uuid)
            self.create_macro("DISK_USER_ENABLE", 1, uuid)

    def stop_trigger(self, assets):
        if assets:
            for asset in assets:
                if asset.host is None:
                    continue
                all_hmacros = self.get_macro_by_name(asset.host.uuid)
                if all_hmacros and len(all_hmacros) > 2:
                    self.update_macro(all_hmacros, "DISK_USER_ENABLE", 0)
                else:
                    self.create_macro("DISK_USER_MAX", 80, asset.host.uuid)
                    self.create_macro("DISK_USER_TIMES", "5m", asset.host.uuid)
                    self.create_macro("DISK_USER_ENABLE", 0, asset.host.uuid)

    def get_macro_by_name(self, uuid):
        return self.client.macros.list(uuid)

    def update_macro(self, all_hmacros, macro_name, value):
        for macro in all_hmacros:
            if macro['macro'] == ('{$' + macro_name + '}'):
                try:
                    self.client.macros.update(macro['hostmacroid'], macro=macro_name, value=value)
                    macro['name'] = macro_name
                    self.update_macros.append(macro)
                    logger.debug('update_macro ' + macro_name + ' to ' + str(value))
                except Exception as e:
                    logger.error('update_macro ' + macro_name + ' fail, caused by ' + str(e))

    def create_macro(self, macro_name, value, uuid):
        try:
            hostid = self.client.macros._get_hostid(uuid)
            hmacro = self.client.macros.create(macro_name, value, hostid)
            logger.debug("create_macro success,macro_name:" + macro_name + ",value:" + str(value))
        except Exception as e:
            logger.error("create_macro fail, caused by " + str(e))

    def trigger_get(self, triggerid):
        return self.client.hosts.trigger_get(triggerid)

    def trigger_list(self, hostid):
        return self.client.hosts.trigger_list(hostid)

    def item_list(self, uuid):
        return self.client.hosts.item_list(uuid)

    def rollback(self):
        logger.debug("start rollback")
        # rollback create
        for trigger in self.create_triggers:
            try:
                self.client.hosts.trigger_delete(trigger["triggerid"])
                logger.debug('rollback_create_trigger ' + trigger["name"])
            except Exception as e:
                logger.error('rollback_create_trigger ' + trigger["triggerid"] + ' fail, caused by ' + str(e))
        self.create_triggers = []
        for trigger in self.update_triggers:
            try:
                expression=trigger["expression"].replace(trigger['uuid']+']','{HOST.HOST}]')
                self.client.hosts.trigger_update(trigger["triggerid"], name=trigger["name"],
                                                 expression=expression, priority=1, enable=True)
                logger.debug('rollback_update_trigger ' + trigger["name"])

            except Exception as e:
                logger.error('rollback_update_trigger ' + trigger["triggerid"] + ' fail, caused by ' + str(e))
        self.update_triggers = []
        for trigger in self.delete_triggers:
            try:
                expression=trigger["expression"].replace(trigger['uuid']+']','{HOST.HOST}]')
                new_trigger = self.client.hosts.trigger_create(trigger["name"], expression, 1)
                logger.debug(new_trigger)
                logger.debug('rollback_delete_trigger ' + trigger["name"])
                # update the zabbix trigger id stored in the local database
                alert_models.ConditionTrigger.objects.filter(zabbix_trigger_id=trigger["triggerid"]).update(
                        zabbix_trigger_id=new_trigger["triggerid"])
            except Exception as e:
                logger.error('rollback_delete_trigger ' + trigger["triggerid"] + ' fail, caused by ' + str(e))
        self.delete_triggers = []

        for macro in self.update_macros:
            try:
                self.client.macros.update(macro['hostmacroid'], macro=macro['name'], value=macro['value'])
            except Exception as e:
                logger.error('rollback_update_macro ' + macro['name'] + ' fail, caused by ' + str(e))
        logger.debug("end rollback")

If the transaction succeeds, the local transaction is committed; if it fails, rollback is called:

def create(self, request, *args, **kwargs):
    '''
    policy add
    '''
    assets = request.data["data"]
    client = ZabbixClientProxy()
    try:
        with transaction.atomic():
            # save the alert policy (elided)
            # pass client in and create/update/delete hosts, items and triggers (elided)
    except rest_framework_serializers.ValidationError as e:
        logger.exception(e)
        client.rollback()
        raise

One remaining problem with this approach: if the network drops in the middle of a rollback, the rollback itself fails. We log these failures, and later we scan the logs to reach eventual consistency; a compensation job added in a later revision automatically repairs failed rollbacks. A rough sketch of this log-scanning idea follows.
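
The sketch below is only to illustrate the approach: RollbackFailureLog is an assumed Django model (with action, payload and done fields), not the project's real schema, and the action names are made up.

import json

def record_rollback_failure(action, payload):
    # persist a failed undo step so a periodic job can retry it later
    # RollbackFailureLog is a hypothetical model used only for this sketch
    RollbackFailureLog.objects.create(action=action, payload=json.dumps(payload), done=False)


def retry_failed_rollbacks(zbx_client):
    # periodic task: scan the failure log and replay each undo step until it succeeds;
    # zbx_client is the same object returned by models.get_zbx_client() above
    for entry in RollbackFailureLog.objects.filter(done=False):
        payload = json.loads(entry.payload)
        try:
            if entry.action == "delete_trigger":
                zbx_client.hosts.trigger_delete(payload["triggerid"])
            elif entry.action == "restore_macro":
                zbx_client.macros.update(payload["hostmacroid"],
                                         macro=payload["name"], value=payload["value"])
            entry.done = True
            entry.save()
        except Exception as e:
            logger.error("compensation retry failed, caused by " + str(e))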