学习并使用aiodns扫描数千万域名

aiodns

aiodns是一个基于asynciopycares编写的DNS解析模块,得力于asyncio模块,使它能够实现简单的DNS异步解析方法。

例子

import asyncio
import aiodns

loop = asyncio.get_event_loop()
resolver = aiodns.DNSResolver(loop=loop)

async def query(name, query_type):
    return await resolver.query(name, query_type)

coro = query('www.baidu.com', 'A')
result = loop.run_until_complete(coro)
print(result)

API

这里列出4个简单的API,他们都是DNSResolver类中的方法,更多的接口需要参考pycares文档

  • query(host, type):  通过传入的hostname和type进行DNS查询,它返回一个asyncio.Future实例,DNS解析结果由pycares模块直接返回。
  • gethostbyname(host, socket_family):  通过传入的hostname和socket_family来进行DNS查询。与query()直接解析DNS不同,gethostbyname在进行DNS解析前会扫描本地的hosts文件,因此可以解析到如localhost的本地主机名。
  • gethostbyaddr(name): 通过传入的IP地址反向查询域名。
  • cancel(): 取消所有所有待处理状态的DNS查询,被取消的Future将获得DNSError异常集合。

一个简单的需求

每天对上千万个域名进行DNS查询,获取域名的解析状态并在数据库中添加标签 resolved ,已解析的为 true,未解析的为false。

代码实现

import time
import asyncio
import aiodns

class AsyncQuery(object):
    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.resolver = aiodns.DNSResolver(timeout=0.1, loop=self.loop)
        self.queue = asyncio.Queue()
        self.resolved = []
        self.unresolved = []

        # generate domain and put into queue
        for _ in range(10000):
            self.queue.put_nowait("baidu")

    async def query(self):
        while True:
            name = await self.queue.get()
            domain = f"{name}.com"
            try:
                # result is a list.
                result = await self.resolver.query(domain, 'A')

                print(f"{domain} resolver is {result[0].host}")
                # self.resolved.append(name)  # Add domain into the list

            except aiodns.error.DNSError:
                print(f"{domain} is not resolved IP.")
                # self.unresolved.append(name)
            finally:
                self.queue.task_done()

    async def main(self):
        tasks = [asyncio.create_task(self.query()) for _ in range(50)]

        # Wait until the queue is fully processed.
        started_at = time.monotonic()
        await self.queue.join()
        run_time = time.monotonic() - started_at

        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

        print(f'[*] query run time: {run_time:.2f} seconds')

    def run(self):
        try:
            self.loop.run_until_complete(self.main())
        except KeyboardInterrupt:
            print("Interrupt.")

query = AsyncQuery()
query.run()

代码分析

构造函数中首先从asyncio中获取一个EventLoop对象,再依次创建了
DNSResolver实例化对象、queue队列,最后通过for循环向队列中插入指定数量的域名。

query函数可以看作是一个消费者(worker),它通过while True无限循环的从队列(queue)中取任务并对域名进行解析和处理,最后的queue.task_done()是为了配合queue.join()进行阻塞调度的,当协程中的任务未完成时,join将阻塞下一个消费者的queue.get()请求,直到这个协程task_done为止。

main函数的主要功能是创建协程(消费者),用for _ in range(50)创建指定数量的协程,并在后面通过queue.join()阻塞队列,并使用time模块对阻塞的时间进行计算,得出本次扫描的时间。

run函数使用run_until_complete启动协程函数main.

测试

在公司网络和Vultr VPS中测试后,发现Vultr的协程数量可以设置到非常高,随着协程数量的提高,扫描速度也随之变得可观起来。

当域名数量很多的时候,需要删除协程中print(), 毕竟print()也是I/O操作,删除后速度美滋滋。

备注:像这样需求和简单的数据库操作,可以用MongoDB做数据库,批量写入和批量更新标签非常舒服,给集合加索引后更新标签更爽。: )

发表评论

电子邮件地址不会被公开。 必填项已用*标注