python导入数据工具
2022-02-15 本文已影响0人
flyinskybiu
前段时间为了ICP证书,写了一个以node.js为后台,vue为前台页面的小项目。众所周知在小的项目它也要数据来支撑,这不是被逼着捣鼓如何快速的导入数据到数据库,我这种方法不一定是最好的,但是在现阶段我觉的是最有意思的。
项目比较简单,大佬勿喷,第一次用python写东西多多包涵!有错误的地方希望大神指点。
简单的介绍一下使用到库
- pymysql:用来链接操作数据库
- xlwt :导出xls文件
- pandas:数据清理,及插入(非常强大的一个库)
话不多说直接上代码
#!/usr/bin/python3
import pymysql
import requests
import json
import xlwt
import os
import shutil
import pandas as pd
import qtui.dao.path as path
from sqlalchemy import create_engine
class DatabaseAccess():
def __init__(self) -> None:
print("初始成功,数据库连接信息")
# self.isConnectionOpen()
self.__db_host = "localhost"
self.__db_port = 3306
self.__db_user = "root"
self.__db_password = "123456"
self.__db_database = "storedb"
# 爬取数据
def getdataforurl(self):
print("爬取数据,自己写吧,我扒的就不展示了")
# 链接数据库
def isConnectionOpen(self):
self.__db = pymysql.connect(
host=self.__db_host,
port=self.__db_port,
user=self.__db_user,
password=self.__db_password,
database=self.__db_database,
charset='utf8'
)
# 插入数据
def linesinsert(self, data):
try:
# 连接数据库
self.isConnectionOpen()
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = self.__db.cursor()
# 插入数据语句 简单的sql语句 插入数据库(后面用pandas好像更方便在handle_data方法中)
# 软件供应
# query = """insert into software (id, software_introduction, applicable_industries,category,contacts,display_title,mail_box,qq,software_price,team_city,team_joining_time,team_logo,team_name,telephone,vps_table_name) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
# values = (data["id"], data["software_introduction"], data["applicable_industries"], data["category"], data["contacts"], data["display_title"], data["mail_box"],
# data["qq"], data["software_price"], data["team_city"], data["team_joining_time"], data["team_logo"], data["team_name"], data["telephone"], data["vps_table_name"])
# 找团队
# query = """insert into team (id, case_work, city,contacts,display_title,image,joining_time,mail_box,qq,specialty,team_introduction,telephone,vps_table_name,name_of_employer) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
# values = (data["id"], data["case_work"], data["city"], data["contacts"], data["display_title"], data["image"], data["joining_time"], data["mail_box"],
# data["qq"], data["specialty"], data["team_introduction"], data["telephone"], data["vps_table_name"], "Sky666")
# 找项目
query = """insert into project (id, city,display_title,deadline,development_cycle,project_budget,employer_joining_time,end_of_bidding,item_no,project_classification,project_introduction,project_status,release_time,vps_table_name,image_header,name_of_employer) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
values = (data["id"], data["city"], data["display_title"], data["deadline"], data["development_cycle"], data["project_budget"],
data["employer_joining_time"], data["end_of_bidding"], data["item_no"], data["project_classification"], data["project_introduction"], data["project_status"], data["release_time"], data["vps_table_name"], "http://www.hfwish.com/theme/wrf_wbfb_v2/images/1-191031162301Y9.jpg", "Sky666")
cursor.execute(query, values)
except Exception as e:
print(e)
finally:
# 关闭数据库连接
cursor.close()
self.__db.commit()
self.__db.close()
# 查询所有表的名称
def check_table_name(self):
print("查询表名中....")
results = []
try:
self.isConnectionOpen()
with self.__db.cursor() as cursor:
sql = '''SHOW TABLES'''
cursor.execute(sql)
result = cursor.fetchall()
for i in range(len(result)):
results.append(result[i][0])
except Exception as e:
print(e)
finally:
# 关闭数据库连接
cursor.close()
self.__db.commit()
self.__db.close()
print("查询成功==========================")
return results
# 根据表名导出示例模板
def export(self, table_name):
print("导出模板")
try:
# 连接数据库
self.isConnectionOpen()
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = self.__db.cursor()
sql = 'select * from %s;' % table_name
cursor.execute(sql) # 执行sql
fileds = [filed[0] for filed in cursor.description] # 所有的字段
all_data = cursor.fetchall()
book = xlwt.Workbook(encoding='utf-8')
# 背景色--淡绿色
patternLightGreen = xlwt.Pattern()
patternLightGreen.pattern = xlwt.Pattern.SOLID_PATTERN
patternLightGreen.pattern_fore_colour = 42
styleLightGreen = xlwt.XFStyle()
styleLightGreen.pattern = patternLightGreen
sheet = book.add_sheet(table_name)
for col, filed in enumerate(fileds): # 写表头的,
sheet.write(0, col, filed, styleLightGreen) # xls表头
# print(all_data)
if len(all_data) > 0:
row = 1 # 第一行
for col, filed in enumerate(all_data[len(all_data)-1]): # 控制列
sheet.write(row, col, filed)
book.save('%s模板.xls' % table_name)
bPath = os.getcwd()
# 获取当前文件路径
file_path = os.path.join(bPath, '%s模板.xls' %
table_name)
# 移动文件到E盘地方
target_path = os.path.join(path.GetDir.get_BASE_DIR3("dataHook"))
# 使用shutil包的move方法移动文件
shutil.move(file_path, target_path)
print("导出%s表模板成功!" % table_name,
"------------------------------------------文件路劲:E:\ICP\ICP-Date\dataHook")
finally:
# 关闭数据库连接
cursor.close()
self.__db.commit()
self.__db.close()
# 根据表名导出所有数据
def exportAll(self, table_name):
print("导出表结构")
try:
# 连接数据库
self.isConnectionOpen()
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = self.__db.cursor()
sql = 'select * from %s;' % table_name
cursor.execute(sql) # 执行sql
fileds = [filed[0] for filed in cursor.description] # 所有的字段
all_data = cursor.fetchall()
book = xlwt.Workbook(encoding='utf-8')
# 背景色--淡绿色
patternLightGreen = xlwt.Pattern()
patternLightGreen.pattern = xlwt.Pattern.SOLID_PATTERN
patternLightGreen.pattern_fore_colour = 42
styleLightGreen = xlwt.XFStyle()
styleLightGreen.pattern = patternLightGreen
sheet = book.add_sheet('sheet1')
for col, filed in enumerate(fileds): # 写表头的,
sheet.write(0, col, filed, styleLightGreen) # xls表头
# print(all_data)
row = 1 # 第一行
for data in all_data: # 行
for col, filed in enumerate(data): # 控制列
sheet.write(row, col, filed)
row += 1 # 每次写完一行,行数加一
# print("导出%s表成功!" % table_name,
# "------------------------------------------")
book.save('%s.xls' % table_name)
bPath = os.getcwd()
# 获取当前文件路径
file_path = os.path.join(bPath, '%s.xls' % table_name)
# 移动文件到E盘地方
target_path = os.path.join(path.GetDir.get_BASE_DIR3("dataHook"))
# 使用shutil包的move方法移动文件
shutil.move(file_path, target_path)
print("导出%s表成功!" % table_name,
"------------------------------------------文件路劲:E:\ICP\ICP-Date\dataHook")
finally:
# 关闭数据库连接
cursor.close()
self.__db.commit()
self.__db.close()
# 查询数据
def check_date(self):
# 重新建立数据库连接
self.isConnectionOpen()
cursor = self.__db.cursor()
# 查询数据库并打印内容
cursor.execute('''select * from catering_sale''')
results = cursor.fetchall()
for row in results:
print(row)
# 关闭
cursor.close()
self.__db.commit()
self.__db.close()
# 读取xls
def read_excl(self, table_name):
print("开始读取文件")
allFile = []
try:
allFile = path.GetDir.file_name(
path.GetDir.get_BASE_DIR3("inserData"), table_name+"模板")
except Exception as e:
print("读取文件报错==========================")
print(e)
finally:
if len(allFile) == 0:
print("未查询到%s模板.xls文件" % table_name, "请检查inserData文件夹")
return False
else:
print("所有%s.xls文件路径" % table_name, allFile)
# select = input("查询到多个文件,请选择第几个? ")
return allFile[0]
# 读取根据文件路径处理数据
def handle_data(self, paths, name):
# 开始清理数据==============================
print("开始清理数据==============================path:", paths)
rPath = os.path.join(paths)
print("rPath", rPath)
df = pd.read_excel(rPath) # 使用pd读取数据
# 清洗数据: 没有列头, 缺失值,空行,重复数据,非ASCII 字符(没弄),
df.dropna(axis=0, how='any', inplace=True) # 删除有空行
df.drop_duplicates("id", "first", inplace=True) # 删除Id重复数据0
print(df.to_string())
print("清理数据完成===================================最终数据")
print("开始插入数据库=================================START")
try:
self.isConnectionOpen()
db_info = {"host": self.__db_host,
"port": self.__db_port,
"user": self.__db_user,
"password": self.__db_password,
"database": self.__db_database,
"charset": 'utf8'}
engine = create_engine(
'mysql+pymysql://%(user)s:%(password)s@%(host)s:%(port)d/%(database)s?charset=utf8' % db_info, encoding='utf-8')
df.to_sql(name, con=engine,
if_exists='append', index=False)
target_path = os.path.join(path.GetDir.get_BASE_DIR3("finalData"))
target_path = target_path + "%s.csv" % name
df.to_csv(target_path, encoding='utf-8', index=False) # 写成csv文件保存
except Exception as e:
print(e, "异常数据")
finally:
# 关闭数据库连接
self.__db.commit()
self.__db.close()
print("插入数据结束")
if __name__ == "__main__":
# 创建实例化对象
db = DatabaseAccess()
# db.linesinsert()
# db.check_date()
# db.getdataforurl()
one = input("是否有匹配的模板数据表请输入是/否:")
if one == '是':
table_name = db.check_table_name()
print("当前表:", table_name)
name = input("输入你要导入的表名称:")
paths = db.read_excl(name)
if paths == False:
print("-------------需要把对应表的数据.xls格式模板放入inserData文件夹下------------------------")
else:
db.handle_data(paths, name)
else:
table_name = db.check_table_name()
print("当前表:", table_name)
name = input("输入你要导出模板的表名称:")
db.export(name)
# table_name = db.check_table_name()
# print("当前表:", table_name)
# name = input("选择你要导出的表名称:")
# 根据表名导出示例模板
# db.export(name)
# 根据表名导出说有数据
# db.exportAll("team")
# 根据表名插入数据
# paths = db.read_excl("team")
# db.handle_data(paths, "team")
# f = open(r"e:/ICP/ICP-Date/dataHook/team.xls", 'rb')
# its_code = f.read()
# print(chardet.detect(its_code))