找出连续21天值的时间段

2019-12-16  本文已影响0人  barriers

1找连续时间

import pandas as pd
import datetime
from sqlalchemy import create_engine

def read_sql():
    """Load the per-timestamp grid counts from the spider database.

    Returns a DataFrame with two columns: ``grid_id`` (number of grid rows
    sharing a timestamp) and ``published_at`` (parsed to ``datetime64``),
    already ordered by timestamp by the SQL query.
    """
    db_engine = create_engine("postgresql+psycopg2://glzt:123456@127.0.0.1:5432/spider", max_overflow=5, encoding='utf-8')
    query = """select count(grid_id) as grid_id, to_char(published_at, 'YYYY-MM-DD HH24:MI:SS') as published_at
from grid_air_quality group by published_at order by published_at;"""
    frame = pd.read_sql(query, db_engine)
    # Convert the textual timestamps into real datetimes so that later
    # timedelta arithmetic on this column works.
    frame['published_at'] = pd.to_datetime(frame['published_at'], format='%Y-%m-%d %H:%M:%S')
    return frame


def get_continus_data():
    """Find every window of 21 consecutive, gap-free days of hourly data.

    Reads hourly grid counts via ``read_sql`` (one row per hour when the data
    is complete), sorts them by timestamp, then slides a window of 504 records
    (21 days * 24 hours; 503 one-hour gaps between first and last record) over
    the series.  A window whose first and last timestamps are exactly 503
    hours apart contains no missing hours, so its [start, end] pair is kept.
    All such windows are written to ``./train.csv`` (columns: start, end).
    """
    SPAN = 503  # 21 days * 24 hours - 1 = gaps between 504 hourly records
    # Data source (here a database; could equally be a csv file).
    data = read_sql()
    data.sort_values(by="published_at", inplace=True)
    data.reset_index(drop=True, inplace=True)
    print(data)
    # Materialize the timestamps once: the original looped with chained
    # data.iloc[i]['published_at'] lookups, which is very slow per row.
    stamps = data['published_at'].tolist()
    list_continuous = []
    # range() keeps index + SPAN in bounds by construction, so the original
    # per-row try/except IndexError (which could never trigger anyway, given
    # the slice bound) is unnecessary.
    for index in range(len(stamps) - SPAN):
        delta = stamps[index + SPAN] - stamps[index]
        # Whole hours between the window's endpoints (truncating, matching
        # the original days*24 + seconds//3600 arithmetic).
        if delta.days * 24 + delta.seconds // 3600 == SPAN:
            list_continuous.append([stamps[index], stamps[index + SPAN]])
    list_continuous = pd.DataFrame(data=list_continuous, columns=['start', 'end'])
    list_continuous.to_csv('./train.csv', index=False)
    print(list_continuous)

2找出本地可用的字体

from matplotlib.font_manager import FontManager
import subprocess

# Font families that matplotlib can actually render.
fm = FontManager()
mat_fonts = set(f.name for f in fm.ttflist)
# Chinese-capable font families known to fontconfig.  The command is passed
# as an argument list with shell=False (the default): no shell quoting to get
# wrong, and no shell-injection surface, unlike the original shell=True call.
output = subprocess.check_output(
    ['fc-list', ':lang=zh', '-f', '%{family}\n'])
output = output.decode('utf-8')
# fc-list prints one "Family,AltName,..." line per font; keep the first name.
zh_fonts = set(f.split(',', 1)[0] for f in output.split('\n'))
# Usable Chinese fonts = intersection of the two sets.
available = mat_fonts & zh_fonts
print('*' * 10, '可用的字体', '*' * 10)
for f in available:
    print(f)

3redis发布订阅消息

import json
import time
import datetime
import redis

class DbRedis:
    """Singleton wrapper around a Redis connection pool.

    Every instantiation returns the same object, so the whole process shares
    one connection pool.
    """

    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, '_instance'):
            # object.__new__ takes no extra arguments on Python 3; forwarding
            # *args/**kwargs here (as the original did) would raise TypeError
            # whenever constructor arguments were passed.
            cls._instance = super(DbRedis, cls).__new__(cls)
        return cls._instance

    # Database connection configuration.
    def __init__(self):
        self.redis_link = {
            'host': '192.168.1.115',
            'port': 6379,
            'password': 'glztredis',
            'decode_responses': True,
            # 'db': 2
        }
        self.pool = redis.ConnectionPool(**self.redis_link)  # create the redis connection pool
        self.corsur = redis.Redis(connection_pool=self.pool)  # take one connection from the pool

    def publish(self, message):
        """JSON-encode *message* and publish it on the
        ``dynamic_feature_published`` channel; returns True.

        Note: this was originally (mis-)indented at module level with a bare
        ``self`` parameter, so ``db.publish(...)`` raised AttributeError — it
        belongs on the class.
        """
        self.corsur.publish('dynamic_feature_published', json.dumps(message))
        print('发布成功')
        return True


# corsur = redis.StrictRedis(host='127.0.0.1', port=6379, db=2)
db = DbRedis()
# corsur = db.corsur
# keys = corsur.keys()
# print(keys)


loss_time = ['2019-11-08 13:00:00---2019-11-08 13:00:00',
 '2019-11-08 18:00:00---2019-11-08 19:00:00',]

for published_ats in loss_time:
    start, end = published_ats.split('---')
    start = datetime.datetime.strptime(start, '%Y-%m-%d %H:%M:%S')
    end = datetime.datetime.strptime(end, '%Y-%m-%d %H:%M:%S')
    hours = (end - start).days*24 + (end - start).seconds//3600
    for hour in range(hours+1):
        loss_current = start + datetime.timedelta(hours=hour)
        print(loss_current)
        db.publish({"published": str(loss_current)})
        time.sleep(1)

# Pandas form of two nested for-loops plus an if test — many times faster:
# for every point, pick the id of the grid row whose lat/lng bounding box
# contains the point.
# NOTE(review): the trailing [0] assumes every point lies inside at least one
# grid cell; a point outside all cells raises IndexError — confirm inputs.
points['id'] = points.apply(lambda x: grids.loc[(grids['bottom_lat'] <= x['lat']) & (grids['top_lat'] >= x['lat']) & (grids['bottom_lng'] <= x['lng']) & (grids['top_lng'] >= x['lng']), 'id'].to_numpy()[0], axis=1)
上一篇 下一篇

猜你喜欢

热点阅读