项目1-抓取纳斯达克股票数据(网络爬虫)

2019-12-23  本文已影响0人  wangyu2488

2019年12月20日

一.基本思路

1.抓取数据

2..检测数据是否更新(md5实现)

3.分析数据

4.保存数据

5.爬虫工作计划任务

二.具体实现

1.检验更新

# coding=utf-8
"""项目实战:抓取纳斯达克股票数据"""
import urllib.request
import hashlib
from bs4 import BeautifulSoup
import os
url = 'https://www.nasdaq.com/symbol/aapl/historical#.UWdnJBDMhHk'

def validateUpdate(html):
    """验证数据是否更新,更新返回True,未更新返回False"""
    # 创建md5对象
    md5obj = hashlib.md5()
    md5obj.update(html.encode(encoding='utf-8'))
    md5code = md5obj.hexdigest()
    print(md5code)
    old_md5code = ''
    f_name = 'md5.txt'
    if os.path.exists(f_name):  # 如果文件存在读取文件内容
        with open(f_name, 'r', encoding='utf-8') as f:
            old_md5code = f.read()
    if md5code == old_md5code:
        print('数据没有更新')
        return False
    else:
        # 把新的md5码写入到文件中
        with open(f_name, 'w', encoding='utf-8') as f:
            f.write(md5code)
        print('数据更新')
        return True

req = urllib.request.Request(url)
with urllib.request.urlopen(req) as response:
    data = response.read()
    html = data.decode()
    sp = BeautifulSoup(html, 'html.parser')
    # 返回指定CSS选择器的div标签列表
    # div = sp.select('div#quotes_content_left_pnlAJAX')
    div = sp.select('div.historical-data__table-container')
    # 从列表中返回第一个元素
    divstring = div[0]
    if validateUpdate(divstring):  # 数据更新
        pass
        # TODO 分析数据
        # TODO 保存数据到数据库

image.png

2.分析数据

2.1花去抓取字段

image.png

html.js body.path-market-activity.with-header-ads.with-header-ads--loaded div.dialog-off-canvas-main-canvas div.page.with-primary-nav.with-sub-nav main.page__main div.page__content div.quote-subdetail__content div.layout.layout--2-col-large div.layout--main div.historical-data div.historical-data__data.loaded div.historical-data__table-container table.historical-data__table tbody.historical-data__table-body tr.historical-data__row

html.js body.path-market-activity.with-header-ads.with-header-ads--loaded div.dialog-off-canvas-main-canvas div.page.with-primary-nav.with-sub-nav main.page__main div.page__content div.quote-subdetail__content div.layout.layout--2-col-large div.layout--main div.historical-data div.historical-data__data.loaded div.historical-data__table-container table.historical-data__table tbody.historical-data__table-body tr.historical-data__row

该网站已经改成动态数据获取的,具体参考下面完整例子代码

5.爬虫工作计划任务

image.png

三完整例子代码

# coding=utf-8
"""项目实战:抓取纳斯达克股票数据"""
import datetime
import hashlib
import logging
import os
import threading
import time
import urllib.request
import json
from com.pkg1.db.db_access import insert_hisq_data
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(threadName)s - '
                           '%(name)s - %(funcName)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
url = 'https://api.nasdaq.com/api/quote/AAPL/historical?assetclass=stocks&fromdate=2019-01-01&limit=18&todate=2019-12-20'

def validateUpdate(html):
    """验证数据是否更新,更新返回True,未更新返回False"""
    # 创建md5对象
    md5obj = hashlib.md5()
    md5obj.update(html.encode(encoding='utf-8'))
    md5code = md5obj.hexdigest()
    old_md5code = ''
    f_name = 'md5.txt'
    if os.path.exists(f_name):  # 如果文件存在读取文件内容
        with open(f_name, 'r', encoding='utf-8') as f:
            old_md5code = f.read()
    if md5code == old_md5code:
        logger.info('数据没有更新')
        return False
    else:
        # 把新的md5码写入到文件中
        with open(f_name, 'w', encoding='utf-8') as f:
            f.write(md5code)
        logger.info('数据更新')
        return True

# 线程运行标志
isrunning = True
# 爬虫工作间隔
interval = 5

def controlthread_body():
    """控制线程体函数"""
    global interval, isrunning
    while isrunning:
        # 控制爬虫工作计划
        i = input('输入Bye终止爬虫,输入数字改变爬虫工作间隔,单位秒:')
        logger.info('控制输入{0}'.format(i))
        try:
            interval = int(i)
        except ValueError:
            if i.lower() == 'bye':
                isrunning = False

def istradtime():
    """判断交易时间"""
    # return False
    now = datetime.datetime.now()
    df = '%H%M%S'
    strnow = now.strftime(df)
    starttime = datetime.time(9, 30).strftime(df)
    endtime = datetime.time(15, 30).strftime(df)
    if now.weekday() == 5 \
            or now.weekday() == 6 \
            or (strnow < starttime or strnow > endtime):
        # 非工作时间
        return False
    # 工作时间
    return True

def validate_price(oriPrice):
    if oriPrice.find('$') >= 0:
        oriPrice = oriPrice.replace('$', '')
    return oriPrice

def workthread_body():
    """工作线程体函数"""
    global interval, isrunning
    while isrunning:
        if istradtime():
            # 交易时间内不工作
            logger.info('交易时间,爬虫休眠1小时...')
            time.sleep(60 * 60)
            continue
        logger.info('爬虫开始工作...')
        req = urllib.request.Request(url)
        with urllib.request.urlopen(req) as response:
            data = response.read()
            html = data.decode('gbk')
            print(html)
            py_dict = json.loads(html)
            divstring = html
            if validateUpdate(divstring):  # 数据更新
                # 分析数据
                trlist = py_dict['data']['tradesTable']['rows']
                data = []
                for tr in trlist:
                    rows = tr
                    fields = {}
                    try:
                        df = '%m/%d/%Y'
                        fields['Date'] = datetime.datetime.strptime(rows["date"], df)
                    except ValueError:
                        # 实时数据不分析(只有时间,如10:12)
                        continue
                    fields['Open'] = float(validate_price(rows["open"]))
                    fields['High'] = float(validate_price(rows["high"]))
                    fields['Low'] = float(validate_price(rows["low"]))
                    fields['Close'] = float(validate_price(rows["close"]))
                    fields['Volume'] = int(rows["volume"].replace(',', ''))
                    data.append(fields)
                # 保存数据到数据库
                for row in data:
                    row['Symbol'] = 'AAPL'
                    insert_hisq_data(row)
            # 爬虫休眠
            logger.info('爬虫休眠{0}秒...'.format(interval))
            time.sleep(interval)

def main():
    """主函数"""
    global interval, isrunning
    # 创建工作线程对象workthread
    workthread = threading.Thread(target=workthread_body, name='WorkThread')
    # 启动线程workthread
    workthread.start()
    # 创建控制线程对象controlthread
    controlthread = threading.Thread(target=controlthread_body, name='ControlThread')
    # 启动线程controlthread
    controlthread.start()

if __name__ == '__main__':
    main()

image.png image.png

如果您发现本文对你有所帮助,如果您认为其他人也可能受益,请把它分享出去。

上一篇下一篇

猜你喜欢

热点阅读