python解压zip包、解析xml、读文件写mysql的实现
2021-09-23 本文已影响0人
沉思的雨季
一、前言:
如图所示,工作中需要对大量zip包数据进行分析,数据内容以制表符分列逐行写入bcp文件中,列属性名和数据类型名在xml文件中描述,通过python程序将zip包中的数据写入MySQL数据库,实现大批量数据快速分析的需求。
二、实现过程
1、读取目录及子目录中所有的zip文件
def find_zip(zip_path):
zip_list = []
for root, dirs, files in os.walk(zip_path): # 递归取出目录及子目录下的所有文件
for f in files:
if f.endswith(".zip"):
print(f)
zip_list.append(os.path.join(root, f))
for file in zip_list:
unzip_dir = un_zip(file)
deal_file(unzip_dir)
shutil.rmtree(unzip_dir) # 删除文件夹下所有的文件、文件夹
2、解压缩zip文件到同名目录
def un_zip(file_name):
zip_file = zipfile.ZipFile(file_name)
if os.path.isdir(file_name.split(".")[0]): #创建同名目录
pass
else:
os.mkdir(file_name.split(".")[0])
for names in zip_file.namelist(): #zip文件解压缩
zip_file.extract(names, file_name.split(".")[0])
zip_file.close()
return file_name.split(".")[0]
3、解析xml文件,获取数据类型名和数据属性名列表
def deal_xml(xml_file):
# 打开xml文档
dom = minidom.parse(xml_file)
# 得到文档元素对象
root = dom.documentElement
# 根据标签名获取元素节点,是个列表
dataset = root.getElementsByTagName('DATASET')
# 根据标签名获取指定元素节点下属子节点
item_list = dataset[3].getElementsByTagName('ITEM')
head_list = []
for item in item_list: #获取数据列属性名称
value = item.getAttribute("eng").replace(' ', '').replace('/', '_')
head_list.append(value)
namelist = dataset[1].getElementsByTagName('ITEM') #获取数据类型名称
table_name = namelist[2].getAttribute("val")
create_sql = """CREATE TABLE IF NOT EXISTS """ + table_name + " (" + " text,".join(head_list) + " text) " #拼接SQL语句
return head_list, table_name, create_sql
4、创建MySQL连接,新建数据表,并以块方式写入数据
def write_mysql(header_name, table_name, create_sql, file):
con = pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_passwd, database=mysql_db)
cur = con.cursor()
try:
cur.execute(create_sql)
print("create table " + table_name + " successfully!")
except Exception as e:
print("create table " + table_name + " fail! because of :", e)
finally:
con.close()
# 读取文件时,默认第一行为标题,设置header=None,表示没有标题,写表头name=['a','b','c']设置列标题
data = pd.read_table(file, header=None, names=header_name)
# 连接MySql数据库, //后的参数为: 用户名, 密码, 主机, 数据库名
engine = create_engine(
"mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset={5}".format(mysql_user, mysql_passwd, mysql_host, mysql_port,
mysql_db, mysql_charset))
# 使用to_sql()方法插入数据, 参数1: 表的名字,建议不要用中文, con: 之前创建的连接对象,if_exists:"replace"表名存在, 则删掉重建,"append"表名存在, 则会追加数据
try:
data.to_sql(table_name, con=engine, index=False, if_exists="append")
print("insert data to " + table_name + " successfully!")
except Exception as error:
print("insert data to table " + table_name + " fail! because of :", error)
三、实例代码
在代码实例中引入公共python模块,定义MySQL连接参数,实现命令行参数化。
#!/usr/bin/env python3
# -*- coding:utf8 -*-
import os
import sys
import glob
import zipfile
import shutil
import argparse
import pymysql
import pandas as pd
from xml.dom import minidom
from sqlalchemy import create_engine
def find_zip(zip_path):
zip_list = []
for root, dirs, files in os.walk(zip_path): # 递归取出目录及子目录下的所有文件
for f in files:
if f.endswith(".zip"):
print(f)
zip_list.append(os.path.join(root, f))
for file in zip_list:
unzip_dir = un_zip(file)
deal_file(unzip_dir)
shutil.rmtree(unzip_dir) # 删除文件夹下所有的文件、文件夹
def un_zip(file_name):
zip_file = zipfile.ZipFile(file_name)
if os.path.isdir(file_name.split(".")[0]):
pass
else:
os.mkdir(file_name.split(".")[0])
for names in zip_file.namelist():
zip_file.extract(names, file_name.split(".")[0])
zip_file.close()
return file_name.split(".")[0]
def deal_file(file_dir):
xml_file = glob.glob(os.path.join(file_dir, "*.xml"))
xml_result = deal_xml(xml_file[0]
header_name = xml_result[0]
table_name = xml_result[1]
create_sql = xml_result[2]
bcp_file = glob.glob(os.path.join(file_dir, "*.bcp"))
file = bcp_file[0]
write_mysql(header_name, table_name, create_sql, file)
def deal_xml(xml_file):
# 打开xml文档
dom = minidom.parse(xml_file)
# 得到文档元素对象
root = dom.documentElement
# 根据标签名获取元素节点,是个列表
dataset = root.getElementsByTagName('DATASET')
# 根据标签名获取指定元素节点下属子节点
item_list = dataset[3].getElementsByTagName('ITEM')
head_list = []
for item in item_list: #获取数据列属性名称
value = item.getAttribute("eng").replace(' ', '').replace('/', '_')
head_list.append(value)
namelist = dataset[1].getElementsByTagName('ITEM') #获取数据类型名称
table_name = namelist[2].getAttribute("val")
create_sql = """CREATE TABLE IF NOT EXISTS """ + table_name + " (" + " text,".join(head_list) + " text) " #拼接SQL语句
return head_list, table_name, create_sql
def write_mysql(header_name, table_name, create_sql, file):
#通过游标的方式执行sql,创建数据表,可以指定列数据类型
con = pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_passwd, database=mysql_db)
cur = con.cursor()
try:
cur.execute(create_sql)
print("create table " + table_name + " successfully!")
except Exception as e:
print("create table " + table_name + " fail! because of :", e)
finally:
con.close()
# 读取文件时,默认第一行为标题,设置header=None,表示没有标题,写表头name=['a','b','c']设置列标题
data = pd.read_table(file, header=None, names=header_name)
# 连接MySql数据库, //后的参数为: 用户名, 密码, 主机, 数据库名
engine = create_engine(
"mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset={5}".format(mysql_user, mysql_passwd, mysql_host, mysql_port,
mysql_db, mysql_charset))
# 块方式写MySQL,使用to_sql()方法插入数据, 参数1: 表的名字,建议不要用中文, con: 之前创建的连接对象,if_exists:"replace"表名存在, 则删掉重建,"append"表名存在, 则会追加数据
try:
data.to_sql(table_name, con=engine, index=False, if_exists="append")
print("insert data to " + table_name + " successfully!")
except Exception as error:
print("insert data to table " + table_name + " fail! because of :", error)
#全局变量,指定MySQL连接参数
mysql_host = '数据库IP地址'
mysql_port = 3306
mysql_user = '数据库登录名'
mysql_passwd = '数据库登录密码'
mysql_charset = 'utf8mb4'
# 执行脚本默认读取同级data下数据,若传入指定--dir目录名参数,则处理指定目录下数据,指定--db参数,则将数据存入指定数据库
parser = argparse.ArgumentParser()
parser.add_argument("--dir", default="data/")
parser.add_argument("--db", default="pushData")
args = parser.parse_args()
uzip_dir = args.dir
mysql_db = args.db
find_zip(uzip_dir)