不务正业

微博关键词监控器

2020-04-02  本文已影响0人  要薯条不要辣翅

微博关键词监控器

2020年4月2日阴雨连绵的下午,被疫情所困无法返校开学的我百无聊赖,突然想起之前抖音上刷到一个霓虹国程序员做了一个分手灯,就是一个监控社交网络,有人说分手就会亮一下的灯,我感觉十分有趣,于是决定自己也做一个。但是用计算机控制灯比较麻烦,比较之下我选择了较为方便的手机短信,于是一个沙雕脚本诞生了。

这个沙雕脚本使用爬虫技术监控微博的动态,当搜索关键词有新结果时便会以短信的方式予以通知,而且可以自行定义通知间隔,可以说是非常人(没)性(有)化(用)。

下面我将分模块介绍一下这个只有一百余行的python脚本。

短信模块

短信我们使用https://www.twilio.com/提供的服务,他们家是有免费试用的,有钱的老爷们也可以使用其他供应商的服务,应该用起来都类似。首先在网站注册账号,然后绑定手机,将密钥信息填进API里就可以调用起来了,十分简洁方便。

# 配置
auth_token = '**********************'
account_sid = '***********************'
client = Client(account_sid, auth_token)

# 然后调用client.messages.create就可以了,下边有写成一个工具函数

主程序部分

import copy # 深拷贝
import time # 计时器
import requests # 众所周知这是网络通讯库,爬虫都爱他
import re # 正则表达式
import json # 这个包熟悉网络的同学应该也很熟悉,很多的通讯都采用了json的形式(虽然现在在逐步被淘汰),这个包便是构造和处理json的
from requests import get # http GET方法
from twilio.rest import Client # 发短信用的

def main():
    """
    开启任务循环
    :return: None
    """
    last_pool = [[], []] # 由于微博的时间记录不是精确性的,而是“刚刚”、“1分钟前”这种,这个列表用来记录已经通知过的微博id,配合时间戳可以区分出新微博是否需要被通知
    while True:
        print("\nnew detect launched")
        new_pool = renew(last_pool) # 主方法,刷新一次,通知新微博,并返回新微博id列表
        last_pool = last_pool[1:] # 弹出时间比较久的微博,这部分已经可以用时间戳予以区分
        last_pool.append(new_pool) # 将新的列表添加进已通知池
        time.sleep(30) # 暂停30s


if __name__ == '__main__':
    kw = input("Please input the keyword you wanna overwatch: ")
    headers = {
        'user-agent':
            'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'
    }
    url = 'https://m.weibo.cn/api/container/getIndex' # m.weibo.cn是手机浏览器版本的微博,从这个来源比较容易爬,这个URL是搜索的接口
    main() # 执行主函数

构建待访问池

def get_page():
    # 先用requests构造请求,解析出关键词搜索出来的微博总页数
    data_list = [] # @return,每次连接的具体参数
    data = {
        'containerid': '100103type=61&q={}'.format(kw), # 排序方式和搜索关键词
        'page_type': 'searchall'} # 搜索方式
    resp = requests.get(url=url, headers=headers, params=data)
    total_page = resp.json()['data']['cardlistInfo']['total']  # 微博总数
    # 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1
    if total_page % 10 == 0:
        page_num = int(total_page / 10)
    else:
        page_num = int(total_page / 10) + 1
    # 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数
    if page_num == 1:
        data_list.append(data)
        return data_list
    else:
        for i in range(1, page_num + 1):
            data['page'] = i
            data_list.append(copy.deepcopy(data))
        return data_list

工具函数

def check_time(id, last_pool, time): # 检查是否被通知过
    if time != "刚刚": # 我们只通知最近发出的微博(30s间隔刷新),所以所有不是“刚刚”的微博都不需要通知
        return False
    for pool in last_pool: # 已经通知过的不用通知
        if id in pool:
            return False
    return True


def sent_message(text, phone_number):
    mes = client.messages.create(
        from_='*********',  # 填写在active number处获得的号码
        body=text,
        to=phone_number
    )
    print("msg sent: {}".format(mes.sid))

更新

def renew(last_pool):
    data_list = get_page()  # 接收data参数列表
    pool = [] # 本次连接的已访问池
    for data in data_list:
        with get(url=url, headers=headers, params=data) as resp:  # 携带参数发送请求
            text = resp.text
            text_dict = json.loads(text)['data']['cards'] # 微博页面上的数据是以卡片的形式组织的
            parse_dict = {} # 解析出的数据放在这个字典里边
            for card in text_dict:
                if card['card_type'] == 9: 
                  # 只有这个类型的卡片是记录微博数据的,其他的设计渲染、布局等不用关心的东西
                    scheme = card['scheme']
                    # 卡片中提取信息
                    if card['mblog']['isLongText'] is False:
                        text = card['mblog']['text']
                        text = re.sub(r'<.*?>|\n+', '', text)
                    else:
                        text = card['mblog']['longText']['longTextContent']
                    user = card['mblog']['user']['profile_url']
                    id = card['mblog']['id']
                    time = card['mblog']['created_at']
                    # 检查此条微博是否已经被通知
                    if not check_time(id, last_pool, time) or id in pool:
                        text = "{} new wblog published recently\n".format(len(pool))
                        print(text)
                        if len(pool) > 0:
                            sent_message(text, "+86***********") # 调用发信息方法
                        return pool
                    parse_dict['url'] = scheme
                    parse_dict['text'] = text
                    parse_dict['author'] = user
                    parse_dict['time'] = time
                    pool.append(id)
            # print('new weibo detected:\n'
            #       'author home page: {}\n'
            #       'content: {}\n'
            #       'time: {}\n'.format(parse_dict['author'], parse_dict['text'], parse_dict['time']))
    text = "{} new wblog published recently\n".format(len(pool))
    print(text)
    if len(pool) > 0:
        sent_message(text, "+86***********")
    return pool

完整代码

import copy
import time
import requests
import re
import json
from requests import get
from twilio.rest import Client

auth_token = '**************************'
account_sid = '************************'
client = Client(account_sid, auth_token)


def get_page():
    """
    先用requests构造请求,解析出关键词搜索出来的微博总页数
    :return: 返回每次请求需要的data参数
    """
    data_list = []
    data = {
        'containerid': '100103type=61&q={}'.format(kw),
        'page_type': 'searchall'}
    resp = requests.get(url=url, headers=headers, params=data)
    total_page = resp.json()['data']['cardlistInfo']['total']  # 微博总数
    # 一页有10条微博,用总数对10整除,余数为0则页码为总数/10,余数不为0则页码为(总数/10)+1
    if total_page % 10 == 0:
        page_num = int(total_page / 10)
    else:
        page_num = int(total_page / 10) + 1
    # 页码为1,data为当前data,页码不为1,通过for循环构建每一页的data参数
    if page_num == 1:
        data_list.append(data)
        return data_list
    else:
        for i in range(1, page_num + 1):
            data['page'] = i
            data_list.append(copy.deepcopy(data))
        return data_list


def check_time(id, last_pool, time):
    if time != "刚刚":
        return False
    for pool in last_pool:
        if id in pool:
            return False
    return True


def sent_message(text, phone_number):
    mes = client.messages.create(
        from_='+**********',  # 填写在active number处获得的号码
        body=text,
        to=phone_number
    )
    print("msg sent: {}".format(mes.sid))


def renew(last_pool):
    data_list = get_page()  # 接收data参数列表
    pool = []
    for data in data_list:
        with get(url=url, headers=headers, params=data) as resp:  # 携带参数发送请求
            text = resp.text  # await 等待知道获取完整数据
            text_dict = json.loads(text)['data']['cards']
            parse_dict = {}
            for card in text_dict:
                if card['card_type'] == 9:
                    scheme = card['scheme']
                    if card['mblog']['isLongText'] is False:
                        text = card['mblog']['text']
                        text = re.sub(r'<.*?>|\n+', '', text)
                    else:
                        text = card['mblog']['longText']['longTextContent']
                    user = card['mblog']['user']['profile_url']
                    id = card['mblog']['id']
                    time = card['mblog']['created_at']
                    if not check_time(id, last_pool, time) or id in pool:
                        text = "{} new wblog published recently\n".format(len(pool))
                        print(text)
                        if len(pool) > 0:
                            sent_message(text, "+86*********")
                        return pool
                    parse_dict['url'] = scheme
                    parse_dict['text'] = text
                    parse_dict['author'] = user
                    parse_dict['time'] = time
                    pool.append(id)
            # print('new weibo detected:\n'
            #       'author home page: {}\n'
            #       'content: {}\n'
            #       'time: {}\n'.format(parse_dict['author'], parse_dict['text'], parse_dict['time']))
    text = "{} new wblog published recently\n".format(len(pool))
    print(text)
    if len(pool) > 0:
        sent_message(text, "+86***********")
    return pool


def main():
    """
    开启任务循环
    :return: None
    """
    last_pool = [[], []]
    while True:
        print("\nnew detect launched")
        new_pool = renew(last_pool)
        last_pool = last_pool[1:]
        last_pool.append(new_pool)
        time.sleep(30)


if __name__ == '__main__':
    kw = input("Please input the keyword you wanna overwatch: ")
    headers = {
        'user-agent':
            'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.57.2 (KHTML, like Gecko) Version/5.1.7 Safari/534.57.2'
    }
    url = 'https://m.weibo.cn/api/container/getIndex'
    main()
上一篇下一篇

猜你喜欢

热点阅读