Python连接MYSQL数据库类
2021-01-29 本文已影响0人
yichen_china
pymysql.connect()参数说明
字段 | 说明 |
---|---|
host(str): | MySQL服务器地址 |
port(int): | MySQL服务器端口号 |
user(str): | 用户名 |
passwd(str): | 密码 |
db(str): | 数据库名称 |
charset(str): | 连接编码、 |
connection对象支持的方法
字段 | 说明 |
---|---|
cursor() | 使用该连接创建并返回游标 |
commit() | 提交当前事务 |
rollback() | 回滚当前事务 |
close() | 关闭连接 |
cursor对象支持的方法
字段 | 说明 |
---|---|
execute(op) | 执行一个数据库的查询命令 |
fetchone() | 取得结果集的下一行 |
fetchmany(size) | 获取结果集的下几行 |
fetchall() | 获取结果集中的所有行 |
rowcount() | 返回数据条数或影响行数 |
close() | 关闭游标对象 |
DBHelper.py:
#!D:\Programs\Python\Python37
# -*- coding:utf-8 -*-
# @Author : Echo
# @Time :2021/1/29 10:12
import pymysql
import logging
import sys
from config.database import db_config
# 加入日志
# 获取logger实例
logger = logging.getLogger("baseSpider")
# 指定输出格式
formatter = logging.Formatter('%(asctime)s\
%(levelname)-8s:%(message)s')
# 文件日志
file_handler = logging.FileHandler("operation_database.log")
file_handler.setFormatter(formatter)
# 控制台日志
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 为logge添加具体的日志处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
class DBHelper():
# 构造函数,初始化数据库连接
def __init__(self,sql,params=None):
self.sql = sql
self.params = params
self.conn = None
self.cur = None
def connectiondatabase(self):
print(db_config['host'],db_config['username'],db_config['password'],db_config['database'],db_config['charset'])
try:
self.conn = pymysql.connect(host=db_config['host'],user=db_config['username'],
passwd=db_config['password'],db=db_config['database'],charset=db_config['charset'])
except:
logger.error("connectDatabase failed")
return False
self.cur = self.conn.cursor()
return True
# 关闭数据库
def closedatabase(self):
# 如果数据打开,则关闭;否则没有操作
if self.conn and self.cur:
self.cur.close()
self.conn.close()
return True
# 执行数据库的sq语句,主要用来做插入操作
def execute(self):
self.connectiondatabase()
try:
if self.conn and self.cur:
# 正常逻辑,执行sql,提交操作
self.cur.execute(self.sql,self.params)
self.conn.commit()
except:
logger.error("execute failed: " + self.sql)
logger.error("params: " + self.params)
self.closedatabase()
return False
return True
# 用来查询表数据
def select(self):
self.connectiondatabase()
# 执行SQL语句
b=self.cur.execute(self.sql,self.params)
# print(self.cur.description) # 获取字段列表
result = self.cur.fetchall()
#返回值和数据表字段组成json格式
lists = []
t = 0
for x in result:
i = 0
onelist = {}
for field in self.cur.description:
onelist[field[0]] = x[i]
i = i + 1
lists.append(onelist)
# for q in lists:
# print(q['update_time'])
# # print(q['data'])
# # print(result)
return lists
配置信息 db_config.py
db_config = {
'host':'127.0.0.1',
'port':3306,
'username':'root',
'password':'admin',
'database':'DataBase',
'charset':'utf8'
}
测试代码:
from mySQL import DBHelper as mdb
def main():
# 连接 mysql,获取连接的对象
SQL=r'SELECT * FROM TUsers'
db=mdb.DBHelper(SQL)
rows=db.select()
# 依次遍历结果集,发现每个元素,就是表中的一条记录,用一个元组来显示
for row in rows:
print(row)
if __name__ == '__main__':
main()