1. 程式人生 > >angr原始碼分析——DFG 資料流圖

angr原始碼分析——DFG 資料流圖

這篇文章主要講述,angr中資料流圖(Data Flow Gragh)的構建。

DFG恢復的是CFG中每個基本塊的資料流!
DFG為CFG的每個基本塊構建一個數據流圖(DFG)
DFG可以通過字典self.dfgs獲得,其中key的值為基本塊的地址,或DFG中的值。
param CFG:用於獲得所有基本塊的CFG

param annocfg:一個由向後片構建的註釋cfg,用於在白名單上構建DFG。

建構函式:

def __init__(self, cfg=None, annocfg=None):
        """
        Build a Data Flow Grah (DFG) for every basic block of a CFG
        The DFGs are available in the dict self.dfgs where the key
        is a basic block addr and the value a DFG.
        :param cfg: A CFG used to get all the basic blocks
        :param annocfg: An AnnotatedCFG built from a backward slice used to only build the DFG on the whitelisted statements
        """
        if cfg is None:
            self._cfg = self.project.analyses.CFGAccurate()
        else:
            self._cfg = cfg
        self._annocfg = annocfg

        self.dfgs = self._construct()

如果沒有cfg就構建cfg。

然後,呼叫_construct()函式構建DFG。這個函式,有點長,不過也是構造資料流的主要函式。下面開始分析吧。

def _construct(self):
        """
        We want to build the type of DFG that's used in "Automated Ident. of Crypto
        Primitives in Binary Code with Data Flow Graph Isomorphisms." Unlike that
        paper, however, we're building it on Vex IR instead of assembly instructions.
        """
        cfg = self._cfg
        p = self.project
        dfgs = {}
        l.debug("Building Vex DFG...")

        for node in cfg.nodes():#遍歷每個節點
            try:
                if node.simprocedure_name == None:
                    irsb = p.factory.block(node.addr).vex #根據節點獲得irsb
                else:
                    l.debug("Cannot process SimProcedures, ignoring %s" % node.simprocedure_name)
                    continue
            except Exception as e:
                l.debug(e)
                continue
            tmpsnodes = {}
            storesnodes = {}
            putsnodes = {}
            statements = irsb.statements #獲取irsb的所有語句
            dfg = DiGraph()

            for stmt_idx, stmt in enumerate(statements):#遍歷每條語句
                # We want to skip over certain types, such as Imarks
                if self._need_to_ignore(node.addr, stmt, stmt_idx):
                    continue

                # break statement down into sub-expressions
                exprs = stmt.expressions #獲得語句的子表示式
                stmt_node = stmt
                dfg.add_node(stmt)

                if stmt.tag == 'Ist_WrTmp':
                    tmpsnodes[stmt.tmp] = stmt_node
                    if exprs[0].tag == 'Iex_Binop':
                        if exprs[1].tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                        else:
                            dfg.add_edge(exprs[1], stmt_node)
                        if exprs[2].tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[exprs[2].tmp], stmt_node)
                        else:
                            dfg.add_edge(exprs[2], stmt_node)

                    elif exprs[0].tag == 'Iex_Unop':
                        dfg.remove_node(stmt_node)
                        if exprs[1].tag == 'Iex_RdTmp':
                            tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[1].tmp])
                            tmpsnodes[stmt.tmp].tmp = stmt.tmp
                        else:
                            tmpsnodes[stmt.tmp] = exprs[1]

                    elif exprs[0].tag == 'Iex_RdTmp':
                        tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[0].tmp])
                        tmpsnodes[stmt.tmp].tmp = stmt.tmp

                    elif exprs[0].tag == 'Iex_Get':
                        if putsnodes.has_key(exprs[0].offset):
                            dfg.add_edge(putsnodes[exprs[0].offset], stmt_node)
                        if len(exprs) > 1 and exprs[1].tag == "Iex_RdTmp":
                            dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                        elif len(exprs) > 1:
                            dfg.add_edge(exprs[1], stmt_node)

                    elif exprs[0].tag == 'Iex_Load':
                        if exprs[1].tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                        else:
                            dfg.add_edge(exprs[1], stmt_node)

                    else:
                        # Take a guess by assuming exprs[0] is the op and any other expressions are args
                        for e in exprs[1:]:
                            if e.tag == 'Iex_RdTmp':
                                dfg.add_edge(tmpsnodes[e.tmp], stmt_node)
                            else:
                                dfg.add_edge(e, stmt_node)

                elif stmt.tag == 'Ist_Store':
                    if exprs[0].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)

                    elif exprs[0].tag == 'Iex_Const':
                        dfg.add_edge(exprs[0], stmt_node)

                    if exprs[1].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                    else:
                        dfg.add_edge(exprs[1], stmt_node)

                elif stmt.tag == 'Ist_Put':
                    if exprs[0].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)
                    elif exprs[0].tag == 'Iex_Const':
                        dfg.add_edge(exprs[0], stmt_node)
                    putsnodes[stmt.offset] = stmt_node

                elif stmt.tag == 'Ist_Exit':
                    if exprs[0].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)

                elif stmt.tag == 'Ist_Dirty':
                    tmpsnodes[stmt.tmp] = stmt_node
                elif stmt.tag == 'Ist_CAS':
                    tmpsnodes[stmt.oldLo] = stmt_node

                else:
                    for e in stmt.expressions:
                        if e.tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[e.tmp], stmt_node)
                        else:
                            dfg.add_edge(e, stmt_node)

            for vtx in list(dfg.nodes()):
                if dfg.degree(vtx) == 0:
                    dfg.remove_node(vtx)

            if dfg.size() > 0:
                dfgs[node.addr] = dfg
        return dfgs

根據不同statements的型別,標記不同的點。

其實不僅可以用cfg來恢復資料流圖,任意的一個block都可以利用這個方法恢復資料流。

唯一的遺憾就是,恢復的資料流是block的,要想恢復函式間的資料流,就應該恢復資料依賴圖。

下面是我寫的恢復任意一個block的測試程式碼,當然呼叫仍然是construct函式。

def main():
    proj = angr.Project("test2.bin",load_options={'auto_load_libs':False})
    start_addr=0x1F065405
    start_state= proj.factory.blank_state(addr=start_addr)
    addrs=[start_addr]
    dfgs=constructDFG(addrs,proj)
    print len(dfgs)
    plot_common(dfgs[start_addr],"dfg_1F065405")

constructDFG只是對construct的一點更改

def constructDFG(addrs,project):
    dfgs={}
    for addr in addrs:
        irsb = project.factory.block(addr).vex
        if irsb is not None:
            tmpsnodes = {}
            storesnodes = {}
            putsnodes = {}
            statements = irsb.statements 
            dfg = DiGraph()

            for stmt_idx, stmt in enumerate(statements): 

                # break statement down into sub-expressions 
                exprs = stmt.expressions 
                stmt_node = stmt 
                dfg.add_node(stmt)

                if stmt.tag == 'Ist_WrTmp': 
                    tmpsnodes[stmt.tmp] = stmt_node 
                    if exprs[0].tag == 'Iex_Binop':
                        if exprs[1].tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                        else:
                            dfg.add_edge(exprs[1], stmt_node)
                        if exprs[2].tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[exprs[2].tmp], stmt_node)
                        else:
                            dfg.add_edge(exprs[2], stmt_node)

                    elif exprs[0].tag == 'Iex_Unop':
                        dfg.remove_node(stmt_node)
                        if exprs[1].tag == 'Iex_RdTmp':
                            tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[1].tmp])
                            tmpsnodes[stmt.tmp].tmp = stmt.tmp
                        else:
                            tmpsnodes[stmt.tmp] = exprs[1]

                    elif exprs[0].tag == 'Iex_RdTmp':
                        tmpsnodes[stmt.tmp] = copy(tmpsnodes[exprs[0].tmp])
                        tmpsnodes[stmt.tmp].tmp = stmt.tmp

                    elif exprs[0].tag == 'Iex_Get':
                        if putsnodes.has_key(exprs[0].offset):
                            dfg.add_edge(putsnodes[exprs[0].offset], stmt_node)
                        if len(exprs) > 1 and exprs[1].tag == "Iex_RdTmp":
                            dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                        elif len(exprs) > 1:
                            dfg.add_edge(exprs[1], stmt_node)

                    elif exprs[0].tag == 'Iex_Load':
                        if exprs[1].tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                        else:
                            dfg.add_edge(exprs[1], stmt_node)

                    else:
                        # Take a guess by assuming exprs[0] is the op and any other expressions are args
                        for e in exprs[1:]:
                            if e.tag == 'Iex_RdTmp':
                                dfg.add_edge(tmpsnodes[e.tmp], stmt_node)
                            else:
                                dfg.add_edge(e, stmt_node)

                elif stmt.tag == 'Ist_Store':
                    if exprs[0].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)

                    elif exprs[0].tag == 'Iex_Const':
                        dfg.add_edge(exprs[0], stmt_node)

                    if exprs[1].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[1].tmp], stmt_node)
                    else:
                        dfg.add_edge(exprs[1], stmt_node)

                elif stmt.tag == 'Ist_Put':
                    if exprs[0].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)
                    elif exprs[0].tag == 'Iex_Const':
                        dfg.add_edge(exprs[0], stmt_node)
                    putsnodes[stmt.offset] = stmt_node

                elif stmt.tag == 'Ist_Exit':
                    if exprs[0].tag == 'Iex_RdTmp':
                        dfg.add_edge(tmpsnodes[exprs[0].tmp], stmt_node)

                elif stmt.tag == 'Ist_Dirty':
                    tmpsnodes[stmt.tmp] = stmt_node
                elif stmt.tag == 'Ist_CAS':
                    tmpsnodes[stmt.oldLo] = stmt_node

                else:
                    for e in stmt.expressions:
                        if e.tag == 'Iex_RdTmp':
                            dfg.add_edge(tmpsnodes[e.tmp], stmt_node)
                        else:
                            dfg.add_edge(e, stmt_node)

            for vtx in list(dfg.nodes()):
                if dfg.degree(vtx) == 0:
                    dfg.remove_node(vtx)

            if dfg.size() > 0:
                dfgs[addr] = dfg
        return dfgs

最終畫出的圖為: