基础知识

爬虫代理IP池的实现

2018-07-21  本文已影响79人  小温侯

代理池的思路

之前写过一篇关于代理的文档:爬虫和IP代理,里面介绍了一些代理的基本知识,后半部分也简单说了下如何用Python抓取免费的代理IP并检验其有效性。这篇文章就是在其基础上,尝试完整地实现一个代理IP池。

这里还想补充一点关于unittest的说明:之前写C代码时要写很多测试代码,我一般把测试函数命名为unittest,所以下面的代码里也沿用了这个函数名。后来我发现Python里也有同名的东西,一开始我以为和C里的做法差不多,最近才知道原来它是一个叫unittest的测试框架,可以用来编写测试用例,实现自动化测试。不过这里我就不展开说了。

问题

代码

Configure.py

# proxy related
# Order number (tid) for the daxiangdaili.com API. It must be an int because
# the request URL is built with the "{0:d}" format spec. 0 is a placeholder:
# replace it with your own order number.
daxiang_proxy_tid = 0
# Number of validated proxies the in-memory pool is topped up to.
iplist_pool_size = 10
#iplist_thread_num = 1

# User-Agent strings; proxy validation sends one chosen at random.
FakeUserAgents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.1.1 Safari/605.1.15',
    'Mozilla/5.0 (X11; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0',
]

# DB
# File name handed to sqlite3.connect().
db_name = 'proxy.db'

Sqlite3api.py

import sqlite3
import os
import Configure

def sqlite3_init():
    """Open a connection to the SQLite database named by ``Configure.db_name``.

    Returns:
        The connection object returned by ``sqlite3.connect``.

    Raises:
        Exception: whatever ``sqlite3.connect`` raised, re-raised after a
            diagnostic has been printed.
    """
    try:
        return sqlite3.connect(Configure.db_name)
    except Exception as e:
        print ('sqlite3 init fail.')
        print (e)
        # There is no connection to hand back, and callers use the result
        # immediately, so surface the real error instead of falling through.
        raise

def sqlite3_execute(conn, sql, args = None):
    """Run one SQL statement on ``conn`` and commit.

    Args:
        conn: an open database connection.
        sql: the statement text.
        args: optional parameters for the statement; a falsy value means
            the statement is executed without parameters.

    Returns:
        The list of fetched rows, or None when there are no rows or the
        statement failed (the error is printed and the transaction is
        rolled back).
    """
    rows = None
    # Build the positional arguments for execute() once, up front.
    call_args = (sql, args) if args else (sql,)

    try:
        cursor = conn.cursor()
        cursor.execute(*call_args)
        rows = cursor.fetchall()
    except Exception as err:
        print (err, "[SQL]:" + sql.strip())
        conn.rollback()

    conn.commit()
    return rows if rows else None

def sqlite3_close(conn):
    """Close the database connection ``conn``; nothing is returned."""
    conn.close()

def unittest():
    """Smoke-test the helpers in this module against a throwaway table.

    Creates a ``stocks`` table, inserts four rows, checks the row count
    and drops the table again.

    Raises:
        AssertionError: if the row count is not 4.
    """
    conn = sqlite3_init()

    try:
        # A table left behind by an aborted run would make CREATE fail
        # (sqlite3_execute only prints the error) and inflate the count.
        sqlite3_execute(conn, "DROP TABLE IF EXISTS stocks")
        sqlite3_execute(conn, "CREATE TABLE stocks (date text, trans text, symbol text, qty real, price real)")

        sqlite3_execute(conn, "INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")
        sqlite3_execute(conn, "INSERT INTO stocks VALUES ('2006-03-28', 'BUY', 'IBM', 1000, 45.00)")
        sqlite3_execute(conn, "INSERT INTO stocks VALUES ('2006-04-05', 'BUY', 'MSFT', 1000, 72.00)")
        sqlite3_execute(conn, "INSERT INTO stocks VALUES ('2006-04-06', 'SELL', 'IBM', 500, 53.00)")

        assert 4 == sqlite3_execute(conn, "SELECT count(*) FROM stocks")[0][0]
    finally:
        # Clean up even when the assertion fails.
        sqlite3_execute(conn, "DROP TABLE IF EXISTS stocks")
        sqlite3_close(conn)

# Run the self-test only when this file is executed as a script.
if __name__ == '__main__':
    unittest()

proxy.py

import requests
import ast
from random import choice
import os
import time

import Configure
import Sqlite3api as sqlite3

# In-memory proxy pool; entries are "host:port" strings.
iplist = []

def proxy_init():
    """Create the ``proxy`` table if it does not exist yet.

    Columns: ``t`` (real), ``valid`` (real) and ``ip`` (text). Safe to call
    repeatedly: an existing table is left untouched.
    """
    conn = sqlite3.sqlite3_init()
    try:
        # IF NOT EXISTS: without it every call after the first fails with
        # "table proxy already exists".
        sqlite3.sqlite3_execute(conn, "CREATE TABLE IF NOT EXISTS proxy (t real, valid real, ip text)")
    finally:
        sqlite3.sqlite3_close(conn)

# main entry, return an ip with certain protocols(either http or https)
def proxy_get_one_ip(protocol='all'):
    """Return the next proxy address from the pool, round-robin.

    The pool is refilled through ``__refresh`` when it is empty.

    Args:
        protocol: passed on to ``__refresh``; 'https' restricts the API
            request to https proxies, anything else applies no filter.

    Returns:
        A "host:port" string, or None when the pool is still empty after
        the refresh.
    """
    if not iplist:
        __refresh(protocol)

    # The refresh may not have found a single valid proxy; report that
    # instead of failing with an IndexError on pop().
    if not iplist:
        return None

    # Rotate: take the head of the list and put it back at the tail.
    ip = iplist.pop(0)
    iplist.append(ip)

    return ip

def proxy_report_invalid_ip(ip, protocol=None):
    """Drop a dead proxy from the pool and from the database.

    Args:
        ip: the "host:port" string to discard.
        protocol: when truthy, a replacement is fetched with
            ``proxy_get_one_ip(protocol)`` and returned.

    Returns:
        The replacement address when ``protocol`` is given, otherwise None.
    """
    # Remove every occurrence in place. The address may be absent
    # (reported twice, never pooled) or present more than once, and a bare
    # list.remove() would raise ValueError in the first case.
    iplist[:] = [pooled for pooled in iplist if pooled != ip]

    # Delete the row. The value is bound as a parameter, not formatted
    # into the SQL text.
    conn = sqlite3.sqlite3_init()
    try:
        sqlite3.sqlite3_execute(conn, "DELETE FROM proxy WHERE ip = ?", (ip,))
    finally:
        sqlite3.sqlite3_close(conn)

    return proxy_get_one_ip(protocol) if protocol else None


def proxy_fill_db(num = 20):
    """Fetch ``num`` candidate proxies from the API and store the valid ones.

    Each candidate is checked with ``__validation``; those that pass are
    inserted into the ``proxy`` table with ``valid = 1`` and the current
    time in milliseconds. The in-memory pool is not touched.

    Args:
        num: how many candidates to request from the API.

    Returns:
        The number of addresses inserted.
    """
    samples = __get_ip_thr_api(num)
    cnt = 0

    conn = sqlite3.sqlite3_init()
    try:
        for sample in samples:
            ip = "{0:s}:{1:d}".format(sample.get('host'), sample.get('port'))
            if __validation(ip):
                # Values are bound as parameters, not formatted into the SQL.
                sqlite3.sqlite3_execute(conn, "INSERT INTO proxy VALUES (?,1,?)", (int(time.time()*1000), ip))
                #print ("Found one: {0:s}.".format(ip))
                cnt += 1
    finally:
        # Close the connection even if a malformed sample raises above.
        sqlite3.sqlite3_close(conn)

    return cnt

# get ip through api from http://www.daxiangdaili.com
def __get_ip_thr_api(num=1, protocol='all'):
    """Request ``num`` proxy candidates from the daxiangdaili API.

    Args:
        num: number of addresses to ask for.
        protocol: 'https' adds ``&protocol=https`` to the request; any
            other value sends no protocol filter.

    Returns:
        The parsed response as a list (callers read 'host' and 'port' from
        each item), or an empty list when the request fails, the status is
        not OK, or the body cannot be parsed into a list.
    """
    tid = Configure.daxiang_proxy_tid

    url = "http://tvp.daxiangdaili.com/ip/?tid={0:d}&num={1:d}&delay=3&category=2&sortby=time&filter=on&format=json".format(tid, num)

    if protocol == 'https':
        url += "&protocol=https"

    try:
        # Without a timeout a stalled server would block the caller forever.
        response = requests.get(url, timeout=10)
    except Exception as e:
        print (e)
        return []

    if response.status_code != requests.codes.ok:
        return []

    try:
        # literal_eval only evaluates literals, so the body is never executed.
        parsed = ast.literal_eval(response.text.strip())
    except (ValueError, SyntaxError, TypeError) as e:
        # e.g. a plain-text error message instead of the expected payload
        print (e)
        return []

    # Callers iterate the result and call .get() on each item.
    return parsed if isinstance(parsed, list) else []

# validate ip,addr format: [ip:port]
def __validation(addr):
    """Check whether the proxy at ``addr`` can fetch https://www.baidu.com.

    Args:
        addr: proxy address as "ip:port".

    Returns:
        True when the request through the proxy completes within 5 seconds
        with an OK status; False on any other status or on any exception.
    """
    # The same http:// proxy URL is registered for both schemes.
    proxies = {
        "http": "http://{0}".format(addr),
        "https": "http://{0}".format(addr)
    }

    header = {'user-agent': choice(Configure.FakeUserAgents)}

    try:
        response = requests.get("https://www.baidu.com", headers=header, proxies=proxies, timeout=5)
    except Exception:
        # Best-effort probe: any failure just means "not usable".
        return False

    # Always return a bool; a non-OK status used to fall through as None.
    return response.status_code == requests.codes.ok

def __cal_sample_ratio(sampe_size, protocol):
    """Estimate the fraction of API-supplied proxies that actually work.

    Requests ``sampe_size`` candidates and validates each one. Addresses
    that pass are also added to ``iplist`` and to the ``proxy`` table, so
    the validation work is not wasted.

    Args:
        sampe_size: number of candidates to request; it is also the
            denominator of the ratio.
        protocol: forwarded to ``__get_ip_thr_api``.

    Returns:
        valid / sampe_size as a float; 0.0 when ``sampe_size`` is not
        positive.
    """
    # collect sample to calculate validity ratio
    cnt, valid = sampe_size, 0
    if cnt <= 0:
        # Nothing to sample; also avoids dividing by zero below.
        return 0.0

    addrs = __get_ip_thr_api(cnt, protocol)
    conn = sqlite3.sqlite3_init()

    try:
        for addr in addrs:
            ip = "{0:s}:{1:d}".format(addr.get('host'), addr.get('port'))
            if __validation(ip):
                valid += 1
                # Logically this function should only measure the ratio,
                # but valid ips are hard to come by, so keep them.
                # Skip addresses already pooled to avoid duplicate entries.
                if ip not in iplist:
                    iplist.append(ip)
                    sqlite3.sqlite3_execute(conn, "INSERT INTO proxy VALUES (?,1,?)", (int(time.time()*1000), ip))
    finally:
        sqlite3.sqlite3_close(conn)

    # calculate validity ratio
    ratio = valid / float(cnt)
    print ("ratio is {0}".format(ratio))
    return ratio

def __refresh(protocol='all'):
    """Top up the in-memory pool ``iplist`` to ``Configure.iplist_pool_size``.

    Sources are tried in this order, stopping once the pool is full:
      1. rows of the ``proxy`` table with ``valid = 1``, ordered by ``t``;
      2. addresses that pass validation while ``__cal_sample_ratio``
         samples 10 candidates to estimate the validity ratio;
      3. a further API batch sized from that ratio, validated one by one.

    Args:
        protocol: forwarded to the API helpers.

    Returns:
        None. The pool may still be short afterwards when the last batch
        did not contain enough valid addresses.
    """
    print ("[Start updating pool]")

    global iplist

    # need this number of valid ips to fill the pool
    missing_ip_num = Configure.iplist_pool_size - len(iplist)
    if missing_ip_num <= 0:
        print ("Enough valid ips in pool. Refresh finished.")
        return

    # (1) get them from db
    #       normally the db should return enough valid ips
    conn = sqlite3.sqlite3_init()
    try:
        data = sqlite3.sqlite3_execute(conn, "SELECT ip, valid FROM proxy where valid = 1 order by t limit ?", (missing_ip_num,))
    finally:
        sqlite3.sqlite3_close(conn)

    # sqlite3_execute returns None, not an empty list, when no row matches
    for item in data or []:
        if item[0] not in iplist:
            iplist.append(item[0])

    # check the results
    if len(iplist) >= Configure.iplist_pool_size:
        print ("Enough valid ips in pool. Refresh finished.")
        return

    missing_ip_num = Configure.iplist_pool_size - len(iplist)
    print ("Still need {0:d} ips after retrieving from db.".format(missing_ip_num))

    # (2) this means even used all valid ips in db, pool is still not full
    #       then get more valid ips from api or other sources
    # calculate ratio
    ratio = __cal_sample_ratio(10, protocol)

    # __cal_sample_ratio() adds the valid ips it finds to the pool,
    # so check once more and recompute how many are still missing
    if len(iplist) >= Configure.iplist_pool_size:
        print ("Enough valid ips in pool. Refresh finished.")
        return
    missing_ip_num = Configure.iplist_pool_size - len(iplist)

    # higher means more valid ips, but slower to process
    factor = 1

    # sample_size: roughly how many candidates must be validated, given
    # the measured ratio, to end up with enough valid ips
    if ratio == 0.00:
        # extremely bad sample
        sample_size = missing_ip_num * factor * 2
    else:
        sample_size = int(1/ratio * missing_ip_num * factor)

    print ("Need to collect {0:d} ips for validation test".format(sample_size))

    samples = __get_ip_thr_api(sample_size, protocol)
    # TODO multi threads
    # if sample_size > a certain number, do multi threads
    # else:
    conn = sqlite3.sqlite3_init()

    try:
        for sample in samples:
            ip = "{0:s}:{1:d}".format(sample.get('host'), sample.get('port'))
            if ip in iplist:
                # already pooled; skip the slow network validation
                continue
            if __validation(ip):
                # sync write operation
                iplist.append(ip)
                sqlite3.sqlite3_execute(conn, "INSERT INTO proxy VALUES (?,1,?)", (int(time.time()*1000), ip))
                print ("Found one: {0:s}.".format(ip))

                if len(iplist) >= Configure.iplist_pool_size:
                    print (len(iplist))
                    break
    finally:
        sqlite3.sqlite3_close(conn)

    # Note the list length is not checked again here, so the list may
    # still not be full when the sample held too few valid ips.
    # Generally this should not happen often; call proxy_fill_db()
    # enough times beforehand so the db already has enough valid ips.
    print ("[pool updated.]")

def unittest():
    """End-to-end self-test; needs network access and a valid API order.

    Checks that the API returns one address per request, that
    ``proxy_fill_db`` grows the table by the count it reports, and that
    reporting an address as invalid removes exactly one row.

    Raises:
        AssertionError: when one of the checks fails.
    """
    assert 1 == len(__get_ip_thr_api(1, 'https'))
    assert 1 == len(__get_ip_thr_api(1))

    proxy_init()

    conn = sqlite3.sqlite3_init()
    try:
        cnt = sqlite3.sqlite3_execute(conn, "SELECT count(*) FROM proxy")[0][0]
        res = proxy_fill_db(10)

        assert (cnt + res) == sqlite3.sqlite3_execute(conn, "SELECT count(*) FROM proxy")[0][0]

        # The removal checks need an address, so they only run when the
        # fill step stored at least one.
        if res > 0:
            ip = proxy_get_one_ip()

            cnt1 = sqlite3.sqlite3_execute(conn, "SELECT count(*) FROM proxy")[0][0]
            proxy_report_invalid_ip(ip)
            cnt2 = sqlite3.sqlite3_execute(conn, "SELECT count(*) FROM proxy")[0][0]

            assert 1 == (cnt1 - cnt2)
            assert 0 == sqlite3.sqlite3_execute(conn, "SELECT count(*) FROM proxy where ip = ?", (ip,))[0][0]
    finally:
        # Close the connection even when an assertion fails.
        sqlite3.sqlite3_close(conn)

# Run the self-test only when this file is executed as a script.
if __name__ == '__main__':
    unittest()
上一篇 下一篇

猜你喜欢

热点阅读