python操作kudu.md

2019-10-10  本文已影响0人  未然猜
#!/usr/bin/python
# -*- coding: utf-8 -*-

import os
import pandas as pd
import datetime
import time
import kudu
from kudu.client import Partitioning
import logging
from logging import handlers


def Logger(filename, level='info', when='D', backCount=3):
    """Return a logger that writes to the console and to a rotating file.

    The logger is looked up by ``filename`` via ``logging.getLogger``, so
    calling this function again with the same ``filename`` returns the same
    logger object. Handlers are attached only the first time; later calls
    just update the level. Without that guard every call would add another
    pair of handlers and each message would be emitted multiple times.

    Args:
        filename: Path of the log file; also used as the logger name.
        level: One of 'debug', 'info', 'warning', 'error', 'crit'.
        when: Rotation interval unit passed to ``TimedRotatingFileHandler``.
        backCount: Number of rotated files to keep (``backupCount``).

    Returns:
        The configured ``logging.Logger``.

    Raises:
        TypeError: From ``Logger.setLevel`` when ``level`` is not one of the
            supported names (the lookup yields ``None``).
    """
    # Mapping from the short level names accepted here to logging constants.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }

    fmt = '%(asctime)s %(levelname)s [%(pathname)s:%(funcName)s:%(lineno)d] %(message)s'
    logger = logging.getLogger(filename)
    logger.setLevel(level_relations.get(level))

    # getLogger() returns a shared object per name; only wire up handlers once
    # so repeated calls do not duplicate output or leak open file handles.
    if not logger.handlers:
        format_str = logging.Formatter(fmt)
        sh = logging.StreamHandler()
        sh.setFormatter(format_str)
        th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=backCount,
                                               encoding='utf-8')
        th.setFormatter(format_str)
        logger.addHandler(sh)
        logger.addHandler(th)
    return logger


# Module-wide logger used by the functions below; Logger() attaches a stream
# handler and a time-rotated file handler writing to all.log.
log = Logger('all.log')


def read_data(path='/hadoop/disk1/data_result/frontlog-2019-08-01.txt'):
    """Load the tab-separated front-log dump into a DataFrame.

    Malformed rows are skipped with a warning instead of aborting the read.
    The file has no header row, so the column names are assigned afterwards.

    Args:
        path: Location of the TSV file. Defaults to the 2019-08-01 dump the
            script was originally written for.

    Returns:
        A ``pandas.DataFrame`` with the 43 named columns listed below.

    Raises:
        ValueError: From pandas when the file does not have exactly as many
            columns as names are assigned.
    """
    read_kwargs = {'sep': '\t', 'header': None}
    # ``error_bad_lines`` was deprecated in pandas 1.3 and removed in 2.0.
    # ``on_bad_lines='warn'`` is its replacement with the same skip-and-warn
    # behaviour, so choose whichever keyword the installed pandas accepts.
    major, minor = (int(part) for part in pd.__version__.split('.')[:2])
    if (major, minor) >= (1, 3):
        read_kwargs['on_bad_lines'] = 'warn'
    else:
        read_kwargs['error_bad_lines'] = False
    df = pd.read_csv(path, **read_kwargs)
    # Assign column names.
    df.columns = ['dt', 'vl', 'nt', 'tm', 'amc', 're', 'nw', 'de', 'id', 'dd', 'fm', 'dip', '_id', 'mb', 'bm', 've',
                  'createdate', 'offsetx', 'sy', 'fn', 'createdatestr', 'dc', 'sid', 'ev', 'dpl', 'dmb', 'imei', 'sd',
                  'asx', 'did', 'loc', 'pid', 'cu', 'ip', 'wbn', 'rs', 'rt', 'pr', 'sp', 'spid', 'se', 'seid',
                  'part_dt']
    log.info('读取所有数据成功')
    return df


def get_data(df, initial_time=0):
    """Select the rows whose time-of-day falls in the current window.

    Works on a copy of ``df``: ``part_dt`` is overwritten with the current
    year-month, and ``tm`` is rebuilt as today's date combined with
    characters 11..18 of the original ``tm`` string (parsed as HH:MM:SS).
    The window is ``[initial_time, now)``; a falsy ``initial_time`` means
    "three seconds before now".

    Args:
        df: Source DataFrame; ``tm`` must be a string column.
        initial_time: Inclusive window start as a ``datetime``, or a falsy
            value to use the three-second default.

    Returns:
        A list of dicts (one per selected row) with every value converted
        to ``str``; an empty list when the window start equals now.
    """
    # Work on a copy so the caller's frame is never modified.
    snapshot = df.copy()

    # Overwrite the partition value with the current year-month.
    month_tag = time.strftime('%Y-%m', time.localtime())
    snapshot['part_dt'] = month_tag

    # Re-date every record to today, keeping only its time-of-day part.
    day_prefix = time.strftime('%Y-%m-%d ', time.localtime())
    clock_part = snapshot['tm'].str[11:19]
    snapshot['tm'] = pd.to_datetime(day_prefix + clock_part, format='%Y-%m-%d %H:%M:%S')

    # Without a previous window end, start three seconds in the past.
    if not initial_time:
        initial_time = datetime.datetime.now() - datetime.timedelta(seconds=3)

    now_time = datetime.datetime.now()
    if initial_time == now_time:
        log.info('当前时间与上次更新时间相同, 略过')
        return []

    # Half-open window: start inclusive, end exclusive.
    on_or_after_start = snapshot['tm'] >= initial_time
    before_end = snapshot['tm'] < now_time
    window = snapshot[on_or_after_start & before_end]
    log.info('获取当前时间数据 [%s]-[%s]' % (str(initial_time)[:19], str(now_time)[:19]))

    # Stringify every cell before handing the rows out as dicts.
    return window.applymap(str).to_dict('records')


def kudu_operation(data):
    """Insert ``data`` into the Kudu table and report when the write finished.

    Steps: refresh the Kerberos ticket with ``kinit``, connect to Kudu, open
    the target table, apply one insert per record and flush the session.
    A ``KuduBadStatus`` raised by the flush is tolerated (best effort) but
    is now logged instead of being silently dropped.

    Args:
        data: Iterable of dicts, one per row to insert. A falsy value
            (empty list, ``None``) means there is nothing to write.

    Returns:
        ``datetime.datetime`` taken right after the flush (or immediately
        when ``data`` is falsy), intended as the start of the caller's next
        window. ``None`` when the operation failed; the failure is logged.
    """
    # Nothing to write: skip authentication and the connection entirely.
    if not data:
        return datetime.datetime.now()

    client = None
    try:
        # Kerberos authentication; surface a failed kinit instead of ignoring it.
        kinit_status = os.system('kinit -kt /root/etluser.keytab etluser')
        if kinit_status != 0:
            log.warning('kinit认证失败, 返回码: %s' % kinit_status)
        # Open the Kudu connection and the target table.
        client = kudu.connect(host='172.16.163.216', port=7051)
        table = client.table('impala::data_market_tuomin.frontlog_test_demo')
        # A session batches the write operations against the table.
        session = client.new_session()
        for item in data:
            session.apply(table.new_insert(item))
        # Flush the pending writes; a bad status is tolerated but recorded.
        try:
            session.flush()
        except kudu.KuduBadStatus as e:
            log.warning('flush时发生异常, 已忽略: %s' % e)
        now_time = datetime.datetime.now()
        log.info('写入数据成功, 数据量: %s' % len(data))
        # The original computed this timestamp but never returned it, so the
        # caller always received None.
        return now_time
    except Exception as e:
        log.error("kudu操作失败[ERROR: %s]" % e, exc_info=True)
        return None
    finally:
        # Release the connection on both the success and the failure path.
        if client is not None:
            try:
                client.close()
            except Exception as e:
                log.warning('关闭kudu连接失败: %s' % e)


if __name__ == '__main__':
    # Load the source data once; every iteration below reuses this frame.
    df = read_data()
    # 0 tells get_data() there is no previous window end yet.
    initial_time = 0
    while True:
        # Fetch the rows for the current window and write them; whatever
        # kudu_operation() returns becomes the next window's start.
        initial_time = kudu_operation(get_data(df, initial_time))
上一篇 下一篇

猜你喜欢

热点阅读