python解压zip包、解析xml、读文件写mysql的实现

2021-09-23  本文已影响0人  沉思的雨季

一、前言:

如图所示,工作中需要对大量zip包数据进行分析,数据内容以制表符分列逐行写入bcp文件中,列属性名和数据类型名在xml文件中描述,通过python程序将zip包中的数据写入MySQL数据库,实现大批量数据快速分析的需求。

二、实现过程

1、读取目录及子目录中所有的zip文件

def find_zip(zip_path):
    zip_list = []
    for root, dirs, files in os.walk(zip_path):  # 递归取出目录及子目录下的所有文件
        for f in files:
            if f.endswith(".zip"):
                print(f)
                zip_list.append(os.path.join(root, f))

    for file in zip_list:
        unzip_dir = un_zip(file)
        deal_file(unzip_dir)
        shutil.rmtree(unzip_dir)        # 删除文件夹下所有的文件、文件夹

2、解压缩zip文件到同名目录

def un_zip(file_name):
    zip_file = zipfile.ZipFile(file_name)
    if os.path.isdir(file_name.split(".")[0]):       #创建同名目录
        pass
    else:
        os.mkdir(file_name.split(".")[0])
    for names in zip_file.namelist():             #zip文件解压缩
        zip_file.extract(names, file_name.split(".")[0])
    zip_file.close()
    return file_name.split(".")[0]

3、解析xml文件,获取数据类型名和数据属性名列表

def deal_xml(xml_file):
    # 打开xml文档
    dom = minidom.parse(xml_file)
    # 得到文档元素对象
    root = dom.documentElement
    # 根据标签名获取元素节点,是个列表
    dataset = root.getElementsByTagName('DATASET')
    # 根据标签名获取指定元素节点下属子节点
    item_list = dataset[3].getElementsByTagName('ITEM')
    head_list = []
    for item in item_list:      #获取数据列属性名称
        value = item.getAttribute("eng").replace(' ', '').replace('/', '_')
        head_list.append(value)
    namelist = dataset[1].getElementsByTagName('ITEM')      #获取数据类型名称
    table_name = namelist[2].getAttribute("val")
    create_sql = """CREATE TABLE IF NOT EXISTS """ + table_name + " (" + " text,".join(head_list) + " text) "   #拼接SQL语句
    return head_list, table_name, create_sql

4、创建MySQL连接,新建数据表,并以块方式写入数据

def write_mysql(header_name, table_name, create_sql, file):
    con = pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_passwd, database=mysql_db)
    cur = con.cursor()
    try:
        cur.execute(create_sql)
        print("create table " + table_name + " successfully!")
    except Exception as e:
        print("create table " + table_name + " fail! because of :", e)
    finally:
        con.close()

    # 读取文件时,默认第一行为标题,设置header=None,表示没有标题,写表头name=['a','b','c']设置列标题
    data = pd.read_table(file, header=None, names=header_name)
    # 连接MySql数据库, //后的参数为: 用户名, 密码, 主机, 数据库名
    engine = create_engine(
        "mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset={5}".format(mysql_user, mysql_passwd, mysql_host, mysql_port,
                                                                 mysql_db, mysql_charset))
    # 使用to_sql()方法插入数据,  参数1: 表的名字,建议不要用中文, con: 之前创建的连接对象,if_exists:"replace"表名存在, 则删掉重建,"append"表名存在, 则会追加数据
    try:
        data.to_sql(table_name, con=engine, index=False, if_exists="append")
        print("insert data to " + table_name + " successfully!")
    except Exception as error:
        print("insert data to table " + table_name + " fail! because of :", error)

三、实例代码

在代码实例中引入公共python模块,定义MySQL连接参数,实现命令行参数化。

#!/usr/bin/env python3
# -*- coding:utf8 -*-

import os
import sys
import glob
import zipfile
import shutil
import argparse
import pymysql
import pandas as pd
from xml.dom import minidom
from sqlalchemy import create_engine


def find_zip(zip_path):
    zip_list = []
    for root, dirs, files in os.walk(zip_path):  # 递归取出目录及子目录下的所有文件
        for f in files:
            if f.endswith(".zip"):
                print(f)
                zip_list.append(os.path.join(root, f))

    for file in zip_list:
        unzip_dir = un_zip(file)
        deal_file(unzip_dir)
        shutil.rmtree(unzip_dir)        # 删除文件夹下所有的文件、文件夹

def un_zip(file_name):
    zip_file = zipfile.ZipFile(file_name)

    if os.path.isdir(file_name.split(".")[0]):
        pass
    else:
        os.mkdir(file_name.split(".")[0])
    for names in zip_file.namelist():
        zip_file.extract(names, file_name.split(".")[0])
    zip_file.close()
    return file_name.split(".")[0]

def deal_file(file_dir):
    xml_file = glob.glob(os.path.join(file_dir, "*.xml"))
    xml_result = deal_xml(xml_file[0]
    header_name = xml_result[0]
    table_name = xml_result[1]
    create_sql = xml_result[2]
    bcp_file = glob.glob(os.path.join(file_dir, "*.bcp"))
    file = bcp_file[0]
    write_mysql(header_name, table_name, create_sql, file)

def deal_xml(xml_file):
    # 打开xml文档
    dom = minidom.parse(xml_file)
    # 得到文档元素对象
    root = dom.documentElement
    # 根据标签名获取元素节点,是个列表
    dataset = root.getElementsByTagName('DATASET')
    # 根据标签名获取指定元素节点下属子节点
    item_list = dataset[3].getElementsByTagName('ITEM')
    head_list = []
    for item in item_list:      #获取数据列属性名称
        value = item.getAttribute("eng").replace(' ', '').replace('/', '_')
        head_list.append(value)
    namelist = dataset[1].getElementsByTagName('ITEM')      #获取数据类型名称
    table_name = namelist[2].getAttribute("val")
    create_sql = """CREATE TABLE IF NOT EXISTS """ + table_name + " (" + " text,".join(head_list) + " text) "   #拼接SQL语句
    return head_list, table_name, create_sql


def write_mysql(header_name, table_name, create_sql, file):
    #通过游标的方式执行sql,创建数据表,可以指定列数据类型
    con = pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_passwd, database=mysql_db)
    cur = con.cursor()
    try:
        cur.execute(create_sql)
        print("create table " + table_name + " successfully!")
    except Exception as e:
        print("create table " + table_name + " fail! because of :", e)
    finally:
        con.close()
    # 读取文件时,默认第一行为标题,设置header=None,表示没有标题,写表头name=['a','b','c']设置列标题
    data = pd.read_table(file, header=None, names=header_name)
    # 连接MySql数据库, //后的参数为: 用户名, 密码, 主机, 数据库名
    engine = create_engine(
        "mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset={5}".format(mysql_user, mysql_passwd, mysql_host, mysql_port,
                                                                 mysql_db, mysql_charset))
    # 块方式写MySQL,使用to_sql()方法插入数据,  参数1: 表的名字,建议不要用中文, con: 之前创建的连接对象,if_exists:"replace"表名存在, 则删掉重建,"append"表名存在, 则会追加数据
    try:
        data.to_sql(table_name, con=engine, index=False, if_exists="append")
        print("insert data to " + table_name + " successfully!")
    except Exception as error:
        print("insert data to table " + table_name + " fail! because of :", error)

#全局变量,指定MySQL连接参数
mysql_host = '数据库IP地址'
mysql_port = 3306
mysql_user = '数据库登录名'
mysql_passwd = '数据库登录密码'
mysql_charset = 'utf8mb4'
# 执行脚本默认读取同级data下数据,若传入指定--dir目录名参数,则处理指定目录下数据,指定--db参数,则将数据存入指定数据库
parser = argparse.ArgumentParser()
parser.add_argument("--dir", default="data/")
parser.add_argument("--db", default="pushData")
args = parser.parse_args()
uzip_dir = args.dir
mysql_db = args.db
find_zip(uzip_dir)
上一篇下一篇

猜你喜欢

热点阅读