EDNS Client Subnet(ECS)探测

2021-06-07  本文已影响0人  贰爷

使用python进行EDNS Client Subnet(ECS)解析

  1. ECS是什么?
  2. python 解析域名
  3. 使用python进行edns解析
  4. ECS是什么?
    EDNS的一个字段,网上资料很多,不详细解释
  5. python 解析域名
    使用dnspython中的resolver进行解析域名即可,这里不再赘述
    下面是解析一个域名A记录的例子
import dns.resolver
# Resolve the A record of www.qq.com and print the address of every record
# found in the answer section of the response.
dns_ret = dns.resolver.query("www.qq.com", 'A')
for i in dns_ret.response.answer:
    for j in i.items:
        # print() is called as a function: the Python 2 print statement is a
        # SyntaxError on Python 3, which the rest of this file requires.
        # NOTE(review): `.address` presumably exists only on address records;
        # confirm behavior if the answer section also carries other rrsets.
        print(j.address)
  1. 使用python进行edns解析
    python中的dnspython模块已经支持了edns解析,但是dns.resolver.query并没有对edns做很好的支持,这里通过继承dns.resolver.Resolver来重写query函数,来支持ecs解析,源码如下:
import time
import socket
import random
import dns
from dns.resolver import string_types, NoMetaqueries, NoAnswer, NoNameservers, YXDOMAIN, NXDOMAIN, Answer, Resolver
from dns.edns import ECSOption

class EdnsResolver(Resolver):
    """Resolver subclass whose ``query`` accepts EDNS options per call.

    ``query`` is re-implemented so that a list of EDNS options (for example an
    ECS option) and an explicit list of nameservers can be supplied for a
    single lookup. ``ecs_query`` is a thin convenience wrapper for ECS lookups.
    """

    def __init__(self):
        super().__init__()
        # Set the inherited ``edns`` attribute to 0. Note that query() below
        # does not read it; it always calls request.use_edns() directly.
        self.edns = 0

    def query(self, qname, rdtype=dns.rdatatype.A, rdclass=dns.rdataclass.IN,
              tcp=False, source=None, raise_on_no_answer=True, source_port=0,
              lifetime=None, edns_option=None, name_servers=None):
        """Query nameservers for a record, optionally attaching EDNS options.

        :param qname: name to resolve; a string is converted with
            ``dns.name.from_text``
        :param rdtype: record type; a string is converted with
            ``dns.rdatatype.from_text``
        :param rdclass: record class; a string is converted with
            ``dns.rdataclass.from_text``
        :param tcp: send the query over TCP instead of UDP
        :param source: source address passed to ``dns.query.udp``/``tcp``
        :param raise_on_no_answer: passed to ``Answer``; also makes a cached
            answer without an rrset raise ``NoAnswer``
        :param source_port: source port passed to ``dns.query.udp``/``tcp``
        :param lifetime: passed to ``self._compute_timeout`` for every attempt
        :param edns_option: list of EDNS options (see ``dns.edns.Option``)
            handed to ``request.use_edns(options=...)``
        :param name_servers: nameservers to use for this call instead of
            ``self.nameservers``; the caller's list is never modified
        :return: an ``Answer`` built from the first acceptable response
        :raises NoMetaqueries: if rdtype or rdclass is a meta type/class
        :raises NoAnswer: for a cached answer with no rrset when
            raise_on_no_answer is true
        :raises NoNameservers: when no usable nameserver is left
        :raises YXDOMAIN: when a server answers with rcode YXDOMAIN
        :raises NXDOMAIN: when every candidate name got rcode NXDOMAIN
        """
        if isinstance(qname, string_types):
            qname = dns.name.from_text(qname, None)
        if isinstance(rdtype, string_types):
            rdtype = dns.rdatatype.from_text(rdtype)
        if dns.rdatatype.is_metatype(rdtype):
            raise NoMetaqueries
        if isinstance(rdclass, string_types):
            rdclass = dns.rdataclass.from_text(rdclass)
        if dns.rdataclass.is_metaclass(rdclass):
            raise NoMetaqueries

        # Build the list of fully-qualified candidate names to try in order.
        qnames_to_try = []
        if qname.is_absolute():
            qnames_to_try.append(qname)
        else:
            if len(qname) > 1:
                qnames_to_try.append(qname.concatenate(dns.name.root))
            if self.search:
                for suffix in self.search:
                    qnames_to_try.append(qname.concatenate(suffix))
            else:
                qnames_to_try.append(qname.concatenate(self.domain))

        # The cache key is only (name, type, class). A lookup that carries
        # per-call EDNS options or explicit nameservers may legitimately get a
        # different answer for the same key, so such lookups bypass the cache
        # entirely (neither read from it nor written to it).
        use_cache = bool(self.cache) and not edns_option and not name_servers

        all_nxdomain = True
        nxdomain_responses = {}
        start = time.time()
        _qname = None  # make pylint happy
        for _qname in qnames_to_try:
            if use_cache:
                answer = self.cache.get((_qname, rdtype, rdclass))
                if answer is not None:
                    if answer.rrset is None and raise_on_no_answer:
                        raise NoAnswer(response=answer.response)
                    return answer
            request = dns.message.make_query(_qname, rdtype, rdclass)
            if self.keyname is not None:
                request.use_tsig(self.keyring, self.keyname,
                                 algorithm=self.keyalgorithm)
            request.use_edns(options=edns_option)
            if self.flags is not None:
                request.flags = self.flags
            response = None
            #
            # Always work on a copy of the server list: it is shuffled and
            # pruned below, and neither the caller's ``name_servers`` nor
            # ``self.nameservers`` may be altered by a lookup.
            #
            if name_servers:
                nameservers = list(name_servers)
            else:
                nameservers = self.nameservers[:]
            errors = []
            if self.rotate:
                random.shuffle(nameservers)
            backoff = 0.10
            while response is None:
                if not nameservers:
                    raise NoNameservers(request=request, errors=errors)
                for nameserver in nameservers[:]:
                    timeout = self._compute_timeout(start, lifetime)
                    port = self.nameserver_ports.get(nameserver, self.port)
                    try:
                        tcp_attempt = tcp
                        if tcp:
                            response = dns.query.tcp(request, nameserver,
                                                     timeout, port,
                                                     source=source,
                                                     source_port=source_port)
                        else:
                            response = dns.query.udp(request, nameserver,
                                                     timeout, port,
                                                     source=source,
                                                     source_port=source_port)
                            if response.flags & dns.flags.TC:
                                # Response truncated; retry with TCP.
                                tcp_attempt = True
                                timeout = self._compute_timeout(start, lifetime)
                                response = \
                                    dns.query.tcp(request, nameserver,
                                                  timeout, port,
                                                  source=source,
                                                  source_port=source_port)
                    except (socket.error, dns.exception.Timeout,
                            dns.query.UnexpectedSource) as ex:
                        #
                        # Communication failure, timeout, or a reply from an
                        # unexpected source.  Keep the server in the list and
                        # go to the next one.
                        #
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        response = None
                        continue
                    except (dns.exception.FormError, EOFError) as ex:
                        #
                        # Either we don't understand what this server is
                        # saying, or we're using TCP and it hung up on us.
                        # Take it out of the mix and continue.
                        #
                        nameservers.remove(nameserver)
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        response = None
                        continue
                    rcode = response.rcode()
                    if rcode == dns.rcode.YXDOMAIN:
                        ex = YXDOMAIN()
                        errors.append((nameserver, tcp_attempt, port, ex,
                                       response))
                        raise ex
                    if rcode in (dns.rcode.NOERROR, dns.rcode.NXDOMAIN):
                        break
                    #
                    # We got a response, but we're not happy with the
                    # rcode in it.  Remove the server from the mix unless
                    # the rcode is SERVFAIL and SERVFAIL retries are enabled.
                    #
                    if rcode != dns.rcode.SERVFAIL or not self.retry_servfail:
                        nameservers.remove(nameserver)
                    errors.append((nameserver, tcp_attempt, port,
                                   dns.rcode.to_text(rcode), response))
                    response = None
                if response is not None:
                    break
                #
                # All nameservers failed!
                #
                if nameservers:
                    #
                    # But we still have servers to try.  Sleep a bit
                    # so we don't pound them!
                    #
                    timeout = self._compute_timeout(start, lifetime)
                    sleep_time = min(timeout, backoff)
                    backoff *= 2
                    time.sleep(sleep_time)
            if response.rcode() == dns.rcode.NXDOMAIN:
                # Remember the negative response and try the next candidate.
                nxdomain_responses[_qname] = response
                continue
            all_nxdomain = False
            break
        if all_nxdomain:
            raise NXDOMAIN(qnames=qnames_to_try, responses=nxdomain_responses)
        answer = Answer(_qname, rdtype, rdclass, response,
                        raise_on_no_answer)
        if use_cache:
            self.cache.put((_qname, rdtype, rdclass), answer)
        return answer

    def ecs_query(self, qname, address, name_servers=None, srclen=32, rdtype=dns.rdatatype.A):
        """Run ``query`` with a single ECS option built from ``address``.

        :param qname: domain name to resolve
        :param address: client address placed in the ECS option
        :param name_servers: nameservers to use for this lookup
        :param srclen: source prefix length of the ECS address
        :param rdtype: record type to query
        :return: same as ``query``
        """
        # The prefix length defaults to 32 here. Per the original author's
        # note, the library default is 24 and some DNS servers do not resolve
        # /24 subnets.
        ecs = ECSOption(address, srclen)
        return self.query(qname, rdtype, edns_option=[ecs], name_servers=name_servers)

使用示例

if __name__ == '__main__':
    # x.x.x.x stands for the client IP under test; the domain is resolved
    # according to the region that client IP belongs to.
    ecs_resolver = EdnsResolver()
    result = ecs_resolver.ecs_query('www.qq.com', "x.x.x.x", name_servers=['8.8.8.8'])
    # Print every record of every rrset in the answer section.
    for rrset in result.response.answer:
        for record in rrset.items:
            print(record)
上一篇下一篇

猜你喜欢

热点阅读