Python爬虫爬虫一起学python

爬虫代理小记与aiohttp代理尝试

2017-04-06  本文已影响2613人  treelake

总结了一些爬虫代理的资料和知识,并尝试用asyncio和aiohttp通过代理IP访问目标网站,根据每个代理IP的访问效果实时更新其得分。初始获取约3000个代理IP,运行稳定后,对摩拜单车信息的访问速度可以达到40~100次/秒。

代理IP方案简述

尝试

    # Request the proxy list from m.66ip.cn; `tqsl` appears to be the number
    # of proxies to return (TODO confirm against the site).
    url = ("http://m.66ip.cn/mo.php?tqsl={proxy_number}")
    url = url.format(proxy_number=10000)
    html = requests.get(url, headers=headers).content
    # Decode the raw bytes using chardet's best-guess encoding.
    html = html.decode(chardet.detect(html)['encoding'])
    # Extract every "a.b.c.d:port" token from the page text.
    pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}'
    all_ip = re.findall(pattern, html)
class Proxy:
    """A proxy endpoint plus a mutable quality score.

    Every proxy starts at 100 points. Successful requests add points (more
    for faster responses) and failures subtract points, so the score tracks
    how useful the proxy has been so far.
    """

    def __init__(self, ip):
        # `ip` is an "address:port" string such as "1.2.3.4:8080".
        self._url = 'http://' + ip
        self._score = 100

    @property
    def url(self):
        """Proxy URL, as passed to aiohttp's ``proxy=`` argument."""
        return self._url

    @property
    def score(self):
        """Current quality score (higher is better)."""
        return self._score

    def __lt__(self, other):
        """Order by *descending* score.

        A priority queue always pops the smallest item, but here a higher
        score means a better proxy, so the comparison is deliberately
        reversed to make the best proxy come out first.
        """
        return self._score > other._score

    def __repr__(self):
        return '<Proxy %s score=%s>' % (self._url, self._score)

    def success(self, time):
        """Reward a successful request that took `time` seconds.

        Adds ``10 // (whole seconds + 1)`` points: 10 for under 1 s, 5 for
        1-2 s, 3 for 2-3 s, ..., 0 from 10 s on.
        """
        # Clamp at 0: time.time() is not monotonic, and a negative elapsed
        # value used to make int(time + 1) zero (ZeroDivisionError) or
        # negative (turning a reward into a penalty).
        elapsed = max(0.0, time)
        self._score += 10 // (int(elapsed) + 1)

    def timeoutError(self):
        """Penalise a request that timed out."""
        self._score -= 10

    def connectError(self):
        """Penalise a connection failure."""
        self._score -= 30

    def otherError(self):
        """Penalise any other failure (bad status, unexpected error)."""
        self._score -= 50
async def douban(proxy, session):
    """Send one request to the target API through `proxy` and score the result.

    HTTP 200 rewards the proxy via ``proxy.success(elapsed)``. Any other
    status, a timeout, a connection error or any other exception is
    penalised via ``otherError`` / ``timeoutError`` / ``connectError`` /
    ``otherError`` respectively. Every branch logs the *updated* score.
    Errors are swallowed so one bad proxy never stops the caller's loop, but
    task cancellation is re-raised.
    """
    try:
        start = time.time()
        async with session.post(mobike_url,
                                data=data,
                                proxy=proxy.url,
                                headers=headers,  # module-level headers
                                timeout=10) as resp:
            elapsed = time.time() - start
            if resp.status == 200:
                proxy.success(elapsed)
                print('%6.3d' % proxy.score, 'Used time-->', elapsed, 's')
            else:
                proxy.otherError()
                print('*****', resp.status, '*****')
    except asyncio.CancelledError:
        # On Python < 3.8 CancelledError subclasses Exception. Without this
        # clause the catch-all below would swallow task.cancel(), and the
        # genDouban worker loop would never stop.
        raise
    except TimeoutError:
        proxy.timeoutError()
        print('%6.3d' % proxy.score, 'timeoutError')
    except ClientConnectionError:
        proxy.connectError()
        print('%6.3d' % proxy.score, 'connectError')
    except Exception as e:
        proxy.otherError()
        print('%6.3d' % proxy.score, 'otherError->', e)
# ClientHttpProxyError

# TCPConnector维持连接池,限制并行连接的总量;当池满时,需等有请求退出后才能加入新请求。limit取500和100时效果相差不大
# ClientSession通过TCPConnector构造连接,多个请求可以共用同一个Session
# Semaphore限制同时发起连接的请求数量;Semaphore充足时,总耗时与timeout差不多


async def initDouban():
    """Probe every proxy in the module-level `proxies` list once, concurrently.

    Each probe is a douban() call, which updates that proxy's score.
    firstFilter() runs this for its initial screening rounds.
    """
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=100,  # keep the pool small on Windows
                                use_dns_cache=True)
    # The session owns `conn` (aiohttp's default connector_owner=True) and
    # closes it when the `async with` exits, so the former trailing
    # conn.close() was a redundant second close.
    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        await asyncio.gather(*(douban(p, session) for p in proxies))


def firstFilter():
    """Screen all proxies and return the good ones in a PriorityQueue.

    Runs initDouban() twice so every proxy gets scored and prints how long
    each round took. Then it keeps only proxies whose score is above 50 and
    reports how many survived.
    """
    for round_no in range(2):
        began = time.time()
        loop.run_until_complete(asyncio.ensure_future(initDouban()))
        elapsed = time.time() - began
        print('----- init time %s-----\n' % round_no, elapsed, 's')

    survivors = [p for p in proxies if p._score > 50]
    queue = PriorityQueue()
    for p in survivors:
        queue.put_nowait(p)
    print('原始ip数:%s' % len(all_ip), '; 筛选后:%s' % len(survivors))
    return queue
# Run firstFilter()'s two screening rounds at import time and keep the
# surviving proxies in the priority queue that genDouban() draws from.
pq = firstFilter()


async def genDouban(sem, session):
    """Worker loop: repeatedly exercise the best proxy and re-queue it.

    Each iteration holds one `sem` slot. It takes the highest-scoring proxy
    from the global priority queue `pq`, sends one request through it with
    douban() (which updates its score), then puts it back so it is re-ranked.
    Loops forever; stop it by cancelling the task.
    """
    while True:
        await sem.acquire()
        try:
            best = await pq.get()
            await douban(best, session)
            await pq.put(best)
        finally:
            sem.release()


async def dynamicRunDouban(concurrency):
    """Run `concurrency` genDouban workers until cancelled or interrupted.

    The TCPConnector keeps a connection pool capped at `concurrency`. When
    the pool is full, new requests wait for a connection to be released.
    The ClientSession builds connections through the connector and is shared
    by all workers. The Semaphore caps how many workers may be issuing a
    request at the same time.
    """
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=concurrency,
                                use_dns_cache=True)
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        tasks = [asyncio.ensure_future(genDouban(sem, session))
                 for _ in range(concurrency)]
        try:
            await asyncio.gather(*tasks)
        except KeyboardInterrupt:
            print('-----finishing-----\n')
        finally:
            # However the run ends (cancellation, an unexpected worker error
            # or Ctrl-C), stop every worker before the shared session closes.
            # Otherwise survivors keep running against a closed session.
            # The session owns `conn` and closes it on exit, so no explicit
            # conn.close() is needed.
            for task in tasks:
                task.cancel()


# The workers loop forever, so Ctrl-C is the way to stop the run.
future = asyncio.ensure_future(dynamicRunDouban(200))
try:
    loop.run_until_complete(future)
except KeyboardInterrupt:
    # Ctrl-C usually surfaces here, out of run_until_complete, rather than
    # inside the coroutine. So handle it at this level: cancel the run and
    # let the cancellation unwind so the aiohttp session closes cleanly.
    # Pressing Ctrl-C again aborts immediately.
    print('-----finishing-----\n')
    future.cancel()
    loop.run_until_complete(asyncio.gather(future, return_exceptions=True))

# Scores only (not addresses), best first.
scores = [p.score for p in proxies]
scores.sort(reverse=True)
print('Most popular IPs:\n ------------\n', scores[:50],
      [i for i in scores if i > 100])
# Release the event loop. The former `loop.is_closed()` merely queried the
# state and discarded the result.
loop.close()

其它方案概览:

其他资料

代码

from selenium import webdriver
import time
import aiohttp
from aiohttp.client_exceptions import ClientConnectionError
from aiohttp.client_exceptions import TimeoutError
import asyncio
from asyncio.queues import PriorityQueue
import chardet
import re
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Suppress urllib3's InsecureRequestWarning (emitted for HTTPS requests made
# without certificate verification).
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Browser-like User-Agent, used by getProxies() when fetching the proxy list.
# NOTE: `headers` is rebound to a referer-only dict further down this file.
headers = {'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) ')}
# Single event loop shared by the coroutines and run_until_complete calls.
loop = asyncio.get_event_loop()


class Proxy:
    """A proxy endpoint plus a mutable quality score.

    Every proxy starts at 100 points. Successful requests add points (more
    for faster responses) and failures subtract points, so the score tracks
    how useful the proxy has been so far.
    """

    def __init__(self, ip):
        # `ip` is an "address:port" string such as "1.2.3.4:8080".
        self._url = 'http://' + ip
        self._score = 100

    @property
    def url(self):
        """Proxy URL, as passed to aiohttp's ``proxy=`` argument."""
        return self._url

    @property
    def score(self):
        """Current quality score (higher is better)."""
        return self._score

    def __lt__(self, other):
        """Order by *descending* score.

        A priority queue always pops the smallest item, but here a higher
        score means a better proxy, so the comparison is deliberately
        reversed to make the best proxy come out first.
        """
        return self._score > other._score

    def __repr__(self):
        return '<Proxy %s score=%s>' % (self._url, self._score)

    def success(self, time):
        """Reward a successful request that took `time` seconds.

        Adds ``10 // (whole seconds + 1)`` points: 10 for under 1 s, 5 for
        1-2 s, 3 for 2-3 s, ..., 0 from 10 s on.
        """
        # Clamp at 0: time.time() is not monotonic, and a negative elapsed
        # value used to make int(time + 1) zero (ZeroDivisionError) or
        # negative (turning a reward into a penalty).
        elapsed = max(0.0, time)
        self._score += 10 // (int(elapsed) + 1)

    def timeoutError(self):
        """Penalise a request that timed out."""
        self._score -= 10

    def connectError(self):
        """Penalise a connection failure."""
        self._score -= 30

    def otherError(self):
        """Penalise any other failure (bad status, unexpected error)."""
        self._score -= 50


def getProxies(proxy_number=10000,
               phantomjs_path=r'D:/phantomjs/bin/phantomjs.exe'):
    """Fetch "ip:port" proxy strings from m.66ip.cn.

    The page is first fetched with requests. If no "ip:port" pairs are
    found, it is loaded again in PhantomJS so the page's JavaScript can run
    before the HTML is searched. The final HTML is saved to a timestamped
    ``66ip_<time>`` file.

    Args:
        proxy_number: value sent as the ``tqsl`` query parameter (presumably
            the number of proxies requested -- TODO confirm).
        phantomjs_path: path to the PhantomJS executable used as a fallback.

    Returns:
        The list of "a.b.c.d:port" strings matched in the page.
    """
    url = "http://m.66ip.cn/mo.php?tqsl={proxy_number}".format(
        proxy_number=proxy_number)
    pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}'

    # Without a timeout, requests can block forever on a stalled server.
    raw = requests.get(url, headers=headers, timeout=30).content
    # chardet reports encoding None when it cannot guess (e.g. empty body).
    # Fall back to UTF-8 and replace undecodable bytes instead of crashing.
    encoding = chardet.detect(raw)['encoding'] or 'utf-8'
    html = raw.decode(encoding, errors='replace')
    all_ip = re.findall(pattern, html)

    if not all_ip:
        driver = webdriver.PhantomJS(executable_path=phantomjs_path)
        try:
            driver.get(url)
            time.sleep(12)  # give the page's JavaScript time to run
            html = driver.page_source
        finally:
            # Always shut PhantomJS down, even if loading the page fails.
            driver.quit()
        all_ip = re.findall(pattern, html)

    with open('66ip_' + str(time.time()), 'w', encoding='utf-8') as f:
        f.write(html)
    return all_ip


# Query the proxy site twice and merge both batches; the set drops repeats.
all_ip = set(getProxies()).union(getProxies())
proxies = list(map(Proxy, all_ip))

# Target endpoint for douban() and its form payload
# (request parameters: latitude, longitude).
mobike_url = "https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do"
data = dict(latitude='33.2', longitude='113.4')
# NOTE: this rebinds `headers`, dropping the User-Agent dict defined above.
# Code that reads `headers` from here on (e.g. douban's POST) sends only
# the referer.
headers = dict(referer="https://servicewechat.com/")


async def douban(proxy, session):
    """Send one request to the target API through `proxy` and score the result.

    HTTP 200 rewards the proxy via ``proxy.success(elapsed)``. Any other
    status, a timeout, a connection error or any other exception is
    penalised via ``otherError`` / ``timeoutError`` / ``connectError`` /
    ``otherError`` respectively. Every branch logs the *updated* score.
    Errors are swallowed so one bad proxy never stops the caller's loop, but
    task cancellation is re-raised.
    """
    try:
        start = time.time()
        async with session.post(mobike_url,
                                data=data,
                                proxy=proxy.url,
                                headers=headers,  # module-level headers
                                timeout=10) as resp:
            elapsed = time.time() - start
            if resp.status == 200:
                proxy.success(elapsed)
                print('%6.3d' % proxy.score, 'Used time-->', elapsed, 's')
            else:
                proxy.otherError()
                print('*****', resp.status, '*****')
    except asyncio.CancelledError:
        # On Python < 3.8 CancelledError subclasses Exception. Without this
        # clause the catch-all below would swallow task.cancel(), and the
        # genDouban worker loop would never stop.
        raise
    except TimeoutError:
        proxy.timeoutError()
        print('%6.3d' % proxy.score, 'timeoutError')
    except ClientConnectionError:
        proxy.connectError()
        print('%6.3d' % proxy.score, 'connectError')
    except Exception as e:
        proxy.otherError()
        print('%6.3d' % proxy.score, 'otherError->', e)
# ClientHttpProxyError

# TCPConnector维持连接池,限制并行连接的总量;当池满时,需等有请求退出后才能加入新请求。limit取500和100时效果相差不大
# ClientSession通过TCPConnector构造连接,多个请求可以共用同一个Session
# Semaphore限制同时发起连接的请求数量;Semaphore充足时,总耗时与timeout差不多


async def initDouban():
    """Probe every proxy in the module-level `proxies` list once, concurrently.

    Each probe is a douban() call, which updates that proxy's score.
    firstFilter() runs this for its initial screening rounds.
    """
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=100,  # keep the pool small on Windows, < 500
                                use_dns_cache=True)
    # The session owns `conn` (aiohttp's default connector_owner=True) and
    # closes it when the `async with` exits, so the former trailing
    # conn.close() was a redundant second close.
    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        await asyncio.gather(*(douban(p, session) for p in proxies))


def firstFilter():
    """Screen all proxies and return the good ones in a PriorityQueue.

    Runs initDouban() twice so every proxy gets scored and prints how long
    each round took. Then it keeps only proxies whose score is above 50 and
    reports how many survived.
    """
    for round_no in range(2):
        began = time.time()
        loop.run_until_complete(asyncio.ensure_future(initDouban()))
        elapsed = time.time() - began
        print('----- init time %s-----\n' % round_no, elapsed, 's')

    survivors = [p for p in proxies if p._score > 50]
    queue = PriorityQueue()
    for p in survivors:
        queue.put_nowait(p)
    print('原始ip数:%s' % len(all_ip), '; 筛选后:%s' % len(survivors))
    return queue


# Run firstFilter()'s two screening rounds at import time and keep the
# surviving proxies in the priority queue that genDouban() draws from.
pq = firstFilter()


async def genDouban(sem, session):
    """Worker loop: repeatedly exercise the best proxy and re-queue it.

    Each iteration holds one `sem` slot. It takes the highest-scoring proxy
    from the global priority queue `pq`, sends one request through it with
    douban() (which updates its score), then puts it back so it is re-ranked.
    Loops forever; stop it by cancelling the task.
    """
    while True:
        await sem.acquire()
        try:
            best = await pq.get()
            await douban(best, session)
            await pq.put(best)
        finally:
            sem.release()


async def dynamicRunDouban(concurrency):
    """Run `concurrency` genDouban workers until cancelled or interrupted.

    The TCPConnector keeps a connection pool capped at `concurrency`. When
    the pool is full, new requests wait for a connection to be released.
    The ClientSession builds connections through the connector and is shared
    by all workers. The Semaphore caps how many workers may be issuing a
    request at the same time.
    """
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=concurrency,
                                use_dns_cache=True)
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        tasks = [asyncio.ensure_future(genDouban(sem, session))
                 for _ in range(concurrency)]
        try:
            await asyncio.gather(*tasks)
        except KeyboardInterrupt:
            print('-----finishing-----\n')
        finally:
            # However the run ends (cancellation, an unexpected worker error
            # or Ctrl-C), stop every worker before the shared session closes.
            # Otherwise survivors keep running against a closed session.
            # The session owns `conn` and closes it on exit, so no explicit
            # conn.close() is needed.
            for task in tasks:
                task.cancel()


# The workers loop forever, so Ctrl-C is the way to stop the run.
future = asyncio.ensure_future(dynamicRunDouban(200))
try:
    loop.run_until_complete(future)
except KeyboardInterrupt:
    # Ctrl-C usually surfaces here, out of run_until_complete, rather than
    # inside the coroutine. So handle it at this level: cancel the run and
    # let the cancellation unwind so the aiohttp session closes cleanly.
    # Pressing Ctrl-C again aborts immediately.
    print('-----finishing-----\n')
    future.cancel()
    loop.run_until_complete(asyncio.gather(future, return_exceptions=True))

# Scores only (not addresses), best first.
scores = [p.score for p in proxies]
scores.sort(reverse=True)
print('Most popular IPs:\n ------------\n', scores[:50],
      [i for i in scores if i > 100])
# Release the event loop. The former `loop.is_closed()` merely queried the
# state and discarded the result.
loop.close()
async def baidu(proxy):
    """Check whether `proxy` ("ip:port") can fetch http://baidu.com.

    The check passes when the fetched page text contains 'baidu.com'.
    Otherwise the proxy is printed as bad and False is returned. Network
    errors, including hitting the 5 s timeout, propagate to the caller.
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get("http://baidu.com",
                               proxy='http://' + proxy,
                               timeout=5) as resp:
            page = await resp.text()
            reachable = 'baidu.com' in page
            if not reachable:
                print(proxy,
                      '\n----\nis bad for baidu.com\n')
            return reachable
async def testProxy(proxy):
    """Screen `proxy` via icanhazip.com and then baidu().

    A response body longer than 20 characters rejects the proxy immediately.
    Presumably such a body is an error or interstitial page rather than a
    bare IP -- confirm. Proxies that pass both checks are appended to the
    module-level `firstFilteredProxies` list. Always returns None.

    See http://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientSession.request
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get("http://icanhazip.com",
                               proxy='http://' + proxy,
                               timeout=5) as resp:
            body = await resp.text()
            if len(body) <= 20 and await baidu(proxy):
                firstFilteredProxies.append(proxy)
async def httpbin(proxy):
    """Inspect the headers httpbin echoes back to judge how anonymous `proxy` is.

    Fetches https://httpbin.org/get?show_env=1 through the proxy. The proxy
    URL stays http:// even though the target is https. It then compares the
    reported ``origin`` with the ``X-Forwarded-For`` header. The proxy is
    appended to the module-level `annoy_proxies` list when the forwarded IP
    differs from `my_ip` (presumably this machine's own public IP -- defined
    elsewhere) and equals the origin.

    References:
        https://imququ.com/post/x-forwarded-for-header-in-http.html
        http://www.cnblogs.com/wenthink/p/HTTTP_Proxy_TCP_Http_Headers_Check.html
    """
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get("https://httpbin.org/get?show_env=1",
                               proxy='http://' + proxy,
                               timeout=4) as resp:
            json_ = await resp.json()
            origin_ip = json_['origin']
            echoed_headers = json_['headers']
            # A proxy that adds no X-Forwarded-For header leaks nothing
            # through it. Fall back to the origin instead of raising KeyError.
            proxy_ip = echoed_headers.get('X-Forwarded-For', origin_ip)
            via = echoed_headers.get('Via', None)
            print('原始IP:', origin_ip,
                  '; 代理IP:', proxy_ip,
                  '---Via:', via)
            if proxy_ip != my_ip and origin_ip == proxy_ip:
                annoy_proxies.append(proxy)
async def douban(proxy):
    """Try the Douban movie top-250 API through `proxy` and print the outcome.

    Prints the HTTP status on success. Timeouts, proxy-connection errors and
    other connection errors are printed along with the proxy instead of
    being raised.

    NOTE: if placed in the same module as the two-argument
    ``douban(proxy, session)``, this definition shadows it.
    """
    # ClientProxyConnectionError is not among the file's imports, so the
    # `except` clause below used to raise NameError whenever a non-timeout
    # error reached it. Import it here so the handler actually works.
    from aiohttp.client_exceptions import ClientProxyConnectionError

    async with aiohttp.ClientSession(loop=loop) as session:
        try:
            async with session.get(('https://api.douban.com/v2/movie/top250'
                                    '?count=10'),
                                   proxy='http://' + proxy,
                                   headers=headers,
                                   timeout=4) as resp:
                print(resp.status)
        except TimeoutError as te:
            print(proxy, te, 'timeoutError')
        # Must come before ClientConnectionError, of which it is a subclass.
        except ClientProxyConnectionError as pce:
            print(proxy, pce, 'proxyError')
        except ClientConnectionError as ce:
            print(proxy, ce, 'connectError')
# Browser-like User-Agent for the direct (proxy-less) requests below.
headers = {'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) ')}
# Hit Douban directly in a tight loop and print the status codes, presumably
# to observe when it starts rejecting requests. Runs until interrupted.
while True:
    # A timeout keeps a stalled connection from hanging the loop forever.
    r = requests.get('http://douban.com', headers=headers, timeout=10)
    print(r.status_code)
    # The tag is the URL-encoded "豆瓣高分". The old URL ended in the truncated
    # escape "%E9%AB%9", so a malformed tag was sent.
    r = requests.get('https://movie.douban.com/j/search_subjects?'
                     'type=movie&tag=%E8%B1%86%E7%93%A3%E9%AB%98%E5%88%86',
                     headers=headers, timeout=10)
    print(r.status_code)
上一篇 下一篇

猜你喜欢

热点阅读