爬虫专题大数据 爬虫Python AI Sql

Linux下使用Grafana+InfluxDB+MongoDB

2018-11-12  本文已影响208人  dex0423

1.前言

设计原理:爬虫将抓取的数据写入MongoDB,InfluxDB从MongoDB获取数据抓取情况,Grafana 从 InfluxDB 中获取爬虫抓取数据情况并做图形化展示。
系统环境:MacOS High Sierra 10.12.6

2.Grafana介绍

3.InfluxDB介绍

4.安装&配置Grafana、InfluxDB

Grafana 和 InfluxDB 安装非常方便,这一点可以和 Graphite 做一个鲜明的对比。

4.1.安装配置 InfluxDB

brew update
brew install influxdb
vi /usr/local/etc/influxdb.conf 

修改 & 添加 内容如下:

#  此处【 修改 】如下内容:
[data]
  # The directory where the TSM storage engine stores TSM files.
  dir = "/usr/local/var/influxdb/data"     #  将存储 TSM 文件的路径,修改成自己的目录。

  # The directory where the TSM storage engine stores WAL files.
  wal-dir = "/usr/local/var/influxdb/wal"  # 将存储 WAL 文件的路径,修改成自己的目录。


# 此处【 添加 】如下内容:                     # 【 注意 】:原文件中没有此内容,需自己添加。
[admin]
# 设定 admin 管理界面的 host 和 port 
bind-address='127.0.0.1:8083'

# 此处【 修改 】如下内容:
[http]
  # The bind address used by the HTTP service.
  # 修改 API 的 host 和 port
  bind-address = ":8086"

4.2.安装配置 Grafana

brew update
brew install grafana
vi /usr/local/etc/grafana/grafana.ini

修改 内容如下:

[server]
# Protocol (http, https, socket)
;protocol = http

# The ip address to bind to, empty will bind to all interfaces
;http_addr =

# 此处修改端口号
# The http port  to use
;http_port = 3000         # 【 注意 】这里我用默认 3000 端口,可以根据需要修改。

# 此处修改界面访问地址
# The public facing domain name used to access grafana from a browser
;domain = localhost       # 【 注意 】这里我用默认 localhost 地址,可以根据需要修改。

# Redirect to correct domain if host header does not match domain
# Prevents DNS rebinding attacks
;enforce_domain = false

# The full public facing url you use in browser, used for redirects and emails
# If you use reverse proxy and sub path specify full url (with sub path)
;root_url = http://localhost:3000

5.编写爬虫代码

这里我使用的是以前写的一个爬取豆瓣电影的的爬虫代码。

注意:
-- 爬虫使用的 MongoDB 的 database 名称为 learn_selenium_doubandianying;
-- 爬虫使用的 MongoDB 的 table 名称为 movie_info;

爬虫代码如下:

# -*- coding:utf-8 -*-
from selenium import webdriver
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC 
import re
from selenium.common.exceptions import TimeoutException
from config import *
from lxml import etree
import pymongo
import datetime


# 设置MongoDB数据库
MONGO_URL   = 'localhost'
# 设置 MongoDB 的 database 名称为 learn_selenium_doubandianying
MONGO_DB    = 'learn_selenium_doubandianying'
# 设置 MongoDB 的 table 名称为 movie_info
MONGO_TABLE = 'movie_info'

client = pymongo.MongoClient(MONGO_URL)
db     = client[MONGO_DB]

browser = webdriver.Chrome()
wait    = WebDriverWait(browser,10)

browser.get('https://movie.douban.com/')
word = input('请输入您要搜索的内容>>> ')

def search():
    try:
        input = wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR,'#inp-query'))
            )
        submit = wait.until(
                EC.element_to_be_clickable((By.CSS_SELECTOR,'#db-nav-movie > div.nav-wrap > div > div.nav-search > form > fieldset > div.inp-btn > input[type="submit"]'))
            )
        print('输入搜索的内容【{}】'.format(word))
        input.send_keys('{}'.format(word))
        submit.click()
        print('正在加载')
        active = wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR,'a.num.activate.thispage'))
            )
        print('加载第【{}】页成功'.format(active.text))
        get_movies()

    except TimeoutException:
        print('等待超时,重新搜索...')
        return search()

def next_page():
    while True:
        try:
            next_page_submit = wait.until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR,'a.next'))
                )
            next_page_submit.click()
            wait.until(
                    EC.presence_of_element_located((By.CSS_SELECTOR,'a.num.activate.thispage'))
                )
            print('成功加载该页数据...')
            get_movies()
            print('--------------加载完成,并打印成功,开始加载下一页------------')
            time.sleep(1.1)
            next_page()

        except TimeoutException:
            print('加载超时,重新加载...')
            return next_page()

        except Exception:
            print('加载至最后一页')
            break

def get_movies():
    try:
        page = browser.page_source
        selector = etree.HTML(page)
        items = selector.xpath('//*[@id="root"]/div/div[2]/div[1]/div[1]')
        for item in items:
            names = item.xpath('div/div/div/div[1]/a/text()')
            urls = item.xpath('div/div/div/div[1]/a/@href')
            ratings = item.xpath('div/div/div/div[2]/span[2]/text()')
        
            # 【注意】使用re.find()时,参数需要使用 r'\xx' 格式
            # 【注意】item.xpath()返回的是列表,需要使用str将其字符化
            durations = re.findall(r'\d\d+',str(item.xpath('div/div/div/div[3]/text()')))
            actors = item.xpath('div/div/div/div[4]/text()')
            
            # 【注意】由于xpath返回的是列表格式,而我们需要将列表中的元素一一对应存放至字典中,这就需要使用zip()函数,将内容存放至空字典中
            for name,url,rating,duration,actor in zip(names,urls,ratings,durations,actors):                         
                movie_info = {}
                movie_info['name'] = name
                movie_info['url'] = url
                if rating == '(尚未上映)' or '(暂无评分)':
                    movie_info['rating'] = None
                else:
                    movie_info['rating'] = float(rating)
                movie_info['duration'] = int(duration)
                movie_info['actors'] = actor

                print(movie_info)
                save_to_mongo(movie_info)

    except Exception as e:
        print(e)
        time.sleep(0.3)
        return get_movies()

def save_to_mongo(result):
    try:
        if db[MONGO_TABLE].insert_one(result):
            print('成功存储到MONGODB!')
    except Exception as e:
        raise e

def main():
    start_time = datetime.datetime.now()
    try:
        search()
        next_page()
    except Exception as e:
        raise e
    finally:
        browser.close()
        end_time = datetime.datetime.now()
        print('*'*100)
        print('开始时间:',start_time)
        print('结束时间:',end_time)
        print('共计用时:',end_time - start_time)
        total_nums = db[MONGO_TABLE].count()
        print('共计获取数据:',total_nums,' 条')
        print('*'*100)

if __name__ == '__main__':
    main()

6.编写监控脚本

考虑到可能要增加爬虫到监控中,因此这里使用了热更新对监控进行动态配置

6.1.监控脚本 influx_monitor.py

import ast
import time
import pymongo
import traceback
from configparser import ConfigParser
from influxdb import InfluxDBClient
from datetime import datetime
from os.path import getmtime

# 配置 influxdb
client = InfluxDBClient(host='localhost', port=8086) # influxdb默认端口为8086

# 创建 database
client.create_database('Spider')
# switch 到 database
client.switch_database('Spider')

# 设定配置文件
config_name = 'influx_settings.conf'

WATCHED_FILES = [config_name]
WATCHED_FILES_MTIMES = [(f, getmtime(f)) for f in WATCHED_FILES]  

_count_dict = {}
_size_dict = {}

# 获取配置文件中的设置
def parse_config(file_name):

    try:
        # 创建一个配置文件对象
        cf = ConfigParser() 
 
        # 打开配置文件
        cf.read(file_name)

        # 获取配置文件中的统计频率
        interval = cf.getint('time', 'interval')
        
        # 获取配置文件中要监控的 dbs 和 collection
        dbs_and_collections = ast.literal_eval(cf.get('db', 'db_collection_dict'))
        
        return interval, dbs_and_collections

    except:
        print(traceback.print_exc())
        return None


# 从 MongoDB 获取数据,并写入 InfluxDB
def insert_data(dbs_and_collections):

    # 连接 MongoDB 数据库
    mongodb_client = pymongo.MongoClient(host='127.0.0.1',port=27017)  # 直接使用默认地址端口连接 MongoDB

    for db_name, collection_name in dbs_and_collections.items():
        
        # 数据库操作,创建 collection 集合对象
        db = mongodb_client[db_name]
        collection = db[collection_name]
        
        # 获取 collection 集合大小
        collection_size = round(float(db.command("collstats", collection_name).get('size')) / 1024 / 1024, 2)
        
        # 获取 collection 集合内数据条数
        current_count = collection.count()

        # 初始化数据条数,当程序刚执行时,条数初始量就设置为第一次执行时获取的数据
        init_count = _count_dict.get(collection_name, current_count)
        # 初始化数据大小,当程序刚执行时,大小初始量就设置为第一次执行时获取的数据大小
        init_size = _size_dict.get(collection_name, collection_size)

        # 得到数据条数增长量
        increase_amount = current_count - init_count        
        # 得到数据大小增长量
        increase_collection_size = collection_size - init_size

        # 得到当前时间
        current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

        # 赋值
        _count_dict[collection_name] = current_count
        _size_dict[collection_name] = collection_size

        # 构建
        json_body = [
            {
                "measurement": "crawler",
                "time": current_time,
                "tags": {
                    "spider_name": collection_name
                },
                "fields": {
                    "count": current_count,
                    "increase_count": increase_amount,
                    "size": collection_size,
                    "increase_size": increase_collection_size

                }
            }
        ]
        # 将获取
        if client.write_points(json_body):
            print('成功写入influxdb!',json_body)


def main():
    # 获取配置文件中的监控频率和MongoDB数据库设置
    interval, dbs_and_collexctions = parse_config(config_name)

    # 如果配置有问题则报错
    if (interval or dbs_and_collexctions) is None:
        raise ValueError('配置有问题,请打开配置文件重新设置!')

    print('设置监控频率:', interval)
    print('设置要监控的MongoDB数据库和集合:', dbs_and_collexctions)

    last_interval = interval
    last_dbs_and_collexctions = dbs_and_collexctions

    # 这里实现配置文件热更新
    for f, mtime in WATCHED_FILES_MTIMES:
        while True:
            # 检查配置更新情况,如果文件有被修改,则重新获取配置内容
            if getmtime(f) != mtime:
                # 获取配置信息
                interval, dbs_and_collections = parse_config(config_name)
                print('提示:配置文件于 %s 更新!' % (time.strftime("%Y-%m-%d %H:%M:%S")))
                
                # 如果配置有问题,则使用上一次的配置
                if (interval or dbs_and_collexctions) is None:
                    interval = last_interval
                    dbs_and_collexctions = last_dbs_and_collexctions

                else:
                    print('使用新配置!')
                    print('新配置内容:', interval, dbs_and_collexctions)
                    mtime = getmtime(f)

            # 写入 influxdb 数据库
            insert_data(dbs_and_collexctions)

            # 使用 sleep 设置每次写入的时间间隔
            time.sleep(interval)

if __name__ == '__main__':
    main()

6.2.配置文件 influx_settings.conf

配置文件主要用于热更新相关设置

# [需要监控的 MongoDB 数据的 数据库名 和 集合名]
[db]
db_collection_dict = {
    'learn_selenium_doubandianying': 'movie_info',
    }

# [设置循环间隔时间]
[time]
interval = 8

7.配置 Grafana

7.1. 运行 influxDB

python3 influx_monitor.py 

运行,得到下图内容,表示监控脚本运行成功。

监控脚本正常运行
新建一个 terminal 窗口,使用 vi 命令修改配置文件 influx_settings.conf
vi influx_settings.conf

修改间隔时间为8秒,并保存退出。
如下图:


修改间隔时间为8秒

这时运行 influxDB 的窗口,提示配置更新,说明配置热更新可用。
如下图:


红色方可内提示配置更新

7.2. 运行爬虫文件

启动 MongoDB 数据库服务。

brew services mongodb start

新建一个 terminal 窗口,运行爬虫文件。

爬虫文件运行成功

7.3. Grafana web窗口设置

在红框内输入influxDB数据库名称

"measurement": "crawler",
"fields": {
"count": current_count,
"increase_count": increase_amount,
"size": collection_size,
"increase_size": increase_collection_size
}

上一篇下一篇

猜你喜欢

热点阅读