Function identification in angr

2019-07-02  Nevv

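Overview of function identification

Disassemblers such as IDA and radare2 can recover the function boundaries in a binary and, from the call relationships between those functions, build the call graph of the whole program. Identifying function boundaries accurately therefore matters a great deal for any further analysis of the binary. A binary is usually organized into data, code, and metadata. When the symbol table has not been stripped, the start offset and size of each function can normally be read straight from the metadata. Once the symbol table has been stripped, that information is gone and function identification has to fall back on other methods, such as checking for function prologues and epilogues or analyzing the targets of call and jmp instructions.

These methods have accuracy problems of their own. Checks that rely on function prologues and epilogues are hard-coded patterns, so they may stop working under a different compiler or different optimization options. When analyzing call or jmp instructions, many target addresses can only be determined at run time.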
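
As a rough illustration of the prologue-matching heuristic, the sketch below scans a raw byte buffer for the classic x86-64 frame setup `push rbp; mov rbp, rsp` (bytes 55 48 89 e5). The function name `find_prologue_candidates` and the sample buffer are made up for this example, and it is not how any particular tool implements the search. It also shows why the approach is fragile: the frameless function in the middle of the sample is missed.

```python
# Toy prologue scan: not angr code, only the idea of hard-coded pattern matching.
PUSH_RBP_MOV_RBP_RSP = bytes.fromhex("554889e5")  # push rbp; mov rbp, rsp


def find_prologue_candidates(code: bytes, base_addr: int = 0) -> list:
    """Return the addresses at which the prologue pattern occurs in `code`."""
    hits = []
    offset = code.find(PUSH_RBP_MOV_RBP_RSP)
    while offset != -1:
        hits.append(base_addr + offset)
        offset = code.find(PUSH_RBP_MOV_RBP_RSP, offset + 1)
    return hits


# Hypothetical sample: two framed functions around one frameless function.
sample = (
    bytes.fromhex("554889e5" "5dc3")    # push rbp; mov rbp, rsp; pop rbp; ret
    + bytes.fromhex("31c0c3")           # xor eax, eax; ret  (no prologue)
    + bytes.fromhex("554889e5" "5dc3")  # same framed function again
)
print([hex(a) for a in find_prologue_candidates(sample, base_addr=0x1000)])
```

The function that starts at offset 6 of the sample has no frame setup, so a scan that relies on this single pattern never reports it.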

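Function identification in angr

1. Initialization phase

In angr, function identification takes place while CFGFast builds the CFG, starting with an initialization phase.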

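2. Analysis phase

The _analyze method is simple: it calls self._pre_analysis() and then picks the core analysis routine depending on whether self._graph_visitor exists. For an analysis that has no graph structure to follow yet (CFG recovery, for example), the first call always goes to self._analysis_core_baremetal().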

    def _analyze(self):
        """
        The main analysis routine.

        :return: None
        """

        self._pre_analysis()

        if self._graph_visitor is None:
            # There is no base graph that we can rely on. The analysis itself should generate successors for the
            # current job.
            # An example is the CFG recovery.

            self._analysis_core_baremetal()

        else:
            # We have a base graph to follow. Just handle the current job.

            self._analysis_core_graph()

        self._post_analysis()
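0x1 _pre_analysis

This function initializes the variables used during the analysis, takes the function start addresses in the symbol table as starting points for the analysis, and searches for functions by their prologues, storing the results in _function_prologue_addrs.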

    def _pre_analysis(self):
        # Initialize some CFG-related variables
        self._initialize_cfg()

        # Scan for __x86_return_thunk and friends
        self._known_thunks = self._find_thunks()
        """
        This presumably looks for a few special byte sequences (decoded here as x86-64 with pwntools):
>>> print(disasm(bytes.fromhex('E807000000F3900FAEE8EBF948890424C3'), arch='amd64'))
   0:   e8 07 00 00 00          call   0xc
   5:   f3 90                   pause
   7:   0f ae e8                lfence
   a:   eb f9                   jmp    0x5
   c:   48 89 04 24             mov    QWORD PTR [rsp],rax
  10:   c3                      ret
>>> print(disasm(bytes.fromhex('E807000000F3900FAEE8EBF9488D642408C3'), arch='amd64'))
   0:   e8 07 00 00 00          call   0xc
   5:   f3 90                   pause
   7:   0f ae e8                lfence
   a:   eb f9                   jmp    0x5
   c:   48 8d 64 24 08          lea    rsp,[rsp+0x8]
  11:   c3                      ret
        """

        # Initialize variables used during the analysis
        self._pending_jobs = PendingJobs(self.functions, self._deregister_analysis_job)
        self._traced_addresses = set()
        self._function_returns = defaultdict(set)

        # Not every function call is made with a call instruction, so record the exits of each
        # function and add the matching edges to the call graph when a call is needed
        self._function_exits = defaultdict(set)

        # Create an initial state
        self._initial_state = self.project.factory.blank_state(mode="fastpath")
        initial_options = self._initial_state.options - {o.TRACK_CONSTRAINTS} - o.refs
        initial_options |= {o.SUPER_FASTPATH, o.SYMBOL_FILL_UNCONSTRAINED_REGISTERS, o.SYMBOL_FILL_UNCONSTRAINED_MEMORY}
        # initial_options.remove(o.COW_STATES)
        self._initial_state.options = initial_options

        starting_points = set()

        # clear all existing functions
        self.kb.functions.clear()

        if self._use_symbols:
            starting_points |= self._function_addresses_from_symbols
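        # Function addresses taken from the symbol table; this sample has 190 functions.
        # (While debugging, the set also contained a 0, and I do not know why.)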

        if self._extra_function_starts:
            starting_points |= set(self._extra_function_starts)

        # Sort the function entry points (descending order)
        starting_points = sorted(list(starting_points), reverse=True)

        if self._start_at_entry and self.project.entry is not None and self._inside_regions(self.project.entry) and \
                self.project.entry not in starting_points:
            # make sure self.project.entry is inserted
            starting_points += [ self.project.entry ]

        # Create a CFGJob for each starting point
        for sp in starting_points:
            job = CFGJob(sp, sp, 'Ijk_Boring')
            self._insert_job(job)
            # register the job to function `sp`
            self._register_analysis_job(sp, job)

        self._updated_nonreturning_functions = set()

        # Search for functions by their prologues; this sample yields 217 of them
        if self._use_function_prologues and self.project.concrete_target is None:
            self._function_prologue_addrs = sorted(self._func_addrs_from_prologues())
            # make a copy of those prologue addresses, so that we can pop from the list
            self._remaining_function_prologue_addrs = self._function_prologue_addrs[::]

            # make function_prologue_addrs a set for faster lookups
            self._function_prologue_addrs = set(self._function_prologue_addrs)
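
The way the starting points are assembled above can be condensed into a few lines. This is a minimal sketch, assuming plain integers for addresses; `collect_starting_points` is a name invented for this example, and the `_inside_regions` check from the real code is left out.

```python
def collect_starting_points(symbol_addrs, extra_starts, entry):
    """Union the address sources, sort descending, then make sure the entry point is present."""
    points = set(symbol_addrs) | set(extra_starts)
    ordered = sorted(points, reverse=True)
    if entry is not None and entry not in ordered:
        # Same effect as `starting_points += [ self.project.entry ]` above.
        ordered.append(entry)
    return ordered


# Hypothetical addresses, for illustration only.
starts = collect_starting_points({0x401000, 0x401200}, [0x401100], entry=0x400F00)
print([hex(a) for a in starts])
```

Each address in the resulting list would then become one CFGJob, as in the loop over starting_points in the listing.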

0x2 _analysis_core_baremetal

This function takes the jobs that were just added to the queue and processes them one by one. It boils down to the following three steps:

    def _analysis_core_baremetal(self):

        if not self._job_info_queue:
            self._job_queue_empty()

        while not self.should_abort:

            if self._status_callback is not None:
                self._status_callback(self)

            # should_abort might be changed by the status callback function
            if self.should_abort:
                return

            if not self._job_info_queue:
                self._job_queue_empty()   # about 1/3 of the running time

            if not self._job_info_queue:
                # still no job available
                break

            job_info = self._job_info_queue[0]

            try:
                self._pre_job_handling(job_info.job)
            except AngrDelayJobNotice:
                # delay the handling of this job
                continue
            except AngrSkipJobNotice:
                # consume and skip this job
                self._job_info_queue = self._job_info_queue[1:]
                self._job_map.pop(self._job_key(job_info.job), None)
                continue

            # remove the job info from the map
            self._job_map.pop(self._job_key(job_info.job), None)

            self._job_info_queue = self._job_info_queue[1:]

            self._process_job_and_get_successors(job_info)

            # Short-cut for aborting the analysis
            if self.should_abort:
                break

            self._intra_analysis()

    def _job_queue_empty(self):

        if self._pending_jobs:
            # fastpath
            # look for a job that comes from a function that must return
            # if we can find one, just use it
            job = self._pop_pending_job(returning=True)
            if job is not None:
                self._insert_job(job)
                return

            self._clean_pending_exits()

        # did we finish analyzing any function?
        # fill in self._completed_functions
        self._make_completed_functions()

        # analyze function features, most importantly, whether each function returns or not
        self._analyze_all_function_features()

        # Clear _changed_functions set
        self._updated_nonreturning_functions = set()

        if self._pending_jobs:
            self._clean_pending_exits()

            job = self._pop_pending_job(returning=True)
            if job is not None:
                self._insert_job(job)
                return

            job = self._pop_pending_job(returning=False)
            if job is not None:
                self._insert_job(job)
                return

        # Try to see if there is any indirect jump left to be resolved
        if self._resolve_indirect_jumps and self._indirect_jumps_to_resolve:
            self._process_unresolved_indirect_jumps()

            if self._job_info_queue:
                return

        if self._use_function_prologues and self._remaining_function_prologue_addrs:
            while self._remaining_function_prologue_addrs:
                prolog_addr = self._remaining_function_prologue_addrs[0]
                self._remaining_function_prologue_addrs = self._remaining_function_prologue_addrs[1:]
                if self._seg_list.is_occupied(prolog_addr):
                    continue

                job = CFGJob(prolog_addr, prolog_addr, 'Ijk_Boring')
                self._insert_job(job)
                self._register_analysis_job(prolog_addr, job)
                return

        if self._force_complete_scan:
            addr = self._next_code_addr()
            if addr is None:
                l.debug("Force-scan jumping failed")
            else:
                l.debug("Force-scanning to %#x", addr)

            if addr is not None:
                job = CFGJob(addr, addr, "Ijk_Boring", last_addr=None, job_type=CFGJob.JOB_TYPE_COMPLETE_SCANNING)
                self._insert_job(job)
                self._register_analysis_job(addr, job)

    def _process_job_and_get_successors(self, job_info):
        """
        Process a job, get all successors of this job, and call _handle_successor() to handle each successor.

        :param JobInfo job_info: The JobInfo instance
        :return: None
        """

        job = job_info.job

        successors = self._get_successors(job)

        all_new_jobs = [ ]

        for successor in successors:
            new_jobs = self._handle_successor(job, successor, successors)
            # In CFGFast, _handle_successor simply returns all of the successors

            if new_jobs:
                all_new_jobs.extend(new_jobs)

                for new_job in new_jobs:
                    self._insert_job(new_job)

        self._post_job_handling(job, all_new_jobs, successors)
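
The interplay between the main loop, _job_queue_empty and _process_job_and_get_successors is a worklist algorithm. The sketch below shows that shape on a toy graph. It is an assumption-laden simplification: `recover_blocks`, the dictionary of successors and the fallback list (standing in for pending jobs and leftover prologue addresses) are all invented for this example.

```python
from collections import deque


def recover_blocks(successors_of, starting_points, fallback_addrs):
    """Drain a job queue, refilling it from fallback addresses whenever it runs dry."""
    queue = deque(starting_points)
    remaining = list(fallback_addrs)
    traced = []

    while True:
        if not queue:
            # Rough counterpart of _job_queue_empty(): look for more work.
            while remaining and not queue:
                candidate = remaining.pop(0)
                if candidate not in traced:
                    queue.append(candidate)
        if not queue:
            break  # still no job available, so the analysis is finished

        addr = queue.popleft()
        if addr in traced:
            continue  # already traced, nothing new to learn
        traced.append(addr)
        # Rough counterpart of _process_job_and_get_successors().
        for succ in successors_of.get(addr, []):
            queue.append(succ)
    return traced


# Hypothetical block graph: 0x80 is only reachable through the fallback list.
graph = {0x10: [0x20, 0x30], 0x20: [0x30], 0x30: [], 0x80: [0x90]}
print([hex(a) for a in recover_blocks(graph, [0x10], [0x20, 0x80])])
```

The fallback address 0x20 is skipped because it was already reached from 0x10, which mirrors the `_seg_list.is_occupied` check on prologue addresses in the listing.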

_get_successors is implemented in the subclass CFGFast. Its main job is to scan a basic block starting at the given address.

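0x3 _get_successors

This calls _scan_block, which wraps each successor basic block in a job object; the jobs are then added to the queue of work waiting to be analyzed: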

    def _get_successors(self, job):  # pylint:disable=arguments-differ

        # current_function_addr = job.func_addr
        # addr = job.addr

        # if current_function_addr != -1:
        #    l.debug("Tracing new exit %#x in function %#x", addr, current_function_addr)
        # else:
        #    l.debug("Tracing new exit %#x", addr)

        jobs = self._scan_block(job)

        # l.debug("... got %d jobs: %s", len(jobs), jobs)

        for job_ in jobs:  # type: CFGJob
            # register those jobs
            self._register_analysis_job(job_.func_addr, job_)

        return jobs
0x4 _scan_block

    def _scan_block(self, cfg_job):
        """
        Scan a basic block starting at a specific address

        :param CFGJob cfg_job: The CFGJob instance.
        :return: a list of successors
        :rtype: list
        """

        addr = cfg_job.addr
        current_func_addr = cfg_job.func_addr

        # Fix the function address
        # This is for rare cases where we cannot successfully determine the end boundary of a previous function, and
        # as a consequence, our analysis mistakenly thinks the previous function goes all the way across the boundary,
        # resulting the missing of the second function in function manager.
        if addr in self._function_addresses_from_symbols:
            current_func_addr = addr

        if self._addr_hooked_or_syscall(addr):
            entries = self._scan_procedure(cfg_job, current_func_addr)

        else:
            entries = self._scan_irsb(cfg_job, current_func_addr)

        return entries
0x5 _scan_irsb
    def _scan_irsb(self, cfg_job, current_func_addr):
        """
        Generate a list of successors (generating them each as entries) to IRSB.
        Updates previous CFG nodes with edges.

        :param CFGJob cfg_job: The CFGJob instance.
        :param int current_func_addr: Address of the current function
        :return: a list of successors
        :rtype: list
        """
        # Generate the CFGNode
        addr, function_addr, cfg_node, irsb = self._generate_cfgnode(cfg_job, current_func_addr)
    
        # Add the function-level edges that point to this node
        cfg_job.apply_function_edges(self, clear=True)

        # function_addr and current_function_addr can be different. e.g. when tracing an optimized tail-call that jumps
        # into another function that has been identified before.

        if cfg_node is None:
            # exceptions occurred, or we cannot get a CFGNode for other reasons
            return [ ]
        
        # Add the corresponding edge to the CFG
        self._graph_add_edge(cfg_node, cfg_job.src_node, cfg_job.jumpkind, cfg_job.src_ins_addr,
                             cfg_job.src_stmt_idx
                             )
        # Add the CFG node to the function it belongs to
        self._function_add_node(cfg_node, function_addr)

        if self.functions.get_by_addr(function_addr).returning is not True:
            self._updated_nonreturning_functions.add(function_addr)

        # If we have traced it before, don't trace it anymore
        real_addr = get_real_address_if_arm(self.project.arch, addr)
        if real_addr in self._traced_addresses:
            # the address has been traced before
            return [ ]
        else:
            # Mark the address as traced
            self._traced_addresses.add(real_addr)

        # irsb cannot be None here
        # assert irsb is not None

        # The IRSB is used only once per CFGNode, so release it here to save memory
        cfg_node.irsb = None
        # about 1/10 of the time spent in _scan_irsb
        self._process_block_arch_specific(addr, irsb, function_addr)

        # Scan the basic block to collect data references
        if self._collect_data_ref:
            self._collect_data_references(irsb, addr)
        # about 3/20 of the time spent in _scan_irsb
        # Get all possible successors
        irsb_next, jumpkind = irsb.next, irsb.jumpkind
        successors = [ ]

        last_ins_addr = None
        ins_addr = addr
        if irsb.statements:
            for i, stmt in enumerate(irsb.statements):
                if isinstance(stmt, pyvex.IRStmt.Exit):
                    successors.append((i,
                                       last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                                       stmt.dst,
                                       stmt.jumpkind
                                       )
                                      )
                elif isinstance(stmt, pyvex.IRStmt.IMark):
                    last_ins_addr = ins_addr
                    ins_addr = stmt.addr + stmt.delta
        else:
            for ins_addr, stmt_idx, exit_stmt in irsb.exit_statements:
                successors.append((
                    stmt_idx,
                    last_ins_addr if self.project.arch.branch_delay_slot else ins_addr,
                    exit_stmt.dst,
                    exit_stmt.jumpkind
                ))

        successors.append((DEFAULT_STATEMENT,
                           last_ins_addr if self.project.arch.branch_delay_slot else ins_addr, irsb_next, jumpkind)
                          )

        entries = [ ]

        # On ARM, the successors get some extra processing before they are returned
        successors = self._post_process_successors(addr, irsb.size, successors)

        # Process each successor; this part takes about 15/20 of the time
        for suc in successors:
            stmt_idx, ins_addr, target, jumpkind = suc

            entries += self._create_jobs(target, jumpkind, function_addr, irsb, addr, cfg_node, ins_addr,
                                         stmt_idx
                                         )

        return entries
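
The successor-collection loop in _scan_irsb can be illustrated without pyvex. The sketch below assumes a block is a list of tuples, with ("imark", addr) marking the start of an instruction and ("exit", target, jumpkind) marking a conditional exit. These tuples, the `DEFAULT_EXIT` marker and the function name are stand-ins invented for this example, and branch delay slots are ignored.

```python
DEFAULT_EXIT = "default"  # stand-in for the DEFAULT_STATEMENT marker used above


def collect_successors(block_addr, statements, default_target, default_jumpkind):
    """Return (stmt_idx, ins_addr, target, jumpkind) tuples for every exit of a block."""
    successors = []
    ins_addr = block_addr
    for idx, stmt in enumerate(statements):
        kind = stmt[0]
        if kind == "imark":
            ins_addr = stmt[1]  # a new instruction starts here
        elif kind == "exit":
            successors.append((idx, ins_addr, stmt[1], stmt[2]))
    # The default exit of the block (fall-through, final jump, or return) comes last.
    successors.append((DEFAULT_EXIT, ins_addr, default_target, default_jumpkind))
    return successors


# Hypothetical block: a compare at 0x1000 followed by a conditional jump at 0x1003.
block = [("imark", 0x1000), ("other",), ("imark", 0x1003), ("exit", 0x1020, "Ijk_Boring")]
for successor in collect_successors(0x1000, block, 0x1005, "Ijk_Boring"):
    print(successor)
```

Each tuple produced this way corresponds to one call to _create_jobs in the listing.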
0x6 _create_jobs

Given a node and some information about one of its successors, this function returns a list of CFGJobs. Its main flow is as follows:

    def _create_jobs(self, target, jumpkind, current_function_addr, irsb, addr, cfg_node, ins_addr, stmt_idx):
        """
        Given a node and details of a successor, makes a list of CFGJobs
        and if it is a call or exit marks it appropriately so in the CFG

        :param int target:          Destination of the resultant job
        :param str jumpkind:        The jumpkind of the edge going to this node
        :param int current_function_addr: Address of the current function
        :param pyvex.IRSB irsb:     IRSB of the predecessor node
        :param int addr:            The predecessor address
        :param CFGNode cfg_node:    The CFGNode of the predecessor node
        :param int ins_addr:        Address of the source instruction.
        :param int stmt_idx:        ID of the source statement.
        :return:                    a list of CFGJobs
        :rtype:                     list
        """

        if type(target) is pyvex.IRExpr.Const:  # pylint: disable=unidiomatic-typecheck
            target_addr = target.con.value
        elif type(target) in (pyvex.IRConst.U8, pyvex.IRConst.U16, pyvex.IRConst.U32, pyvex.IRConst.U64):  # pylint: disable=unidiomatic-typecheck
            target_addr = target.value
        elif type(target) is int:  # pylint: disable=unidiomatic-typecheck
            target_addr = target
        else:
            target_addr = None

        if target_addr in self._known_thunks and jumpkind == 'Ijk_Boring':
            thunk_kind = self._known_thunks[target_addr][0]
            if thunk_kind == 'ret':
                jumpkind = 'Ijk_Ret'
                target_addr = None
            elif thunk_kind == 'jmp':
                pass # ummmmmm not sure about this one
            else:
                raise AngrCFGError("This shouldn't be possible")

        jobs = [ ]
        is_syscall = jumpkind.startswith("Ijk_Sys")

        # Special handling:
        # If a call instruction has a target that points to the immediate next instruction, we treat it as a boring jump
        if jumpkind == "Ijk_Call" and \
                not self.project.arch.call_pushes_ret and \
                cfg_node.instruction_addrs and \
                ins_addr == cfg_node.instruction_addrs[-1] and \
                target_addr == irsb.addr + irsb.size:
            jumpkind = "Ijk_Boring"

        if target_addr is None:
            # The target address is not a concrete value

            if jumpkind == "Ijk_Ret":
                # This block ends with a return instruction.
                if current_function_addr != -1:
                    self._function_exits[current_function_addr].add(addr)
                    self._function_add_return_site(addr, current_function_addr)
                    self.functions[current_function_addr].returning = True
                    self._pending_jobs.add_returning_function(current_function_addr)

                cfg_node.has_return = True

            elif self._resolve_indirect_jumps and \
                    (jumpkind in ('Ijk_Boring', 'Ijk_Call', 'Ijk_InvalICache') or jumpkind.startswith('Ijk_Sys')):
                # This is an indirect jump. Try to resolve it.
                # FIXME: in some cases, a statementless irsb will be missing its instr addresses
                # and this next part will fail. Use the real IRSB instead
                irsb = cfg_node.block.vex
                cfg_node.instruction_addrs = irsb.instruction_addresses
                resolved, resolved_targets, ij = self._indirect_jump_encountered(addr, cfg_node, irsb,
                                                                                 current_function_addr, stmt_idx)
                if resolved:
                    for resolved_target in resolved_targets:
                        if jumpkind == 'Ijk_Call':
                            jobs += self._create_job_call(cfg_node.addr, irsb, cfg_node, stmt_idx, ins_addr,
                                                          current_function_addr, resolved_target, jumpkind)
                        else:
                            edge = FunctionTransitionEdge(cfg_node, resolved_target, current_function_addr,
                                                          to_outside=False, stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                          )
                            ce = CFGJob(resolved_target, current_function_addr, jumpkind,
                                        last_addr=resolved_target, src_node=cfg_node, src_stmt_idx=stmt_idx,
                                        src_ins_addr=ins_addr, func_edges=[ edge ],
                                        )
                            jobs.append(ce)
                    return jobs

                if jumpkind in ("Ijk_Boring", 'Ijk_InvalICache'):
                    resolved_as_plt = False

                    if irsb and self._heuristic_plt_resolving:
                        # Test it on the initial state. Does it jump to a valid location?
                        # It will be resolved only if this is a .plt entry
                        resolved_as_plt = self._resolve_plt(addr, irsb, ij)

                        if resolved_as_plt:
                            jump_target = next(iter(ij.resolved_targets))
                            target_func_addr = jump_target  # TODO: FIX THIS

                            edge = FunctionTransitionEdge(cfg_node, jump_target, current_function_addr,
                                                          to_outside=True, dst_func_addr=jump_target,
                                                          stmt_idx=stmt_idx, ins_addr=ins_addr,
                                                          )
                            ce = CFGJob(jump_target, target_func_addr, jumpkind, last_addr=jump_target,
                                        src_node=cfg_node, src_stmt_idx=stmt_idx, src_ins_addr=ins_addr,
                                        func_edges=[edge],
                                        )
                            jobs.append(ce)

                    if resolved_as_plt:
                        # has been resolved as a PLT entry. Remove it from indirect_jumps_to_resolve
                        if ij.addr in self._indirect_jumps_to_resolve:
                            self._indirect_jumps_to_resolve.remove(ij.addr)
                            self._deregister_analysis_job(current_function_addr, ij)
                    else:
                        # add it to indirect_jumps_to_resolve
                        self._indirect_jumps_to_resolve.add(ij)

                        # register it as a job for the current function
                        self._register_analysis_job(current_function_addr, ij)

                else:  # jumpkind == "Ijk_Call" or jumpkind.startswith('Ijk_Sys')
                    self._indirect_jumps_to_resolve.add(ij)
                    self._register_analysis_job(current_function_addr, ij)

                    jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr, None,
                                                  jumpkind, is_syscall=is_syscall
                                                  )

        elif target_addr is not None:
            # This is a direct jump with a concrete target.

            # pylint: disable=too-many-nested-blocks
            if jumpkind in ('Ijk_Boring', 'Ijk_InvalICache'):
                # if the target address is at another section, it has to be jumping to a new function
                if not self._addrs_belong_to_same_section(addr, target_addr):
                    target_func_addr = target_addr
                    to_outside = True
                else:
                    # it might be a jumpout
                    target_func_addr = None
                    real_target_addr = get_real_address_if_arm(self.project.arch, target_addr)
                    if real_target_addr in self._traced_addresses:
                        node = self.model.get_any_node(target_addr)
                        if node is not None:
                            target_func_addr = node.function_address
                    if target_func_addr is None:
                        target_func_addr = current_function_addr

                    to_outside = not target_func_addr == current_function_addr

                edge = FunctionTransitionEdge(cfg_node, target_addr, current_function_addr,
                                              to_outside=to_outside,
                                              dst_func_addr=target_func_addr,
                                              ins_addr=ins_addr,
                                              stmt_idx=stmt_idx,
                                              )

                ce = CFGJob(target_addr, target_func_addr, jumpkind, last_addr=addr, src_node=cfg_node,
                            src_ins_addr=ins_addr, src_stmt_idx=stmt_idx, func_edges=[ edge ])
                jobs.append(ce)

            elif jumpkind == 'Ijk_Call' or jumpkind.startswith("Ijk_Sys"):
                jobs += self._create_job_call(addr, irsb, cfg_node, stmt_idx, ins_addr, current_function_addr,
                                              target_addr, jumpkind, is_syscall=is_syscall
                                              )

            else:
                # TODO: Support more jumpkinds
                l.debug("Unsupported jumpkind %s", jumpkind)
                l.debug("Instruction address: %#x", ins_addr)

        return jobs
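
The branching in _create_jobs is easier to follow once it is reduced to its two inputs, the target address and the jumpkind. The sketch below only names the branch that a successor would fall into. It is a simplification under stated assumptions: the labels and the `same_section` flag are invented for this example, and thunk handling, PLT heuristics and the `_resolve_indirect_jumps` switch are left out.

```python
def classify_successor(target_addr, jumpkind, same_section=True):
    """Name the branch of the _create_jobs decision tree that a successor falls into."""
    if target_addr is None:
        # The target is not a concrete value.
        if jumpkind == "Ijk_Ret":
            return "return site"
        if jumpkind in ("Ijk_Boring", "Ijk_Call", "Ijk_InvalICache") or jumpkind.startswith("Ijk_Sys"):
            return "indirect jump to resolve"
        return "ignored"
    # From here on the target is a concrete address.
    if jumpkind in ("Ijk_Boring", "Ijk_InvalICache"):
        # A jump into another section has to be a jump to a new function.
        return "transition in same section" if same_section else "transition to new function"
    if jumpkind == "Ijk_Call" or jumpkind.startswith("Ijk_Sys"):
        return "call"
    return "unsupported jumpkind"


print(classify_successor(None, "Ijk_Ret"))
print(classify_successor(0x401000, "Ijk_Boring", same_section=False))
print(classify_successor(0x401000, "Ijk_Call"))
```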
0x7 Time consumption analysis
3. Resolving indirect calls
0x1 process_unresolved_indirect_jumps
    def _process_unresolved_indirect_jumps(self):
        """
        Resolve all unresolved indirect jumps found in previous scanning.

        Currently we support resolving the following types of indirect jumps:
        - Ijk_Call: indirect calls where the function address is passed in from a proceeding basic block
        - Ijk_Boring: jump tables
        - For an up-to-date list, see analyses/cfg/indirect_jump_resolvers

        :return:    A set of concrete indirect jump targets (ints).
        :rtype:     set
        """

        l.info("%d indirect jumps to resolve.", len(self._indirect_jumps_to_resolve))

        all_targets = set()
        for idx, jump in enumerate(self._indirect_jumps_to_resolve):  # type:int,IndirectJump
            if self._low_priority:
                self._release_gil(idx, 20, 0.0001)
            all_targets |= self._process_one_indirect_jump(jump)

        self._indirect_jumps_to_resolve.clear()

        return all_targets
0x2 process_one_indirect_jump

Each resolver in self.indirect_jump_resolvers, such as angr.analyses.cfg.indirect_jump_resolvers.jumptable.JumpTableResolver, is tried in turn to resolve the indirect jump.

def _process_one_indirect_jump(self, jump):
    """
    Resolve a given indirect jump.

    :param IndirectJump jump:  The IndirectJump instance.
    :return:        A set of resolved indirect jump targets (ints).
    """

    resolved = False
    resolved_by = None
    targets = None

    block = self._lift(jump.addr, opt_level=1)

    for resolver in self.indirect_jump_resolvers:
        resolver.base_state = self._base_state

        if not resolver.filter(self, jump.addr, jump.func_addr, block, jump.jumpkind):
            continue

        resolved, targets = resolver.resolve(self, jump.addr, jump.func_addr, block, jump.jumpkind)
        if resolved:
            resolved_by = resolver
            break

    if resolved:
        self._indirect_jump_resolved(jump, jump.addr, resolved_by, targets)
    else:
        self._indirect_jump_unresolved(jump)

    return set() if targets is None else set(targets)
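
The filter-then-resolve loop above is a chain of resolvers in which the first success wins. The sketch below reproduces only that control flow. Both resolver classes are hypothetical toys with a reduced two-argument interface; they are not angr resolvers and do not share their signatures.

```python
class EvenTargetResolver:
    """Hypothetical resolver that only handles 'Ijk_Boring' jumps at even addresses."""

    def filter(self, addr, jumpkind):
        return jumpkind == "Ijk_Boring" and addr % 2 == 0

    def resolve(self, addr, jumpkind):
        return True, [addr + 0x10, addr + 0x20]


class NeverResolver:
    """Hypothetical resolver that accepts every jump but never finds a target."""

    def filter(self, addr, jumpkind):
        return True

    def resolve(self, addr, jumpkind):
        return False, []


def resolve_with_chain(resolvers, addr, jumpkind):
    """Try each resolver in order; the first one that succeeds decides the targets."""
    for resolver in resolvers:
        if not resolver.filter(addr, jumpkind):
            continue  # this resolver does not apply to the jump
        resolved, targets = resolver.resolve(addr, jumpkind)
        if resolved:
            return type(resolver).__name__, set(targets)
    return None, set()


chain = [NeverResolver(), EvenTargetResolver()]
print(resolve_with_chain(chain, 0x400, "Ijk_Boring"))
print(resolve_with_chain(chain, 0x401, "Ijk_Boring"))
```

The order of the list matters: a resolver placed earlier gets the first chance to claim a jump, so cheap and precise resolvers are best placed before expensive ones.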
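0x3 indirect_jump_encountered

Called when an indirect jump is encountered. It first tries to resolve the jump with the timeless (fast) indirect jump resolvers. If that fails, it checks whether this indirect jump has already been resolved earlier.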

    def _indirect_jump_encountered(self, addr, cfg_node, irsb, func_addr, stmt_idx=DEFAULT_STATEMENT):
        """
        Called when we encounter an indirect jump. We will try to resolve this indirect jump using timeless (fast)
        indirect jump resolvers. If it cannot be resolved, we will see if this indirect jump has been resolved before.

        :param int addr:                Address of the block containing the indirect jump.
        :param cfg_node:                The CFGNode instance of the block that contains the indirect jump.
        :param pyvex.IRSB irsb:         The IRSB instance of the block that contains the indirect jump.
        :param int func_addr:           Address of the current function.
        :param int or str stmt_idx:     ID of the source statement.

        :return:    A 3-tuple of (whether it is resolved or not, all resolved targets, an IndirectJump object
                    if there is one or None otherwise)
        :rtype:     tuple
        """

        jumpkind = irsb.jumpkind
        l.debug('IRSB %#x has an indirect jump (%s) as its default exit.', addr, jumpkind)

        # try resolving it fast
        resolved, resolved_targets = self._resolve_indirect_jump_timelessly(addr, irsb, func_addr, jumpkind)
        if resolved:
            l.debug("Indirect jump at block %#x is resolved by a timeless indirect jump resolver. "
                    "%d targets found.", addr, len(resolved_targets))
            return True, resolved_targets, None

        l.debug("Indirect jump at block %#x cannot be resolved by a timeless indirect jump resolver.", addr)

        # Add it to our set. Will process it later if user allows.
        # Create an IndirectJump instance
        if addr not in self.indirect_jumps:
            if self.project.arch.branch_delay_slot:
                ins_addr = cfg_node.instruction_addrs[-2]
            else:
                ins_addr = cfg_node.instruction_addrs[-1]
            ij = IndirectJump(addr, ins_addr, func_addr, jumpkind, stmt_idx, resolved_targets=[])
            self.indirect_jumps[addr] = ij
            resolved = False
        else:
            ij = self.indirect_jumps[addr]  # type: IndirectJump
            resolved = len(ij.resolved_targets) > 0

        return resolved, ij.resolved_targets, ij
0x4 resolve_indirect_jump_timelessly

This calls the timeless resolvers, such as angr.analyses.cfg.indirect_jump_resolvers.mips_elf_fast.MipsElfFastResolver, to resolve the indirect jump.

    def _resolve_indirect_jump_timelessly(self, addr, block, func_addr, jumpkind):
        """
        Checks if MIPS32 and calls MIPS32 check, otherwise false

        :param int addr: irsb address
        :param pyvex.IRSB block: irsb
        :param int func_addr: Function address
        :return: If it was resolved and targets alongside it
        :rtype: tuple
        """

        if block.statements is None:
            block = self.project.factory.block(block.addr, size=block.size).vex

        for res in self.timeless_indirect_jump_resolvers:
            if res.filter(self, addr, func_addr, block, jumpkind):
                r, resolved_targets = res.resolve(self, addr, func_addr, block, jumpkind)
                if r:
                    return True, resolved_targets
        return False, [ ]